C++实现kafka生产者客户端
一、Kafka 生产者的逻辑
#mermaid-svg-SUNmUvxgS4WY6lrb {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .error-icon{fill:#552222;}#mermaid-svg-SUNmUvxgS4WY6lrb .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-SUNmUvxgS4WY6lrb .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-SUNmUvxgS4WY6lrb .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-SUNmUvxgS4WY6lrb .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-SUNmUvxgS4WY6lrb .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-SUNmUvxgS4WY6lrb .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-SUNmUvxgS4WY6lrb .marker{fill:#333333;stroke:#333333;}#mermaid-svg-SUNmUvxgS4WY6lrb .marker.cross{stroke:#333333;}#mermaid-svg-SUNmUvxgS4WY6lrb svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-SUNmUvxgS4WY6lrb .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .cluster-label text{fill:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .cluster-label span{color:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .label text,#mermaid-svg-SUNmUvxgS4WY6lrb span{fill:#333;color:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .node rect,#mermaid-svg-SUNmUvxgS4WY6lrb .node circle,#mermaid-svg-SUNmUvxgS4WY6lrb .node ellipse,#mermaid-svg-SUNmUvxgS4WY6lrb .node polygon,#mermaid-svg-SUNmUvxgS4WY6lrb .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-SUNmUvxgS4WY6lrb .node .label{text-align:center;}#mermaid-svg-SUNmUvxgS4WY6lrb .node.clickable{cursor:pointer;}#mermaid-svg-SUNmUvxgS4WY6lrb .arrowheadPath{fill:#333333;}#mermaid-svg-SUNmUvxgS4WY6lrb .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-SUNmUvxgS4WY6lrb .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-SUNmUvxgS4WY6lrb .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-SUNmUvxgS4WY6lrb .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-SUNmUvxgS4WY6lrb .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-SUNmUvxgS4WY6lrb .cluster text{fill:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb .cluster span{color:#333;}#mermaid-svg-SUNmUvxgS4WY6lrb div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-SUNmUvxgS4WY6lrb :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
配置客户端参数
创建生产者实例
构建待发送消息
发送消息
关闭实例
(1)配置生产者客户端参数。
(2)创建相应的生产者实例。
(3)构建待发送的消息。
(4)发送消息。
(5)关闭生产者实例。
二、Kafka 的C++ API
2.1、RdKafka::Conf
enumConfType{
CONF_GLOBAL,// 全局配置
CONF_TOPIC // Topic配置 };enumConfResult{
CONF_UNKNOWN =-2,
CONF_INVALID =-1,
CONF_OK =0};
- static Conf * create(ConfType type); 创建配置对象。
- Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr); 设置配置对象的属性值,成功返回CONF_OK,错误时错误信息输出到errstr。
- Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr); 设置dr_cb属性值。
- Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr); 设置event_cb属性值。
- Conf::ConfResult set(const std::string &name, const Conf *topic_conf, std::string &errstr); 设置用于自动订阅Topic的默认Topic配置。
- Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr); 设置partitioner_cb属性值,配置对象必须是CONF_TOPIC类型。
- Conf::ConfResult set(const std::string &name, PartitionerKeyPointerCb *partitioner_kp_cb,std::string &errstr); 设置partitioner_key_pointer_cb属性值。
- Conf::ConfResult set(const std::string &name, SocketCb *socket_cb, std::string &errstr); 设置socket_cb属性值。
- Conf::ConfResult set(const std::string &name, OpenCb *open_cb, std::string &errstr); 设置open_cb属性值。
- Conf::ConfResult set(const std::string &name, RebalanceCb *rebalance_cb, std::string &errstr); 设置rebalance_cb属性值。
- Conf::ConfResult set(const std::string &name, OffsetCommitCb *offset_commit_cb, std::string &errstr); 设置offset_commit_cb属性值。
- Conf::ConfResult get(const std::string &name, std::string &value) const; 查询单条属性配置值。
2.2、RdKafka::Message
Message表示一条消费或生产的消息,或是事件。
- std::string errstr() const; 如果消息是一条错误事件,返回错误字符串,否则返回空字符串。
- ErrorCode err() const; 如果消息是一条错误事件,返回错误代码,否则返回0。
- Topic * topic() const; 返回消息的Topic对象。如果消息的Topic对象没有显示使用RdKafka::Topic::create()创建,需要使用topic_name函数。
- std::string topic_name() const; 返回消息的Topic名称。
- int32_t partition() const; 如果分区可用,返回分区号。
- void * payload() const; 返回消息数据。
- size_t len() const; 返回消息数据的长度。
- const std::string * key() const; 返回字符串类型的消息key。
- const void * key_pointer() const; 返回void类型的消息key。
- size_t key_len() const; 返回消息key的二进制长度。
- int64_t offset () const; 返回消息或错误的位移。
- void * msg_opaque() const; 返回RdKafka::Producer::produce()提供的msg_opaque。
- virtual MessageTimestamp timestamp() const = 0; 返回消息时间戳。
- virtual int64_t latency() const = 0; 返回produce函数内生产消息的微秒级时间延迟,如果延迟不可用,返回-1。
- virtual struct rd_kafka_message_s *c_ptr () = 0; 返回底层数据结构的C rd_kafka_message_t句柄。
- virtual Status status () const = 0; 返回消息在Topic Log的持久化状态。
- virtual RdKafka::Headers *headers () = 0; 返回消息头。
- virtual RdKafka::Headers *headers (RdKafka::ErrorCode *err) = 0; 返回消息头,错误信息会输出到err。
2.3、RdKafka::DeliveryReportCb
每收到一条RdKafka::Producer::produce()函数生产的消息,调用一次投递报告回调函数,RdKafka::Message::err()将会标识Produce请求的结果。
为了使用队列化的投递报告回调函数,必须调用RdKafka::poll()函数。
virtual voiddr_cb(Message &message)=0;
当一条消息成功生产或是rdkafka遇到永久失败或是重试次数耗尽,投递报告回调函数会被调用。
C++封装示例:
classProducerDeliveryReportCb:public RdKafka::DeliveryReportCb{public:voiddr_cb(RdKafka::Message &message){if(message.err())
std::cerr <<"Message delivery failed: "<< message.errstr()<< std::endl;else{// Message delivered to topic test [0] at offset 135000
std::cerr <<"Message delivered to topic "<< message.topic_name()<<" ["<< message.partition()<<"] at offset "<< message.offset()<< std::endl;}}};
2.4、RdKafka::Event
enumType{
EVENT_ERROR,//错误条件事件
EVENT_STATS,// Json文档统计事件
EVENT_LOG,// Log消息事件
EVENT_THROTTLE // 来自Broker的throttle级信号事件 };
- virtual Type type() const =0; 返回事件类型。
- virtual ErrorCode err() const =0; 返回事件错误代码。
- virtual Severity severity() const =0; 返回log严重级别。
- virtual std::string fac() const =0; 返回log基础字符串。
- virtual std::string str () const =0; 返回Log消息字符串。
- virtual int throttle_time() const =0; 返回throttle时间。
- virtual std::string broker_name() const =0; 返回Broker名称。
- virtual int broker_id() const =0; 返回Broker ID。
2.5、RdKafka::EventCb
事件是从RdKafka传递错误、统计信息、日志等消息到应用程序的通用接口。
virtualvoidevent_cb(Event &event)=0;// 事件回调函数
C++封装示例:
classProducerEventCb:public RdKafka::EventCb{public:voidevent_cb(RdKafka::Event &event){switch(event.type()){case RdKafka::Event::EVENT_ERROR:
std::cout <<"RdKafka::Event::EVENT_ERROR: "<<RdKafka::err2str(event.err())<< std::endl;break;case RdKafka::Event::EVENT_STATS:
std::cout <<"RdKafka::Event::EVENT_STATS: "<< event.str()<< std::endl;break;case RdKafka::Event::EVENT_LOG:
std::cout <<"RdKafka::Event::EVENT_LOG "<< event.fac()<< std::endl;break;case RdKafka::Event::EVENT_THROTTLE:
std::cout <<"RdKafka::Event::EVENT_THROTTLE "<< event.broker_name()<< std::endl;break;}}};
2.6、RdKafka::PartitionerCb
PartitionerCb用实现自定义分区策略,需要使用RdKafka::Conf::set()设置partitioner_cb属性。
virtualint32_tpartitioner_cb(const Topic *topic,const std::string *key,int32_t partition_cnt,void*msg_opaque)=0;//Partitioner回调函数
返回topic主题中使用key的分区,key可以是NULL或字符串。
返回值必须在0到partition_cnt间,如果分区失败可能返回RD_KAFKA_PARTITION_UA (-1)。
msg_opaque与RdKafka::Producer::produce()调用提供的msg_opaque相同。
C++封装示例:
classHashPartitionerCb:public RdKafka::PartitionerCb{public:int32_tpartitioner_cb(const RdKafka::Topic *topic,const std::string *key,int32_t partition_cnt,void*msg_opaque){char msg[128]={0};int32_t partition_id =generate_hash(key->c_str(), key->size())% partition_cnt;// [topic][key][partition_cnt][partition_id] // :[test][6419][2][1]sprintf(msg,"HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),
key->c_str(), partition_cnt, partition_id);
std::cout << msg << std::endl;return partition_id;}private:staticinlineunsignedintgenerate_hash(constchar*str, size_t len){unsignedint hash =5381;for(size_t i =0; i < len ; i++)
hash =((hash <<5)+ hash)+ str[i];return hash;}};
2.7、RdKafka::Topic
- static Topic * create(Handle *base, const std::string &topic_str, Conf *conf, std::string &errstr); 使用conf配置创建名为topic_str的Topic句柄。
- const std::string name (); 获取Topic名称。
- bool partition_available(int32_t partition) const; 获取parition分区是否可用,只能在 RdKafka::PartitionerCb回调函数内被调用。
- ErrorCode offset_store(int32_t partition, int64_t offset); 存储Topic的partition分区的offset位移,只能用于RdKafka::Consumer,不能用于RdKafka::KafkaConsumer高级接口类。使用本接口时,auto.commit.enable参数必须设置为false。
- virtual struct rd_kafka_topic_s *c_ptr () = 0; 返回底层数据结构的rd_kafka_topic_t句柄,不推荐利用rd_kafka_topic_t句柄调用C API,但如果C++ API没有提供相应功能,可以直接使用C API和librdkafka核心交互。
staticconstint32_t PARTITION_UA =-1;//未赋值分区staticconstint64_t OFFSET_BEGINNING =-2;//特殊位移,从开始消费staticconstint64_t OFFSET_END =-1;//特殊位移,从末尾消费staticconstint64_t OFFSET_STORED =-1000;//使用offset存储
2.8、RdKafka::Producer(核心)
static Producer * create(Conf *conf, std::string &errstr); 创建一个新的Producer客户端对象,conf用于替换默认配置对象,本函数调用后conf可以重用。成功返回新的Producer客户端对象,失败返回NULL,errstr可读错误信息。
ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,const std::string *key, void *msg_opaque); 生产和发送单条消息到Broker。msgflags:可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。
参数含义topic主题partition分区msgflags可选项为RK_MSG_BLOCK、RK_MSG_FREE、RK_MSG_COPY。RK_MSG_FREE表 示RdKafka调用produce完成后会释放payload数据;RK_MSG_COPY表示payload数据会被拷贝,在produce调用完成后RdKafka不会使用payload指针;RK_MSG_BLOCK表示在消息队列满时阻塞produce函数,如果dr_cb回调函数被使用,应用程序必须调用rd_kafka_poll函数确保投递消息队列的投递消息投递完。当消息队列满时,失败会导致produce函数的永久阻塞。RK_MSG_FREE和RK_MSG_COPY是互斥操作。如果produce函数调用时指定了RK_MSG_FREE,并返回了错误码,与payload指针相关的内存数据必须由使用者负责释放。payload长度为len的消息负载数据lenpayload消息数据的长度。keykey是可选的消息key,如果非NULL,会被传递给主题partitioner,并被随消息发送到Broker和传递给Consumer。msg_opaquemsg_opaque是可选的应用程序提供给每条消息的opaque指针,opaque指针会在dr_cb回调函数内提供。
返回错误码:
错误码含义ERR_NO_ERROR消息成功发送并入对列。ERR_QUEUE_FULL最大消息数量达到queue.buffering.max.message。ERR_MSG_SIZE_TOO_LARGE消息数据大小太大,超过messages.max.bytes配置的值。ERR_UNKNOWN_PARTITION请求一个Kafka集群内的未知分区。ERR_UNKNOWN_TOPICtopic是Kafka集群的未知主题。ErrorCode produce(Topic *topic, int32_t partition, int msgflags, void *payload, size_t len,const void *key, size_t key_len, void *msg_opaque); 生产和发送单条消息到Broker,传递key数据指针和key长度。
ErrorCode produce(Topic *topic, int32_t partition, const std::vector< char > *payload, const std::vector< char > *key, void *msg_opaque); 生产和发送单条消息到Broker,传递消息数组和key数组。接受数组类型的key和payload,数组会被复制。
ErrorCode flush (int timeout_ms); 等待所有未完成的所有Produce请求完成。为了确保所有队列和已经执行的Produce请求在中止前完成,flush操作优先于销毁生产者实例完成。本函数会调用Producer::poll()函数,因此会触发回调函数。
ErrorCode purge (int purge_flags); 清理生产者当前处理的消息。本函数调用时可能会阻塞一定时间,当后台线程队列在清理时。应用程序需要在调用poll或flush函数后,执行清理消息的dr_cb回调函数。
virtual Error *init_transactions (int timeout_ms) = 0; 初始化Producer实例的事务。失败返回RdKafka::Error错误对象,成功返回NULL。 通过调用RdKafka::Error::is_retriable()函数可以检查返回的错误对象是否有权限重试,调用 RdKafka::Error::is_fatal()检查返回的错误对象是否是严重错误。返回的错误对象必须elete。
virtual Error *begin_transaction () = 0; 启动事务。本函数调用前,init_transactions()函数必须被成功调用。 成功返回NULL,失败返回错误对象。通过调用RdKafka::Error::is_fatal_error()函数可以检查是否是严重错误,返回的错误对象必须delete。
virtual Error send_offsets_to_transaction (const std::vector &offsets,const ConsumerGroupMetadata *group_metadata,int timeout_ms) = 0; 发送TopicPartition位移链表到由group_metadata指定的Consumer Group协调器,如果事务提交成功,位移才会被提交。
virtual Error *commit_transaction (int timeout_ms) = 0; 提交当前事务。在实际提交事务时,任何未完成的消息会被完成投递。 成功返回NULL,失败返回错误对象。通过调用错误对象的方法可以检查是否有权限重试,是否是严重错误、可中止错误等。
virtual Error *abort_transaction (int timeout_ms) = 0; 停止事务。本函数从非严重错误、可终止事务中用于恢复。未完成消息会被清理。
三、Kafka 生产者客户端开发
3.1、必要的参数配置(bootstrap.servers)
(1)指定连接 Kafka 集群所需要的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或者多个地址,中间以逗号进行隔开,此参数的默认值为 “”。
(2)注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找其他 broker 的信息。
(3)过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。
// 创建Kafka Conf对象
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if(m_config ==NULL){
std::cout <<"Create RdKafka Conf failed."<< std::endl;}// 创建Topic Conf对象
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if(m_topicConfig ==NULL){
std::cout <<"Create RdKafka Topic Conf failed."<< std::endl;}// 设置Broker属性
RdKafka::Conf::ConfResult errCode;
m_dr_cb =new ProducerDeliveryReportCb;
std::string errorStr;
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
m_event_cb =new ProducerEventCb;
errCode = m_config->set("event_cb", m_event_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
m_partitioner_cb =new HashPartitionerCb;
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
errCode = m_config->set("statistics.interval.ms","10000", errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
errCode = m_config->set("message.max.bytes","10240000", errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
3.2、创建生产者实例
生产者的相关配置和实例的创建可以在类的构造函数实现。比如Kafka Conf对象、Topic Conf对象、设置Broker属性、Producer、Topic对象等。
// 创建Producer
m_producer = RdKafka::Producer::create(m_config, errorStr);if(m_producer ==NULL){
std::cout <<"Create Producer failed:"<< errorStr << std::endl;}// 创建Topic对象
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);if(m_topic ==NULL){
std::cout <<"Create Topic failed:"<< errorStr << std::endl;}
3.3、消息发送
librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。
同一个生产者可以发送多个主题的,在内部处理时根据传入的topic对象发送给对应的主题分区。
RdKafka::ErrorCode errorCode = m_producer->produce(
m_topic,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
payload,
len,&key,NULL);
3.4、完整示例代码
KafkaProducer.h
#ifndefKAFKAPRODUCER_H#defineKAFKAPRODUCER_H#pragmaonce#include<string>#include<iostream>#include"rdkafkacpp.h"classKafkaProducer{public:/**
* @brief KafkaProducer
* @param brokers
* @param topic
* @param partition
*/explicitKafkaProducer(const std::string& brokers,const std::string& topic,int partition);/**
* @brief push Message to Kafka
* @param str, message data
*/voidpushMessage(const std::string& str,const std::string& key);~KafkaProducer();private:
std::string m_brokers;// Broker列表,多个使用逗号分隔
std::string m_topicStr;// Topic名称int m_partition;// 分区
RdKafka::Conf* m_config;// Kafka Conf对象
RdKafka::Conf* m_topicConfig;// Topic Conf对象
RdKafka::Topic* m_topic;// Topic对象
RdKafka::Producer* m_producer;// Producer对象/*只要看到Cb 结尾的类,要继承它然后实现对应的回调函数*/
RdKafka::DeliveryReportCb* m_dr_cb;
RdKafka::EventCb* m_event_cb;
RdKafka::PartitionerCb* m_partitioner_cb;};#endif
KafkaProducer.cpp
#include"KafkaProducer.h"// call backclassProducerDeliveryReportCb:public RdKafka::DeliveryReportCb{public:voiddr_cb(RdKafka::Message &message){if(message.err())
std::cerr <<"Message delivery failed: "<< message.errstr()<< std::endl;else{// Message delivered to topic test [0] at offset 135000
std::cerr <<"Message delivered to topic "<< message.topic_name()<<" ["<< message.partition()<<"] at offset "<< message.offset()<< std::endl;}}};classProducerEventCb:public RdKafka::EventCb{public:voidevent_cb(RdKafka::Event &event){switch(event.type()){case RdKafka::Event::EVENT_ERROR:
std::cout <<"RdKafka::Event::EVENT_ERROR: "<<RdKafka::err2str(event.err())<< std::endl;break;case RdKafka::Event::EVENT_STATS:
std::cout <<"RdKafka::Event::EVENT_STATS: "<< event.str()<< std::endl;break;case RdKafka::Event::EVENT_LOG:
std::cout <<"RdKafka::Event::EVENT_LOG "<< event.fac()<< std::endl;break;case RdKafka::Event::EVENT_THROTTLE:
std::cout <<"RdKafka::Event::EVENT_THROTTLE "<< event.broker_name()<< std::endl;break;}}};classHashPartitionerCb:public RdKafka::PartitionerCb{public:int32_tpartitioner_cb(const RdKafka::Topic *topic,const std::string *key,int32_t partition_cnt,void*msg_opaque){char msg[128]={0};int32_t partition_id =generate_hash(key->c_str(), key->size())% partition_cnt;// [topic][key][partition_cnt][partition_id] // :[test][6419][2][1]sprintf(msg,"HashPartitionerCb:topic:[%s], key:[%s]partition_cnt:[%d], partition_id:[%d]", topic->name().c_str(),
key->c_str(), partition_cnt, partition_id);
std::cout << msg << std::endl;return partition_id;}private:staticinlineunsignedintgenerate_hash(constchar*str, size_t len){unsignedint hash =5381;for(size_t i =0; i < len; i++)
hash =((hash <<5)+ hash)+ str[i];return hash;}};KafkaProducer::KafkaProducer(const std::string& brokers,const std::string& topic,int partition){
m_brokers = brokers;
m_topicStr = topic;
m_partition = partition;/* 创建Kafka Conf对象 */
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if(m_config==NULL)
std::cout <<"Create RdKafka Conf failed."<< std::endl;/* 创建Topic Conf对象 */
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if(m_topicConfig ==NULL)
std::cout <<"Create RdKafka Topic Conf failed."<< std::endl;/* 设置Broker属性 */
RdKafka::Conf::ConfResult errCode;
std::string errorStr;
m_dr_cb =new ProducerDeliveryReportCb;// 设置dr_cb属性值
errCode = m_config->set("dr_cb", m_dr_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}// 设置event_cb属性值
m_event_cb =new ProducerEventCb;
errCode = m_config->set("event_cb", m_event_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}// 自定义分区策略
m_partitioner_cb =new HashPartitionerCb;
errCode = m_topicConfig->set("partitioner_cb", m_partitioner_cb, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}// 设置配置对象的属性值
errCode = m_config->set("statistics.interval.ms","10000", errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
errCode = m_config->set("message.max.bytes","10240000", errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}
errCode = m_config->set("bootstrap.servers", m_brokers, errorStr);if(errCode != RdKafka::Conf::CONF_OK){
std::cout <<"Conf set failed:"<< errorStr << std::endl;}/* 创建Producer */
m_producer = RdKafka::Producer::create(m_config, errorStr);if(m_producer ==NULL){
std::cout <<"Create Producer failed:"<< errorStr << std::endl;}/* 创建Topic对象 */
m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, errorStr);if(m_topic ==NULL){
std::cout <<"Create Topic failed:"<< errorStr << std::endl;}}KafkaProducer::~KafkaProducer(){while(m_producer->outq_len()>0){
std::cerr <<"Waiting for "<< m_producer->outq_len()<< std::endl;
m_producer->flush(5000);}delete m_config;delete m_topicConfig;delete m_topic;delete m_producer;delete m_dr_cb;delete m_event_cb;delete m_partitioner_cb;}voidKafkaProducer::pushMessage(const std::string& str,const std::string& key){int32_t len = str.length();void* payload =const_cast<void*>(static_cast<constvoid*>(str.data()));
RdKafka::ErrorCode errorCode = m_producer->produce(
m_topic,
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
payload,
len,&key,NULL);
m_producer->poll(0);if(errorCode != RdKafka::ERR_NO_ERROR){
std::cerr <<"Produce failed: "<<RdKafka::err2str(errorCode)<< std::endl;if(errorCode == RdKafka::ERR__QUEUE_FULL){
m_producer->poll(100);}}}
CMakeLists.txt
cmake_minimum_required(VERSION 2.8)
project(KafkaProducer)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)# Kafka头文件路径
include_directories(/usr/local/include/librdkafka)# Kafka库路径
link_directories(/usr/local/lib)
aux_source_directory(. SOURCE)
add_executable(${PROJECT_NAME}${SOURCE})
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)
测试文件main.cpp
#include<iostream>#include"KafkaProducer.h"usingnamespace std;intmain(){// 创建Producer// KafkaProducer producer("127.0.0.1:9092,192.168.2.111:9092", "test", 0);
KafkaProducer producer("127.0.0.1:9092","test",0);for(int i =0; i <10000; i++){char msg[64]={0};sprintf(msg,"%s%4d","Hello RdKafka ", i);// 生产消息char key[8]={0};// 主要用来做负载均衡sprintf(key,"%d", i);
producer.pushMessage(msg, key);}RdKafka::wait_destroyed(5000);}
编译:
mkdir build
cd build
cmake ..make
总结
Kafka Producer使用流程:
- 创建Kafka配置实例。
- 创建Topic配置实例。
- 设置Kafka配置实例Broker属性。
- 设置Topic配置实例属性。
- 注册回调函数(分区策略回调函数需要注册到Topic配置实例)。
- 创建Kafka Producer客户端实例。
- 创建Topic实例。
- 生产消息。
- 阻塞等待Producer生产消息完成。
- 等待Produce请求完成。
- 销毁Kafka Producer客户端实例。
版权归原作者 Lion Long 所有, 如有侵权,请联系我们删除。