0


消息中间件-Kafka的使用(2)

消息中间件-Kafka的使用(2)


前言

  • 因为Kafka中间件有分区的概念,所以可用Kafka中间件处理TP级海量数据
  • Kafka是持久化存储的,和其他消息队列对比,Kafka哪怕消费了,消息也不会被删除。
  • Kafka可用偏移量(Offset)来处理重复消费的问题。

一、环境安装

1.1 安装JDK8.0(含)以上

由于Zookeeper是用Java开发的,所以必须安装JDK。

1.2 下载Kafka,然后解压。

下载地址:https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz

1.3 启动Zookeeper (Kafka内置有Zookeeper)

主要用到以下三个命令(bin\windows\)
注:(window目录外的文件主要是linux系统运行的)
在这里插入图片描述

1.4 配置Zookeeper

打开配置文件zookeeper.properties(\config\)
在这里插入图片描述

配置端口和Zookeeper存储目录
在这里插入图片描述

1.5 运行Zookeeper,命令如下:

zookeeper-server-start.bat ../../config/zookeeper.properties

在这里插入图片描述
运行成功,如下
在这里插入图片描述

1.6 打开\config\server.properties文件, 配置存储持久化目录

在这里插入图片描述
注:Kafka的默认端口号为 9092
在这里插入图片描述

1.7 运行Kafka,命令如下:

kafka-server-start.bat ../../config/server.properties

在这里插入图片描述
运行成功,监听端口9092
在这里插入图片描述
注意:请不要关闭以上2个cmd窗口,不然下面的项目会无法使用Kafka服务。

二、项目实战

2.1 生产者创建主题

2.1.1 引入包 Confluent.Kafka(1.8.2)

2.1.2 生产者新建Api以创建主题和分区

/// <summary>/// 创建订单/// </summary>/// <param name="orderCreateDto"></param>/// <returns></returns>[HttpPost]publicIEnumerable<OrderCreateDto>CreateOrder(OrderCreateDto orderCreateDto){#region 1、生产者 Producer{var producerConfig =newProducerConfig{
        BootstrapServers ="127.0.0.1:9092",
        MessageTimeoutMs =50000,
        EnableIdempotence =true};var builder =newProducerBuilder<string,string>(producerConfig);
    builder.SetDefaultPartitioner(RoundRobinPartitioner);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);TopicPartition topicPartition =newTopicPartition("create-order-producer",0);// 指定分区发送消息var dr = producer.ProduceAsync(topicPartition,newMessage<string,string>{ Key ="order-1", Value = OrderJson }).GetAwaiter().GetResult();//var dr = producer.ProduceAsync("order-create", new Message<string, string> { Key = "order-1", Value = OrderJson }).GetAwaiter().GetResult();
            _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
            _logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}returnnull;}

测试结果:
调用Api成功
在这里插入图片描述
通过Kafka Tool可以查看到数据
在这里插入图片描述

2.2 消费者创建主题

服务器不能自动创建主题,需要手工创建

2.2.1 消费者新建Api来创建主题

/// <summary>/// 创建主题/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("TopicCreate")]publicasyncTaskTopicCreate(string topic){AdminClientConfig adminClientConfig =newAdminClientConfig{
        BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
    bu.CreateTopicsAsync(newTopicSpecification[]{newTopicSpecification{ Name = topic}}).Wait();await Task.CompletedTask;}

测试TopicCreate Api:
调用Api, 创建create-order 主题
在这里插入图片描述
在这里插入图片描述

2.2.2 消费者新建Api来创建主题和多个分区

/// <summary>/// 创建主题和分区/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("TopicPartitionCreate")]publicasyncTaskTopicPartitionCreate(string topic,int PartitionCount){AdminClientConfig adminClientConfig =newAdminClientConfig{
             BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
         bu.CreateTopicsAsync(newTopicSpecification[]{newTopicSpecification{ Name = topic,NumPartitions =PartitionCount}}).Wait();await Task.CompletedTask;}

测试成功
在这里插入图片描述
在这里插入图片描述### 2.2.3 消费者更改主题和分区(分区的个数要大于等于当前分区数量)

/// <summary>/// 创建分区(更新分区)/// </summary>/// <param name="topic"></param>/// <param name="Partitions"></param>/// <returns></returns>[HttpGet("PartitionUpdate")]publicasyncTaskPartitionCreate(string topic,int PartitionCount){AdminClientConfig adminClientConfig =newAdminClientConfig{
        BootstrapServers ="127.0.0.1:9092",};var bu =newAdminClientBuilder(adminClientConfig).Build();
    bu.CreatePartitionsAsync(newPartitionsSpecification[]{newPartitionsSpecification{ Topic = topic, IncreaseTo=PartitionCount}}).Wait();await Task.CompletedTask;}

测试成功
在这里插入图片描述

在这里插入图片描述

2.3 消费者监听主题(自动确认消费,并且是获取某一主题的所有分区)

