0


linux 消息发布工具-kafka

下载地址:https://mirrors.cnnic.cn/apache/kafka

一、系统构成

在整个消息发布流程中,kafka作为一个中间件对系统的运行提供了解耦、削峰、异步处理的能力。

由生产者发起信息同步条件,中间件作为信息转储的角色对news进行发布。消费者会通过offset来保证接收最新的消息

主要由生产者、消费者、中间件构成。中间件由

producer

生产者

broker

kafka实现实例

其可实现主要功能部分

topic

每个kafka实例内部的消息类型

每个实例内部可存在多个topic

partition

消息类型分组,每组的数据不同。

每个topic内存在多个partition

message

每条发送的消息主体

consumer

消费者

consumer_group

消费者组

组形式的消费者,一个可以获取多个信息

zookeeper

保存集群内部信息,保证系统可用性

配置文件,系统启动

二、kafka应用环境部署

2.1、环境搭建

  • 下载应用包

  • 下载后的文件放在linux系统的

  • 创建一个日志文件,用来保存日志信息

  • 修改config文件,添加日志路径,侦听端口信息

对下列信息改动

设置broker的数量

提供生产者和消费者的服务地址

zk服务器地址

  • 启动zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties &

  • 启动kafka服务

./bin/kafka-server-start.sh config/server.properties &

  • 创建一个topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic1

--zookeeper: kafka连接zookeeper的url,和server.properties文件中的配置项 zookeeper.connect=localhost:2181 一致

  • 查看topic

./bin/kafka-topics.sh --list --zookeeper localhost:2181

  • 查看topic的详细信息

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1

  • 生产消息

/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1

这里的 --broker-list localhost:9092

  • 消费消息

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning

hello,world

this is my kafka

2.2、编写生产者-消费者示例代码

源码编译gcc语句

gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt

gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt

生产者流程

  • 创建conf配置对象rd_kafka_conf_new配置broke集群rd_kafka_conf_set:localhost:9092配置消息发送回调函数rd_kafka_conf_set_dr_msg_cb创建一个producer实例rd_kafka_new+使用上述conf创建一个topicrd_kafka_topic_new发送消息rd_kafka_produce消息发送成功回调rd_kafka_poll

释放

等待队列处理完毕

rd_kafka_flush

释放topic配置

rd_kafka_topic_destroy

释放product配置

rd_kafka_destroy

消费者流程

初始化

创建kafka配置

rd_kafka_conf_new

设置信号处理

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value, char *errstr, size_t errstr_size);

conf:配置结构

vame:配置项名称

value:配置项值

errstr:错误提示

errstr_size:错误提示长度

返回值:rd_kafka_conf_res_t 枚举,错误写入errstr中

创建topic配置

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)

参数:无

返回值:rd_kafka_topic_conf_t *

创建一个主题配置结构,并进行默认初始化设置,返回其引用指针。

创建kafka实例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,

char *errstr, size_t errstr_size)

参数:

Type:RD_KAFKA_PRODUCER是创建生产者类型,RD_KAFKA_CONSUMER是创建消费者类型

Conf:配置结构

Errstr:错误提示

errstr_size:错误提示长度

返回值:

成功:返回rd_kafka_t *kafka操作句柄

失败:返回NULL,并记录错误信息到errstr

程序中先配置conf和topic_conf,然后调用此接口生成操作句柄。对消费者来讲,订阅主题,轮询接收消息。对生产者来讲,根据主题生成主题操作句柄,并通过主题操作句柄发送消息。

添加服务器

int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

参数:

Rk:kafka操作句柄

Brokerlist:broker字符串 如:”172.20.51.38:9092” 不写端口,则采用默认端口9092

多个broker brokerlist = "broker1:10000,broker2"

返回值:成功添加的broker个数

添加一个broker也可以通过 设置rd_kafka_conf_t结构中的 "bootstrap.servers" 配置项

rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr))

消息重定向

rd_kafka_resp_err_t rd_kafka_poll_set_consumer (rd_kafka_t *rk)

参数:

Rk:kafka操作句柄

返回值:rd_kafka_resp_err_t 枚举

将消息重定向到了消费者队列,可以使用rd_kafka_consumer_poll()进行取消息。

创建一个Topic+Partition的存储空间

rd_kafka_topic_partition_list_t *rd_kafka_topic_partition_list_new (int size)

rd_kafka_topic_partition_list_new()创建,创建时指定长度,通过rd_kafka_topic_partition_list_add()添加 主题-分区对,用于订阅消息。

开启consumer订阅,匹配的topic将被添加到订阅列表中

rd_kafka_topic_partition_list_add

rd_kafka_subscribe

读取队列消息

rd_kafka_consumer_poll

参数一:kafka示例句柄
参数二:等待时间

获取topic name

rd_kafka_topic_name

参数一:topic handle 句柄

释放

待队列消息处理完

rd_kafka_consumer_close

kafka句柄

释放队列资源

rd_kafka_topic_partition_list_destroy

释放队列,类型为rd_kafka_topic_partition_list_t

释放kafka实例

rd_kafka_destroy

kafka句柄

标签: linux kafka 运维

本文转载自: https://blog.csdn.net/qq_41167620/article/details/135821821
版权归原作者 强壮的向阳花 所有, 如有侵权,请联系我们删除。

“linux 消息发布工具-kafka”的评论:

还没有评论