0


Kafka部署安装及简单使用

一、环境准备

1、jdk 8+

2、zookeeper

3、kafka

说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

二、下载地址

Apache Kafka

三、部署

1、启动zookeeper

  1. -- 启动
  2. ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  3. -- 查看是否启动成功
  4. ps -ef | grep zoo

2、进入解压的kafka目录,修改/config/kafka-server的配置文件

  1. vi config/server.properties
  1. -- 重点配置节点说明
  2. listeners=PLAINTEXT://localhost:9092 -- 将localhost修改为主机ip
  3. log.dirs=/bigdata/kafka/logs-1 -- 默认不修改也可

3、使用kafka-server-start.sh,启动kafka服务

  1. ./bin/kafka-server-start.sh config/server.properties

四、使用客户端kafka tools连接kafka

客户端下载地址:Offset Explorer

关于客户端如何使用可查看:kafka可视化客户端工具(Kafka Tool)的基本使用 - Frankdeng - 博客园

五、kafka实战简单使用

NuGet:Confluent.Kafka

1、新建.Net Core控制台项目,代码如下:

  1. static void Main(string[] args)
  2. {
  3. // 发送消息
  4. var producerConfig = new ProducerConfig
  5. {
  6. BootstrapServers = "192.168.140.131:9092",
  7. MessageTimeoutMs = 50000
  8. };
  9. var builder = new ProducerBuilder<string, string>(producerConfig);
  10. using (var producer = builder.Build())
  11. {
  12. var data = new { key = "1", value = "001" };
  13. var json = JsonConvert.SerializeObject(data);
  14. var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();
  15. Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");
  16. }
  17. // 消费消息
  18. var consumerConfig = new ConsumerConfig {
  19. BootstrapServers = "192.168.140.131:9092",
  20. AutoOffsetReset=AutoOffsetReset.Earliest,
  21. GroupId="1111", // 自定义
  22. EnableAutoCommit=true
  23. };
  24. var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);
  25. using (var consumer = consumerBuilder.Build())
  26. {
  27. // 1、订阅
  28. consumer.Subscribe("order");
  29. while (true)
  30. {
  31. try
  32. {
  33. // 2、消费(自动确认)
  34. var result = consumer.Consume();
  35. // 3、业务逻辑
  36. string key = result.Key;
  37. string value = result.Value;
  38. Console.WriteLine($"创建商品:Key:{key}");
  39. Console.WriteLine($"创建商品:Order:{value}");
  40. consumer.Commit(result);
  41. }
  42. catch (Exception e)
  43. {
  44. Console.WriteLine($"异常:Order:{e}");
  45. }
  46. }
  47. }
  48. }

2、使用kafka客户端查看消息投递

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/jh035/article/details/128062117
版权归原作者 jh035 所有, 如有侵权,请联系我们删除。

“Kafka部署安装及简单使用”的评论:

还没有评论