消费者代码

/// <summary>/// 创建订单/// </summary>/// <returns></returns>[HttpGet]publicasyncTask<Order>OrderCreate(){#region 1、工作队列(单消费者) Consumer{newTask(()=>{var consumerConfig =newConsumerConfig{
                 BootstrapServers ="127.0.0.1:9092",
                 AutoOffsetReset = AutoOffsetReset.Earliest,
                 GroupId ="order",
                 EnableAutoCommit =true};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
                 consumer.Subscribe("create-order");while(true){try{// 2、消费(自动确认)var result = consumer.Consume();// 3、业务逻辑:业务逻辑---->执行失败--->消息丢失string key = result.Key;stringvalue= result.Value;

                         _logger.LogInformation($"创建商品:Key:{key}");
                         _logger.LogInformation($"创建商品:Order:{value}");}catch(Exception e){
                         _logger.LogInformation($"异常:Order:{e}");}}}}).Start();}#endregion
     Console.WriteLine("订单创建监听......");returnnull;}

2.4 消费者监听主题(手工确认消费)

因为自动确认消费有个缺点:一旦业务代码报错了,就会在下次重启消费者时造成数据丢失。
所以需要手工确认消费。
以下是消费者代码(手工确认消费)

/// <summary>/// 创建订单/// </summary>/// <returns></returns>[HttpGet]publicasyncTask<Order>OrderCreate(){#region 2、工作队列(单消费者)-手动确认消息{newTask(()=>{var consumerConfig =newConsumerConfig{
            BootstrapServers ="127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId ="order",
            EnableAutoCommit =false,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
        consumer.Subscribe("create-order");while(true){// 2、消费var result = consumer.Consume();// 3、业务逻辑string key = result.Key;stringvalue= result.Value;

            _logger.LogInformation($"创建商品:Key:{key}");
            _logger.LogInformation($"创建商品:Order:{value}");// 3、手动提交(向kafka确认消息)----偏移量---消息的序号
            consumer.Commit(result);}}).Start();}#endregion
     Console.WriteLine("订单创建监听......");returnnull;}

2.5 消费者监听主题(偏移量)

因为手工确认消息有个缺点:一旦kafka宕机了,就会在下次重启消费者时造成重复消费。
所以需要记录偏移量来避免重复消费。
以下是消费者代码(重置偏移量)

newTask(()=>{var consumerConfig =newConsumerConfig{
         BootstrapServers ="127.0.0.1:9092",
         AutoOffsetReset = AutoOffsetReset.Earliest,
         GroupId ="order",
         EnableAutoCommit =true,};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
         consumer.Subscribe("create-order");// 1.2、获取偏移量string offset = distributedCache.GetString("create-order");if(string.IsNullOrEmpty(offset)){
             offset ="0";}// 1.3、重置偏移量
         consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order",0),int.Parse(offset)+1));while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
             _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
             distributedCache.SetString("create-order", result.Offset.Value.ToString());// 3、业务处理string key = result.Key;stringvalue= result.Value;
             _logger.LogInformation($"创建订单:Key:{key}");
             _logger.LogInformation($"创建订单:Order:{value}");// redis缺陷:无法保证偏移和业务同时成功。// 方案:使用数据库来存储偏移量//       核心:通过数据库事务来保证// 3、手动提交(有了自动提交,不需要手工提交)// consumer.Commit(result);}}}).Start();

2.6 消费者监听主题(多消费者)

不同的消费者,对应的GroupId应该要设置为不一样,这样就会单独地消费。需要注意的是,新加入的消费者会从偏移量0开始消费(需要特别处理)
消费者1(订单消费者)代码:

#region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId{newTask(()=>{var consumerConfig =newConsumerConfig{
            BootstrapServers ="127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId ="order",
            EnableAutoCommit =true,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
        consumer.Subscribe("create-order");// 2、获取偏移量string offset = distributedCache.GetString("create-order");if(string.IsNullOrEmpty(offset)){
            offset ="0";}// 3、重置偏移量
        consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order",0),int.Parse(offset)+1));while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
            _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
            _logger.LogInformation($"创建商品:Key:{key}");
            _logger.LogInformation($"创建商品:Order:{value}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
            distributedCache.SetString("create-order", result.Offset.Value.ToString());// 3、手动提交//consumer.Commit(result);}}).Start();}#endregion

消费者2(短信消费者)代码:

#region 5、订阅发布(广播消费)1、创建订单----2、发送短信-GroupId{newTask(()=>{var consumerConfig =newConsumerConfig{
        BootstrapServers ="127.0.0.1:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest,
        GroupId ="sms",
        EnableAutoCommit =false,};var builder =newConsumerBuilder<string,string>(consumerConfig);var consumer = builder.Build();// 1、订阅
    consumer.Subscribe("create-order");while(true){// 2、消费var result = consumer.Consume();// 2.1、获取偏移量
        _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
        _logger.LogInformation($"创建商品:Key:{key}");
        _logger.LogInformation($"创建商品:Order:{value}");// 3、手动提交
        consumer.Commit(result);}}).Start();}#endregion

