消息中间件-Kafka的使用(2)
前言
- 因为Kafka中间件有分区的概念,所以可用Kafka中间件处理TP级海量数据
- Kafka是持久化存储的,和其他消息队列对比,Kafka哪怕消费了,消息也不会被删除。
- Kafka可用偏移量(Offset)来处理重复消费的问题。
一、环境安装
1.1 安装JDK8.0(含)以上
由于Zookeeper是用Java开发的,所以必须安装JDK。
1.2 下载Kafka,然后解压。
下载地址:https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
1.3 启动Zookeeper (Kafka内置有Zookeeper)
主要用到以下三个命令(bin\windows\)
注:(window目录外的文件主要是linux系统运行的)
1.4 配置Zookeeper
打开配置文件zookeeper.properties(\config\)
配置端口和Zookeeper存储目录
1.5 运行Zookeeper,命令如下:
zookeeper-server-start.bat ../../config/zookeeper.properties
运行成功,如下
1.6 打开\config\server.properties文件, 配置存储持久化目录
注:Kafka的默认端口号为 9092
1.7 运行Kafka,命令如下:
kafka-server-start.bat ../../config/server.properties
运行成功,监听端口9092
注意:请不要关闭以上2个cmd窗口,不然下面的项目会无法使用Kafka服务。
二、项目实战
2.1 生产者创建主题
2.1.1 引入包 Confluent.Kafka(1.8.2)
2.1.2 生产者新建Api以创建主题和分区
/// <summary>/// 创建订单/// </summary>/// <param name="orderCreateDto"></param>/// <returns></returns>[HttpPost]publicIEnumerable<OrderCreateDto>CreateOrder(OrderCreateDto orderCreateDto){#region 1、生产者 Producer{var producerConfig =newProducerConfig{
BootstrapServers ="127.0.0.1:9092",
MessageTimeoutMs =50000,
EnableIdempotence =true};var builder =newProducerBuilder<string,string>(producerConfig);
builder.SetDefaultPartitioner(RoundRobinPartitioner);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);TopicPartition topicPartition =newTopicPartition("create-order-producer",0);// 指定分区发送消息var dr = producer.ProduceAsync(topicPartition,newMessage<string,string>{ Key ="order-1", Value = OrderJson }).GetAwaiter().GetResult();//var dr = producer.ProduceAsync("order-create", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
_logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}returnnull;}
测试结果:
调用Api成功
通过Kafka Tool可以查看到数据
2.2 消费者创建主题
服务器不能自动创建主题,需要手工创建
2.2.1 消费者新建Api来创建主题
/// <summary>/// 创建主题/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("TopicCreate")]publicasyncTaskTopicCreate(string topic){AdminClientConfig adminClientConfig =newAdminClientConfig{
BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
bu.CreateTopicsAsync(newTopicSpecification[]{newTopicSpecification{ Name = topic}}).Wait();await Task.CompletedTask;}
测试TopicCreate Api:
调用Api, 创建create-order 主题
2.2.2 消费者新建Api来创建主题和多个分区
/// <summary>/// 创建主题和分区/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("TopicPartitionCreate")]publicasyncTaskTopicPartitionCreate(string topic,int PartitionCount){AdminClientConfig adminClientConfig =newAdminClientConfig{
BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
bu.CreateTopicsAsync(newTopicSpecification[]{newTopicSpecification{ Name = topic,NumPartitions =PartitionCount}}).Wait();await Task.CompletedTask;}
测试成功
### 2.2.3 消费者更改主题和分区(分区的个数要大于等于当前分区数量)
/// <summary>/// 创建分区(更新分区)/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("PartitionUpdate")]publicasyncTaskPartitionCreate(string topic,int PartitionCount){AdminClientConfig adminClientConfig =newAdminClientConfig{
BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
bu.CreatePartitionsAsync(newPartitionsSpecification[]{newPartitionsSpecification{ Topic = topic, IncreaseTo=PartitionCount}}).Wait();await Task.CompletedTask;}
测试成功
2.3 消费者监听主题(自动确认消费,并且是获取某一主题的所有分区)
消费者代码
/// <summary>/// 创建订单/// </summary>/// <returns></returns>[HttpGet]publicasyncTask<Order>OrderCreate(){#region 1、工作队列(单消费者) Consumer{newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="order",
EnableAutoCommit =true};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
consumer.Subscribe("create-order");while(true){try{// 2、消费(自动确认)var result = consumer.Consume();// 3、业务逻辑:业务逻辑---->执行失败--->消息丢失string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");}catch(Exception e){
_logger.LogInformation($"异常:Order:{e}");}}}}).Start();}#endregion
Console.WriteLine("订单创建监听......");returnnull;}
2.4 消费者监听主题(手工确认消费)
因为自动确认消费有个缺点:一旦业务代码报错了,就会在下次重启消费者时造成数据丢失。
所以需要手工确认消费。
以下是消费者代码(手工确认消费)
/// <summary>/// 创建订单/// </summary>/// <returns></returns>[HttpGet]publicasyncTask<Order>OrderCreate(){#region 2、工作队列(单消费者)-手动确认消息{newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="order",
EnableAutoCommit =false,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
consumer.Subscribe("create-order");while(true){// 2、消费var result = consumer.Consume();// 3、业务逻辑string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");// 3、手动提交(向kafka确认消息)----偏移量---消息的序号
consumer.Commit(result);}}).Start();}#endregion
Console.WriteLine("订单创建监听......");returnnull;}
2.5 消费者监听主题(偏移量)
因为手工确认消息有个缺点:一旦kafka宕机了,就会在下次重启消费者时造成重复消费。
所以需要记录偏移量来避免重复消费。
以下是消费者代码(重置偏移量)
newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="order",
EnableAutoCommit =true,};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
consumer.Subscribe("create-order");// 1.2、获取偏移量string offset = distributedCache.GetString("create-order");if(string.IsNullOrEmpty(offset)){
offset ="0";}// 1.3、重置偏移量
consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order",0),int.Parse(offset)+1));while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());// 3、业务处理string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建订单:Key:{key}");
_logger.LogInformation($"创建订单:Order:{value}");// redis缺陷:无法保证偏移和业务同时成功。// 方案:使用数据库来存储偏移量// 核心:通过数据库事务来保证// 3、手动提交(有了自动提交,不需要手工提交)// consumer.Commit(result);}}}).Start();
2.6 消费者监听主题(多消费者)
不同的消费者,对应的GroupId应该要设置为不一样,这样就会单独地消费。需要注意的是,新加入的消费者会从偏移量0开始消费(需要特别处理)
消费者1(订单消费者)代码:
#region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId{newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="order",
EnableAutoCommit =true,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
consumer.Subscribe("create-order");// 2、获取偏移量string offset = distributedCache.GetString("create-order");if(string.IsNullOrEmpty(offset)){
offset ="0";}// 3、重置偏移量
consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order",0),int.Parse(offset)+1));while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order", result.Offset.Value.ToString());// 3、手动提交//consumer.Commit(result);}}).Start();}#endregion
消费者2(短信消费者)代码:
#region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId{newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="sms",
EnableAutoCommit =false,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
consumer.Subscribe("create-order");while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");// 3、手动提交
consumer.Commit(result);}}).Start();}#endregion
2.7 设置消费者延迟消费(拓展)
案例:如订单如果30分钟之后没支付,就取消订单。
消费者代码:
#region 8、创建订单----1、订单消息延迟处理{newTask(()=>{var consumerConfig =newConsumerConfig{
BootstrapServers ="127.0.0.1:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId ="order",
EnableAutoCommit =false,//FetchMinBytes = 1,//FetchMaxBytes = 3060};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
consumer.Subscribe("create-order-1");// 2、偏移量恢复string offset = distributedCache.GetString("create-order-1");if(string.IsNullOrEmpty(offset)){
offset ="0";}
consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order-1",0),int.Parse(offset)+1));while(true){// 1、恢复消息(每隔5秒钟消费一次)newTimer((s)=>{
consumer.Resume(newList<TopicPartition>{newTopicPartition("create-order-1",0)});},null, Timeout.Infinite, Timeout.Infinite).Change(5000,5000);// 1.1、消费暂停
consumer.Pause(newList<TopicPartition>{newTopicPartition("create-order-1",0)});// 2、消费消息var result = consumer.Consume();//批量获取消息,根据-----》字节数try{// 2.1、获取偏移量
_logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
_logger.LogInformation($"创建商品:Key:{key}");
_logger.LogInformation($"创建商品:Order:{value}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
distributedCache.SetString("create-order-1", result.Offset.Value.ToString());// 3、手动提交
consumer.Commit(result);}catch(Exception ex){throw;}finally{
consumer.Pause(newList<TopicPartition>{newTopicPartition("create-order-1",0)});
Console.WriteLine($"暂停消费");}}}}).Start();}#endregion
2.8 生产者-失败重试
生产者使用幂等性属性EnableIdempotence=true,不会造成重复数据
消费者代码:
#region 2、生产者-失败重试{var producerConfig =newProducerConfig{
BootstrapServers ="127.0.0.1:9092",
MessageTimeoutMs =50000,
EnableIdempotence =true// 保证消息:不重复发送,失败重试};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);TopicPartition topicPartition =newTopicPartition("order-create-5",newPartition(0));var dr = producer.ProduceAsync("order-create-5",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
_logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}#endregion
2.9 生产者-失败重试-多消息发送(使用事务)
消费者代码:
#region 3、生产者-失败重试-多消息发送{var producerConfig =newProducerConfig{
BootstrapServers ="127.0.0.1:9092",
MessageTimeoutMs =50000,
EnableIdempotence =true,
TransactionalId = Guid.NewGuid().ToString()};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){// 1、初始化事务
producer.InitTransactions(TimeSpan.FromSeconds(60));try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);// 2、开始事务
producer.BeginTransaction();for(int i =0; i <100; i++){var dr = producer.ProduceAsync("order-create-5",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}// 3、提交事务
producer.CommitTransaction();}catch(ProduceException<string,string> ex){
_logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);// 4、关闭事务
producer.AbortTransaction();}}}#endregion
2.10 生产者不设置分区,将会自动以消息名词为key, 用哈希一致性算法计算出存到哪个分区,然后会一直存到该分区。
消费者代码:
#region 4、生产者-固定分区发送{var producerConfig =newProducerConfig{
BootstrapServers ="127.0.0.1:9092",
MessageTimeoutMs =50000};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);//TopicPartition topicPartition = new TopicPartition("order-create", new Partition(0));var dr = producer.ProduceAsync("create-order-1",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
_logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
_logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}#endregion
版权归原作者 m0_46291636 所有, 如有侵权,请联系我们删除。