Kafka生产或消费较慢问题排查及优化
一、kafka消费、生产性能问题分析及优化方法
1.kafkaListner消费较慢
问题分析:将代码逻辑注释掉,直进行拉取数据操作,性能应为每分钟产生消息的2倍以上
private Map<String, AtomicInteger> map =newConcurrentHashMap<>();
map.computeIfAbsent(DateTimeUtils.format(newDate(),"yyyy-MM-dd HH:mm"),
key ->newAtomicInteger(0)).incrementAndGet();// 这段代码放到处理一条数据接收后的位置。或者处理完一批数据后的位置将incrementAndGet()换成addAndGet(messageCount)
优化方法:
1.每次读取一批数据减少网络IO
配置文件application.properties中添加配置
spring.kafka.listener.type=batch // 设置批量消费
spring.kafka.consumer.max-poll-records=500 //批量消费每次最多消费多少条消息
spring.kafka.consumer.enable-auto-commit=true // 是否自动提交
2.分区数=concurrency乘listner的个数乘节点数
spring.kafka.listener.concurrency=10 //线程数
3.每批次拉去数据量最大值应按照业务需求来设置
spring.kafka.consumer.max-poll-records=500 //批量消费每次最多消费多少条消息
spring.kafka.consumer.properties.fetch.max.bytes=52428800 //一次fetch请求中从一个broker中取得records的最大大小
spring.kafka.consumer.properties.max.partition.fetch.bytes=1048576 // 一次fetch请求中,从一个partition中取得的records最大大小
2.代码性能慢
问题分析:查看每分钟代码消费数量应为每分钟产生消息的2倍以上
private Map<String, AtomicInteger> map =newConcurrentHashMap<>();
map.computeIfAbsent(DateTimeUtils.format(newDate(),"yyyy-MM-dd HH:mm"),
key ->newAtomicInteger(0)).incrementAndGet();// 这段代码放到处理一条数据接收后的位置。或者处理完一批数据后的位置将incrementAndGet()换成addAndGet(messageCount)
优化方法:减少/避免查库操作(缓存起来);减少低性能代码逻辑。
二、Kafka生产者性能差
1.kafkaProducer性能差
问题分析:和消费者一样,查看每分钟生产多少消息到kafka队列中
优化方法:
1.创建线程池去异步发送消息
private final ExecutorService kafkaExecutor =newThreadPoolExecutor(64,64,
0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue<>(8192),newThreadPoolExecutor.DiscardOldestPolicy());//创建线程池
kafkaExecutor.execute(()->{try{
kafkaProducer.sendMessage(topic, sendMap);}catch(Exception ex){LOGGER.error("Error send Kafka", ex);}});//异步发送消息
2.利用消息缓存,进行批量发送消息,减少网络IO
spring.kafka.producer.batch-size=4096 // 当缓存达到4k时进行批量发送消息
spring.kafka.producer.linger-ms=100 // 当缓存达到100ms进行批量发送消息(这两个参数保证了吞吐量和延时的均衡)
spring.kafka.producer.compression-type=lz4 // 压缩方式
spring.kafka.producer.buffer-memory=33554432 // 缓存区最大值32MB
关系图如下
2.代码性能问题(和生产者的处理方式一样)
最后注意网络问题,可以ping下kafka的连接串,是否延时较大。
版权归原作者 黑马Y 所有, 如有侵权,请联系我们删除。