0


kafka消费者接收不到消息

背景:

对kafka消息进行监听,生产者发了消息,但是消费端没有接到消息,监听代码

消费端,kafka配置

spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";

spring.kafka.properties.security.protocol=SASL_PLAINTEXT

spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256

#=============== provider =======================

spring.kafka.producer.retries=0

每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================

指定默认消费者group id

spring.kafka.consumer.group-id=dq

spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"})
public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {

}

offset explorer发现生产者发送了消息,offset是0

问题解决:

后来查看生产者kafka配置,发现他们的enable-auto-commit是false:

spring.kafka.consumer.enable-auto-commit=false

修改kafka配置

spring.kafka.consumer.enable-auto-commit=false

在侦听器容器中运行的线程数

spring.kafka.listener.concurrency=5

listner负责ack,每调用commit方法,立即向服务器提交

spring.kafka.listener.ack-mode=manual_immediate

标签: kafka 分布式

本文转载自: https://blog.csdn.net/s07aser123/article/details/136350035
版权归原作者 爱晒太阳的小老鼠 所有, 如有侵权,请联系我们删除。

“kafka消费者接收不到消息”的评论:

还没有评论