说明
本示例只配置了 Consumer,没有配置 Producer;如需配置 Producer,可参考 3.1 配置文件_1 中被注释掉的 Producer 相关代码。
1.引入依赖
<!-- Spring for Apache Kafka; no <version> element is declared in this snippet. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
# Two independent Kafka consumer setups, read by the Java configuration via
# ${spring.kafka.one.*} and ${spring.kafka.two.*}.
spring:
  kafka:
    one:
      # Test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            # Quoted so the embedded double quotes and trailing semicolon are kept verbatim.
            config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
      consumer:
        # Default consumer group id. Consumers in the same group never receive
        # the same message; the group is identified by group.id.
        group-id: defaultName
        # Disable auto commit (offsets are acknowledged manually by the listener).
        enable-auto-commit: false
        # Where to start when the group has no committed offset.
        # Valid values for the current Java consumer are earliest, latest and none
        # (smallest/largest are the names used by the legacy consumer).
        # earliest = read from the beginning, latest = read only new messages.
        auto-offset-reset: latest
        # Key / value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # Test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Default consumer group id. Consumers in the same group never receive
        # the same message; the group is identified by group.id.
        group-id: defaultName_two
        # Disable auto commit (offsets are acknowledged manually by the listener).
        enable-auto-commit: false
        # Where to start when the group has no committed offset.
        # Valid values for the current Java consumer are earliest, latest and none
        # (smallest/largest are the names used by the legacy consumer).
        # earliest = read from the beginning, latest = read only new messages.
        auto-offset-reset: latest
        # Key / value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
3.新建配置文件
3.1配置文件_1
@Configuration@EnableKafkapublicclassK1kafkaConfiguration{@Value("${spring.kafka.one.bootstrap-servers}")privateString bootstrapServers;@Value("${spring.kafka.one.consumer.group-id}")privateString groupId;@Value("${spring.kafka.one.consumer.enable-auto-commit}")privateString enableAutoCommit;@Value("${spring.kafka.one.consumer.auto-offset-reset}")privateString autoOffsetReset;@Value("${spring.kafka.one.consumer.max-poll-records}")privateString maxPollRecords;@Value("${spring.kafka.one.properties.security.protocol}")privateString securityprotocol;@Value("${spring.kafka.one.properties.sasl.mechanism}")privateString mechanism;@Value("${spring.kafka.one.properties.sasl.jaas.config}")privateString jaasconfig;//@Value("${spring.kafka.one.producer.linger-ms}")//private Integer lingerMs;//@Value("${spring.kafka.one.producer.max-request-size}")//private Integer maxRequestSize;//@Value("${spring.kafka.one.producer.batch-size}")//private Integer batchSize;//@Value("${spring.kafka.one.producer.buffer-memory}")//private Integer bufferMemory;//@Bean//public KafkaTemplate<String, String> kafkaOneTemplate() {// return new KafkaTemplate<>(producerFactory());//}@Bean@Primary//理解为默认优先选择当前容器下的消费者工厂KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());//并发数量
factory.setConcurrency(1);//开启批量监听//factory.setBatchListener(type);// 被过滤的消息将被丢弃// factory.setAckDiscarded(true);
factory.getContainerProperties().setPollTimeout(3000);//设置手动提交ackMode
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setMissingTopicsFatal(false);// 设置记录筛选策略//factory.setRecordFilterStrategy(new RecordFilterStrategy() {// @Override// public boolean filter(ConsumerRecord consumerRecord) {// String msg = consumerRecord.value().toString();// if(Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0){// return false;// }// // 返回true消息将会被丢弃// return true;// }//});return factory;}//private ProducerFactory<String, String> producerFactory() {// return new DefaultKafkaProducerFactory<>(producerConfigs());//}@Bean//第一个消费者工厂的beanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}//private Map<String, Object> producerConfigs() {// Map<String, Object> props = new HashMap<>();// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);// props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);// props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);// props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);// props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// return props;//}@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
props.put("security.protocol",securityprotocol);
props.put("sasl.mechanism",mechanism);
props.put("sasl.jaas.config",jaasconfig);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);return props;}
3.2配置文件_2
@Configuration@EnableKafkapublicclassK2kafkaConfiguration{@Value("${spring.kafka.two.bootstrap-servers}")privateString bootstrapServers;@Value("${spring.kafka.two.consumer.group-id}")privateString groupId;@Value("${spring.kafka.two.consumer.enable-auto-commit}")privateString enableAutoCommit;@Value("${spring.kafka.two.consumer.auto-offset-reset}")privateString autoOffsetReset;@Value("${spring.kafka.two.consumer.max-poll-records}")privateString maxPollRecords;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaTwoContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());//并发数量
factory.setConcurrency(1);//开启批量监听//factory.setBatchListener(type);// 被过滤的消息将被丢弃// factory.setAckDiscarded(true);
factory.getContainerProperties().setPollTimeout(3000);//设置手动提交ackMode
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setMissingTopicsFatal(false);return factory;}publicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}privateMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);return props;}
4.设置消费
4.1 设置消费_1
@Component@Slf4j(topic ="KAFKALOG")publicclassConsumer{@KafkaListener(topics ="#{'${spring.kafka.one.topic}'}", groupId ="defaultName",containerFactory ="kafkaListenerContainerFactory")publicvoidlistenGroup(ConsumerRecord<String,String>record,Acknowledgment ack){
log.info("[Consumer] 接收到kafka消息:{}",record.value());System.out.println(record);System.out.println(record.value());//手动提交offset//ack.acknowledge();}
4.2 设置消费_2
@Component@Slf4j(topic ="KAFKALOG")publicclassConsumer2{@KafkaListener(topics ="#{'${spring.kafka.two.topic}'}", groupId ="defaultName_two",containerFactory ="kafkaTwoContainerFactory")publicvoidlistenGroup(ConsumerRecord<String,String>record,Acknowledgment ack){
log.info("[Consumer2 ] 接收到kafka消息:{}",record.value());System.out.println(record);System.out.println(record.value());//手动提交offset//ack.acknowledge();}
标签:
spring boot
kafka
本文转载自: https://blog.csdn.net/ai_haibin/article/details/131436216
版权归原作者 听者listener 所有, 如有侵权,请联系我们删除。
版权归原作者 听者listener 所有, 如有侵权,请联系我们删除。