0


Kafka 批量消费

业务背景
项目有个需求需要统计IM聊天相关数据,原设计思想是在聊天产生时通过消息队列进行数据记录,利用rocketMQ实现。上线后发现由于内部IM活跃用户量级较大,MQ生产者生产消息过多,消费者实时消费会造成服务器CPU和硬盘读写压力,在改不了硬件配置的情况下,笔者通过了解到kafka批量消费的实现可解决这个问题,记录下该方案。

环境

kafka、Springboot、JDK8

依赖

使用的是Springboot v2.1.5.RELEASE版本,pom依赖如下:

<!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>

配置文件

生产者配置

核心配置是:

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=10000

单条消费和提交有时候会影响性能,spring-kafka提供了批量拉取数据和手动提交的策略

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 集群地址
spring.kafka.bootstrap-servers=192.168.2.135:9092
# 重试次数
spring.kafka.producer.retries=3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all
# 批量处理的最大大小 单位 byte
spring.kafka.producer.batch-size=4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.buffer-memory=33554432
# 客户端ID
spring.kafka.producer.client-id=im-kafka
# Key 序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
spring.kafka.producer.compression-type=gzip
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000
# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

消费者配置

核心配置是:

kafka:listener:# 手动ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)type: batch
      # 自动提交 offset 默认 trueenable-auto-commit:false# 批量消费最大数量max-poll-records:100

在配置文件中关闭自动提交,开启手动提交和批量消费就可以批量消费了,但是最后需要手动提交offset

kafka:listener:# 手动ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)type: batch
      # 集群地址bootstrap-servers: 192.168.2.135:9092# 消费者配置consumer:# 默认消费者组group-id: imStatisticsConsumerGroup
      # 自动提交 offset 默认 trueenable-auto-commit:false# 自动提交的频率 单位 msauto-commit-interval:1000# 批量消费最大数量max-poll-records:100# Key 反序列化类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset# latest:重置为分区中最新的offset(消费分区中新产生的数据)# none:只要有一个分区不存在已提交的offset,就抛出异常auto-offset-reset: latest
      properties:session:timeout:# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作ms:120000request:timeout:# 请求超时ms:120000

生产者端代码

publicvoidsendToImStatistics(List<ImChatStatistics> statistics){
        kafkaTemplate.send(KAFKA_IM_CHAT_STATISTICS,JsonUtils.toString(statistics));}

消费者端代码

@KafkaListener(topics ={"imChatStatistics"}, groupId ="{imStatisticsConsumerGroup}")publicvoidlisten(List<ConsumerRecord<String,String>> consumerRecords,Acknowledgment acknowledgment){try{if(CollectionUtils.isEmpty(consumerRecords)){return;}LogUtils.info("KafkaImStatisticsListener 处理推送消息[data大小: {}]", consumerRecords.size());List<ImChatStatistics> totalList =newArrayList<>();for(ConsumerRecord<String,String> consumerRecord : consumerRecords){List<ImChatStatistics> list =JSON.parseArray(consumerRecord.value(),ImChatStatistics.class);
                list.stream().forEach(item ->{
                    item.setWeek(DateUtils.getWeek(item.getDate()));});
                totalList.addAll(list);}
            imChatStatisticsMapper.batchInsertOrUpdate(totalList);// 手动提交offset
            acknowledgment.acknowledge();}catch(Exception e){LogUtils.error("ImChartConsumer 消息消费失败 :"+ e.getMessage(), e);}}
标签: kafka java springboot

本文转载自: https://blog.csdn.net/weixin_47648825/article/details/128021223
版权归原作者 GalenYi 所有, 如有侵权,请联系我们删除。

“Kafka 批量消费”的评论:

还没有评论