2.7 设置消费者延迟消费(拓展)

案例:如订单如果30分钟之后没支付,就取消订单。
消费者代码:

#region 8、创建订单----1、订单消息延迟处理{newTask(()=>{var consumerConfig =newConsumerConfig{
            BootstrapServers ="127.0.0.1:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            GroupId ="order",
            EnableAutoCommit =false,//FetchMinBytes = 1,//FetchMaxBytes = 3060};var builder =newConsumerBuilder<string,string>(consumerConfig);using(var consumer = builder.Build()){// 1、订阅
            consumer.Subscribe("create-order-1");// 2、偏移量恢复string offset = distributedCache.GetString("create-order-1");if(string.IsNullOrEmpty(offset)){
                offset ="0";}
            consumer.Assign(newTopicPartitionOffset(newTopicPartition("create-order-1",0),int.Parse(offset)+1));while(true){// 1、恢复消息(每隔5秒钟消费一次)newTimer((s)=>{
                    consumer.Resume(newList<TopicPartition>{newTopicPartition("create-order-1",0)});},null, Timeout.Infinite, Timeout.Infinite).Change(5000,5000);// 1.1、消费暂停
                consumer.Pause(newList<TopicPartition>{newTopicPartition("create-order-1",0)});// 2、消费消息var result = consumer.Consume();//批量获取消息,根据-----》字节数try{// 2.1、获取偏移量
                    _logger.LogInformation($"订单消息偏移量:Offset:{result.Offset}");// 3、业务处理string key = result.Key;stringvalue= result.Value;
                    _logger.LogInformation($"创建商品:Key:{key}");
                    _logger.LogInformation($"创建商品:Order:{value}");// 2.2、把kafka队列中偏移量存起来。redis mysql// 2.3、重置kafka队列的偏移量
                    distributedCache.SetString("create-order-1", result.Offset.Value.ToString());// 3、手动提交
                    consumer.Commit(result);}catch(Exception ex){throw;}finally{
                    consumer.Pause(newList<TopicPartition>{newTopicPartition("create-order-1",0)});
                    Console.WriteLine($"暂停消费");}}}}).Start();}#endregion

2.8 生产者-失败重试

生产者使用幂等性属性EnableIdempotence=true,不会造成重复数据
消费者代码:

#region 2、生产者-失败重试{var producerConfig =newProducerConfig{
        BootstrapServers ="127.0.0.1:9092",
        MessageTimeoutMs =50000,
        EnableIdempotence =true// 保证消息:不重复发送,失败重试};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);TopicPartition topicPartition =newTopicPartition("order-create-5",newPartition(0));var dr = producer.ProduceAsync("order-create-5",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
            _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
            _logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}#endregion

2.9 生产者-失败重试-多消息发送(使用事务)

消费者代码:

#region 3、生产者-失败重试-多消息发送{var producerConfig =newProducerConfig{
                   BootstrapServers ="127.0.0.1:9092",
                   MessageTimeoutMs =50000,
                   EnableIdempotence =true,
                   TransactionalId = Guid.NewGuid().ToString()};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){// 1、初始化事务
                   producer.InitTransactions(TimeSpan.FromSeconds(60));try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);// 2、开始事务
                       producer.BeginTransaction();for(int i =0; i <100; i++){var dr = producer.ProduceAsync("order-create-5",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
                           _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}// 3、提交事务
                       producer.CommitTransaction();}catch(ProduceException<string,string> ex){
                       _logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);// 4、关闭事务
                       producer.AbortTransaction();}}}#endregion

2.10 生产者不设置分区,将会自动以消息名词为key, 用哈希一致性算法计算出存到哪个分区,然后会一直存到该分区。

消费者代码:

#region 4、生产者-固定分区发送{var producerConfig =newProducerConfig{
         BootstrapServers ="127.0.0.1:9092",
         MessageTimeoutMs =50000};var builder =newProducerBuilder<string,string>(producerConfig);using(var producer = builder.Build()){try{var OrderJson = JsonConvert.SerializeObject(orderCreateDto);//TopicPartition topicPartition = new TopicPartition("order-create", new Partition(0));var dr = producer.ProduceAsync("create-order-1",newMessage<string,string>{ Key ="order", Value = OrderJson }).GetAwaiter().GetResult();
             _logger.LogInformation("发送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}catch(ProduceException<string,string> ex){
             _logger.LogError(ex,"发送事件到 {0} 失败,原因 {1} ","order", ex.Error.Reason);}}}#endregion
标签: kafka

本文转载自: https://blog.csdn.net/m0_46291636/article/details/140773171
版权归原作者 m0_46291636 所有, 如有侵权,请联系我们删除。

“消息中间件-Kafka的使用(2)”的评论:

还没有评论