下载地址:https://mirrors.cnnic.cn/apache/kafka
一、系统构成
在整个消息发布流程中,kafka作为一个中间件对系统的运行提供了解耦、削峰、异步处理的能力。
由生产者发起信息同步条件,中间件作为信息转储的角色对news进行发布。消费者会通过offset来保证接收最新的消息
主要由生产者、消费者、中间件构成。中间件由
producer
生产者
broker
kafka实现实例
其可实现主要功能部分
topic
每个kafka实例内部的消息类型
每个实例内部可存在多个topic
partition
消息类型分组,每组的数据不同。
每个topic内存在多个partition
message
每条发送的消息主体
consumer
消费者
consumer_group
消费者组
组形式的消费者,一个可以获取多个信息
zookeeper
保存集群内部信息,保证系统可用性
配置文件,系统启动
二、kafka应用环境部署
2.1、环境搭建
下载应用包
下载后的文件放在linux系统的
创建一个日志文件,用来保存日志信息
修改config文件,添加日志路径,侦听端口信息
对下列信息改动
设置broker的数量
提供生产者和消费者的服务地址
zk服务器地址
- 启动zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties &
- 启动kafka服务
./bin/kafka-server-start.sh config/server.properties &
- 创建一个topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1
--zookeeper: kafka连接zookeeper的url,和server.properties文件中的配置项 zookeeper.connect=localhost:2181 一致
- 查看topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
- 查看topic的详细信息
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1
- 生产消息
/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
这里的 --broker-list localhost:9092
- 消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning
hello,world
this is my kafka
2.2、编写生产者-消费者示例代码
源码编译gcc语句
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
生产者流程
- 创建conf配置对象rd_kafka_conf_new配置broke集群rd_kafka_conf_set:localhost:9092配置消息发送回调函数rd_kafka_conf_set_dr_msg_cb创建一个producer实例rd_kafka_new+使用上述conf创建一个topicrd_kafka_topic_new发送消息rd_kafka_produce消息发送成功回调rd_kafka_poll
释放
等待队列处理完毕
rd_kafka_flush
释放topic配置
rd_kafka_topic_destroy
释放product配置
rd_kafka_destroy
消费者流程
初始化
创建kafka配置
rd_kafka_conf_new
设置信号处理
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
const char *name,
const char *value, char *errstr, size_t errstr_size);
conf:配置结构
vame:配置项名称
value:配置项值
errstr:错误提示
errstr_size:错误提示长度
返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中
创建topic配置
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)
参数:无
返回值:rd_kafka_topic_conf_t *
创建一个主题配置结构,并进行默认初始化设置,返回其引用指针。
创建kafka实例
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
char *errstr, size_t errstr_size)
参数:
Type:RD_KAFKA_PRODUCER是创建生产者类型,RD_KAFKA_CONSUMER是创建消费者类型
Conf:配置结构
Errstr:错误提示
errstr_size:错误提示长度
返回值:
成功:返回rd_kafka_t *kafka操作句柄
失败:返回NULL,并记录错误信息到errstr
程序中先配置conf和topic_conf,然后调用此接口生成操作句柄。对消费者来讲,订阅主题,轮询接收消息。对生产者来讲,根据主题生成主题操作句柄,并通过主题操作句柄发送消息。
添加服务器
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
参数:
Rk:kafka操作句柄
Brokerlist:broker字符串 如:”172.20.51.38:9092” 不写端口,则采用默认端口9092
多个broker brokerlist = "broker1:10000,broker2"
返回值:成功添加的broker个数
添加一个broker也可以通过 设置rd_kafka_conf_t结构中的 "bootstrap.servers" 配置项
rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr))
消息重定向
rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk)
参数:
Rk:kafka操作句柄
返回值:rd_kafka_resp_err_t 枚举
将消息重定向到了消费者队列,可以使用rd_kafka_consumer_poll()进行取消息。
创建一个Topic+Partition的存储空间
rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size)
rd_kafka_topic_partition_list_new()创建,创建时指定长度,通过rd_kafka_topic_partition_list_add()添加 主题-分区对,用于订阅消息。
开启consumer订阅,匹配的topic将被添加到订阅列表中
rd_kafka_topic_partition_list_add
rd_kafka_subscribe
读取队列消息
rd_kafka_consumer_poll
参数一:kafka示例句柄
参数二:等待时间
获取topic name
rd_kafka_topic_name
参数一:topic handle 句柄
释放
待队列消息处理完
rd_kafka_consumer_close
kafka句柄
释放队列资源
rd_kafka_topic_partition_list_destroy
释放队列,类型为rd_kafka_topic_partition_list_t
释放kafka实例
rd_kafka_destroy
kafka句柄
版权归原作者 强壮的向阳花 所有, 如有侵权,请联系我们删除。