0


kafka消费、生产性能问题分析及优化方法

Kafka生产或消费较慢问题排查及优化

一、kafka消费、生产性能问题分析及优化方法

1.kafkaListner消费较慢

问题分析:将代码逻辑注释掉,直进行拉取数据操作,性能应为每分钟产生消息的2倍以上

private Map<String, AtomicInteger> map =newConcurrentHashMap<>();
map.computeIfAbsent(DateTimeUtils.format(newDate(),"yyyy-MM-dd HH:mm"),
                    key ->newAtomicInteger(0)).incrementAndGet();// 这段代码放到处理一条数据接收后的位置。或者处理完一批数据后的位置将incrementAndGet()换成addAndGet(messageCount)

优化方法:

1.每次读取一批数据减少网络IO

配置文件application.properties中添加配置
spring.kafka.listener.type=batch // 设置批量消费
spring.kafka.consumer.max-poll-records=500 //批量消费每次最多消费多少条消息
spring.kafka.consumer.enable-auto-commit=true // 是否自动提交

2.分区数=concurrency乘listner的个数乘节点数

spring.kafka.listener.concurrency=10 //线程数

3.每批次拉去数据量最大值应按照业务需求来设置

spring.kafka.consumer.max-poll-records=500 //批量消费每次最多消费多少条消息
spring.kafka.consumer.properties.fetch.max.bytes=52428800 //一次fetch请求中从一个broker中取得records的最大大小
spring.kafka.consumer.properties.max.partition.fetch.bytes=1048576 // 一次fetch请求中,从一个partition中取得的records最大大小

2.代码性能慢

问题分析:查看每分钟代码消费数量应为每分钟产生消息的2倍以上

private Map<String, AtomicInteger> map =newConcurrentHashMap<>();
map.computeIfAbsent(DateTimeUtils.format(newDate(),"yyyy-MM-dd HH:mm"),
                    key ->newAtomicInteger(0)).incrementAndGet();// 这段代码放到处理一条数据接收后的位置。或者处理完一批数据后的位置将incrementAndGet()换成addAndGet(messageCount)

优化方法:减少/避免查库操作(缓存起来);减少低性能代码逻辑。

二、Kafka生产者性能差

1.kafkaProducer性能差

问题分析:和消费者一样,查看每分钟生产多少消息到kafka队列中
优化方法:

1.创建线程池去异步发送消息

private final ExecutorService kafkaExecutor =newThreadPoolExecutor(64,64, 
            0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue<>(8192),newThreadPoolExecutor.DiscardOldestPolicy());//创建线程池
            
kafkaExecutor.execute(()->{try{
                        kafkaProducer.sendMessage(topic, sendMap);}catch(Exception ex){LOGGER.error("Error send Kafka", ex);}});//异步发送消息

2.利用消息缓存,进行批量发送消息,减少网络IO

spring.kafka.producer.batch-size=4096 // 当缓存达到4k时进行批量发送消息
spring.kafka.producer.linger-ms=100 // 当缓存达到100ms进行批量发送消息(这两个参数保证了吞吐量和延时的均衡)
spring.kafka.producer.compression-type=lz4 // 压缩方式
spring.kafka.producer.buffer-memory=33554432 // 缓存区最大值32MB

关系图如下
在这里插入图片描述

2.代码性能问题(和生产者的处理方式一样)

最后注意网络问题,可以ping下kafka的连接串,是否延时较大。

标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/m0_68178610/article/details/130503063
版权归原作者 黑马Y 所有, 如有侵权,请联系我们删除。

“kafka消费、生产性能问题分析及优化方法”的评论:

还没有评论