kafka 配置类
用途:定义使用的基本 kafka 配置,以及定义Bean
下面文件是读取本地 spring 的标准配置文件的类,用于一般属性获取等操作
importlombok.Data;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.ContainerProperties;importorg.springframework.stereotype.Component;importjava.util.HashMap;importjava.util.Map;@Component@ConfigurationProperties(prefix ="my.kafka")@DatapublicclassMyTaskKafkaProperties{/**r
* kafka地址
*/privateString serverUrl;/**
* groupId
*/privateString groupId;/**
* topic
*/privateString topic;privateboolean enableAutoCommit;privateString autoOffsetReset;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaTwoContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(6);
factory.getContainerProperties().setPollTimeout(6000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}privateConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}privateMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);return props;}}
@Data 为其他用于控制get set 方法的,与 此处配置不是强关联,可以没有
实际 kafka 监听消费
importcom.dtdream.dthink.dtalent.dmall.openplat.service.opendata.OpenDataService;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.kafka.support.KafkaHeaders;importorg.springframework.messaging.handler.annotation.Header;importorg.springframework.stereotype.Component;importjava.util.Optional;@Slf4j@ConditionalOnProperty(name ="my.kafka.enable", havingValue ="true")@ComponentpublicclassMyTaskConsumer{@AutowiredprivateXxxxxService xxxxxService;@KafkaListener(topics ="${my.kafka.topic}", groupId ="${my.kafka.groupId}",
containerFactory ="kafkaTwoContainerFactory")publicvoiddxpTaskEnd(ConsumerRecord<String,String> record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){consume(record, ack, topic, msg -> xxxxxService.xxxxxxx(msg));}privatevoidconsume(ConsumerRecord<String,String> record,Acknowledgment ack,String topic,java.util.function.Consumer<String> consumer){Optional<String> optional =Optional.ofNullable(record.value());if(!optional.isPresent()){
log.warn("kafka收到消息 但为空,record:{}", record);return;}String msg = optional.get();
log.info("kafka收到消息 开始消费 topic:{},msg:{}", topic, msg);try{
consumer.accept(msg);// 上面方法执行成功后手动提交
ack.acknowledge();
log.info("kafka收到消息消费成功 topic:{},msg:{}", topic, msg);}catch(Exception e){
log.error("kafka消费消息失败 topic:{},msg:{}", topic, msg, e);}}}
@ConditionalOnProperty spring boot 用于判断当前类是否加载的条件
XxxxxService: 为我们的业务服务层,用于消费消息
版权归原作者 极光雨雨 所有, 如有侵权,请联系我们删除。