查看一下压缩策略
bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1
Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Configs:cleanup.policy=compact
:
然后再检查一下自己发送消息的时候是不是没有传
key
问题堆栈信息
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;
nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,
the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
问题原因
解决方案
问题堆栈信息
Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
问题原因
**不能再配置中既配置
kafka.consumer.enable-auto-commit=true
自动提交; 然后又在监听器中使用手动提交**
例如:
kafka.consumer.enable-auto-commit=true
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
- 手动ack 提交记录
- @param data
- @param ack
- @throws InterruptedException
*/
@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,
clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)
public void consumer2(String data, Acknowledgment ack) {
log.info(“consumer-id2-手动ack,提交记录,data:{}”,data);
ack.acknowledge();
}
解决方法:
将自动提交关掉,或者去掉手动提交;
如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的
consumerFactory
将它的是否自动提交设置为false;比如
@Configuration
@EnableKafka
public class KafkaConfig {
@Autowired
private KafkaProperties properties;
/**
- 创建一个新的消费者工厂
- 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下
- @return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaConsumerFactory() {
Map<String, Object> map = properties.buildConsumerProperties();
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
}
/**
- 创建一个新的消费者工厂
- 但是修改为不自动提交
- @return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaManualConsumerFactory() {
Map<String, Object> map = properties.buildConsumerProperties();
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
}
/**
- 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)
- @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaManualConsumerFactory());
//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
版权归原作者 AK774S 所有, 如有侵权,请联系我们删除。