使用C/C++语言操作Kafka时,librdkafka是首选的开源库
- 使用librdkafka创建消费者客户端时,应配置如下属性
- 消费者会话组保持活动心跳间隔
- 自动提交偏移
- 自动重置偏移自动重置偏移有五种属性设置:a. earliest (最早的)b. latest (最近的)c. largest (最大的)d. smallest (最小的)e. beginning (起始的)f. end (结束的)g. error (错误的)
#include"librdkafka/rdkafka.h"// 声明消费者实例rd_kafka_t*rk;// 临时配置对象rd_kafka_conf_t*conf;
conf =rd_kafka_conf_new();//消费者会话组保持活动心跳间隔if(rd_kafka_conf_set(conf,"heartbeat.interval.ms","3000", errstr,sizeof(errstr))!= RD_KAFKA_CONF_OK){fprintf(stderr,"%s\n", errstr);rd_kafka_conf_destroy(conf);return1;}//自动提交偏移if(rd_kafka_conf_set(conf,"auto.commit.enable","true", errstr,sizeof(errstr))!= RD_KAFKA_CONF_OK){fprintf(stderr,"%s\n", errstr);rd_kafka_conf_destroy(conf);return1;}//自动重置偏移if(rd_kafka_conf_set(conf,"auto.offset.reset","latest", errstr,sizeof(errstr))!= RD_KAFKA_CONF_OK){fprintf(stderr,"%s\n", errstr);rd_kafka_conf_destroy(conf);return1;}// 创建消费者实例
rk =rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr));
- 使用librdkafka创建生产者客户端时,若生产频率1s/次,可以设置socket为长连接
- 在代理套接字上启用TCP保持活动
#include"librdkafka/rdkafka.h"// 声明生产者实例rd_kafka_t*rk;// 临时配置对象rd_kafka_conf_t*conf;
conf =rd_kafka_conf_new();// 设置长连接if(rd_kafka_conf_set(conf,"socket.keepalive.enable","true", errstr,sizeof(errstr))!= RD_KAFKA_CONF_OK){rd_kafka_conf_destroy(conf);return false;}// 创建生产者实例
rk =rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,sizeof(errstr));
本文转载自: https://blog.csdn.net/Love_XiaoQinEr/article/details/128693308
版权归原作者 晓琴儿 所有, 如有侵权,请联系我们删除。
版权归原作者 晓琴儿 所有, 如有侵权,请联系我们删除。