0


c++ kafka

在 C++ 中使用 Kafka,通常我们会使用 Confluent 的 Kafka C++ 客户端库。下面我将给出一个简单的示例,展示如何使用这个库来创建一个生产者和一个消费者。

首先,你需要安装 Confluent 的 Kafka C++ 客户端。你可以从 GitHub 上获取源代码并编译,或者如果你使用的是 Debian 或 Ubuntu,可以使用下面的命令安装:

  1. sudo apt-get install librdkafka-dev

生产者示例

这是一个简单的 Kafka 生产者的示例代码,它会向一个名为 test_topic 的主题发送消息:

  1. #include <iostream>
  2. #include <rdkafka/rdkafkacpp.h>
  3. void delivery_report(RdKafka::Message &message) {
  4. if (message.err() == RdKafka::ERR__TIMED_OUT)
  5. std::cout << "Message delivery timed out." << std::endl;
  6. else if (message.err())
  7. std::cerr << "Failed to deliver message: " << message.errstr() << std::endl;
  8. else
  9. std::cout << "Produced message to topic " << message.topic_name()
  10. << " at offset " << message.offset() << std::endl;
  11. }
  12. int main() {
  13. std::string errstr;
  14. // 创建一个生产者配置实例
  15. RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  16. // 设置 broker 列表
  17. conf->set("bootstrap.servers", "localhost:9092", errstr);
  18. if (conf->get_err()) {
  19. std::cerr << "Failed to set 'bootstrap.servers' configuration property: "
  20. << conf->errstr() << std::endl;
  21. return -1;
  22. }
  23. // 创建一个生产者实例
  24. RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  25. if (!producer) {
  26. std::cerr << "Failed to create producer: " << errstr << std::endl;
  27. return -1;
  28. }
  29. // 发送消息
  30. const char *topic = "test_topic";
  31. const char *key = "mykey";
  32. const char *payload = "Hello, Kafka!";
  33. RdKafka::ErrorCode resp = producer->produce(
  34. topic, RdKafka::TopicPartition::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
  35. const_cast<char *>(payload), strlen(payload), const_cast<char *>(key),
  36. strlen(key), NULL, delivery_report);
  37. if (resp != RdKafka::ERR_NO_ERROR) {
  38. std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
  39. }
  40. // 等待所有消息发送完成
  41. producer->poll(0);
  42. // 清理
  43. delete producer;
  44. delete conf;
  45. return 0;
  46. }

消费者示例

接下来是一个简单的 Kafka 消费者的示例代码,它订阅了 test_topic 并消费其中的消息:

  1. #include <iostream>
  2. #include <rdkafka/rdkafkacpp.h>
  3. class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
  4. public:
  5. void on_rebalance(RdKafka::KafkaConsumer *consumer,
  6. RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition *> &partitions, void *opaque) {
  7. std::cerr << "Rebalance triggered: " << RdKafka::err2str(err) << std::endl;
  8. if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
  9. consumer->assign(partitions);
  10. else
  11. consumer->unassign();
  12. }
  13. };
  14. int main() {
  15. std::string errstr;
  16. RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  17. conf->set("bootstrap.servers", "localhost:9092", errstr);
  18. if (conf->get_err()) {
  19. std::cerr << "Failed to set 'bootstrap.servers' configuration property: "
  20. << conf->errstr() << std::endl;
  21. return -1;
  22. }
  23. conf->set("group.id", "test_group", errstr);
  24. if (conf->get_err()) {
  25. std::cerr << "Failed to set 'group.id' configuration property: "
  26. << conf->errstr() << std::endl;
  27. return -1;
  28. }
  29. RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);
  30. if (!consumer) {
  31. std::cerr << "Failed to create consumer: " << errstr << std::endl;
  32. return -1;
  33. }
  34. ConsumerRebalanceCb rebalance_cb;
  35. consumer->set_rebalance_cb(rebalance_cb);
  36. std::vector<std::string> topics;
  37. topics.push_back("test_topic");
  38. consumer->subscribe(topics);
  39. bool running = true;
  40. while (running) {
  41. RdKafka::Message *msg = consumer->consume(1000);
  42. switch (msg->err()) {
  43. case RdKafka::ERR_NO_ERROR:
  44. std::cout << "%% Message on " << msg->topic_name()
  45. << " at offset " << msg->offset() << ": "
  46. << (char *)msg->payload() << std::endl;
  47. break;
  48. case RdKafka::ERR__PARTITION_EOF:
  49. std::cerr << "%% Reached end of partition" << std::endl;
  50. break;
  51. default:
  52. std::cerr << "%% Error " << msg->err() << ": "
  53. << msg->errstr() << std::endl;
  54. break;
  55. }
  56. delete msg;
  57. }
  58. consumer->unsubscribe();
  59. delete consumer;
  60. delete conf;
  61. return 0;
  62. }

请确保将上述代码中的 localhost:9092 替换为你实际的 Kafka 服务器地址。此外,你也需要根据你的需求调整 topic 名称、组 ID 等参数。这两个示例展示了如何在 C++ 中使用 Kafka 进行消息的生产和消费。

标签: kafka

本文转载自: https://blog.csdn.net/weixin_43803780/article/details/140361331
版权归原作者 程序员孟猛 所有, 如有侵权,请联系我们删除。

“c++ kafka”的评论:

还没有评论