

Integrating the Kafka Message Queue with Spring Boot: Publish, Subscribe, and Consume

pom dependency -- the version is managed by Spring Boot

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
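The starter deliberately omits a version because Spring Boot's dependency management supplies one. As a minimal sketch, assuming the project inherits from the standard spring-boot-starter-parent (the version shown is only illustrative), the surrounding POM would look something like:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.4</version>
</parent>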

Configuration file (.properties form; a YAML sketch follows at the end of this section)

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
# Offset commit delay (how long after a message is received before its offset is committed; only used with auto-commit)
#spring.kafka.consumer.auto.commit.interval.ms=10000
# What to do when Kafka has no initial offset, or the current offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset in the partition (consume only data newly produced to the partition);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Consumer session timeout (if the consumer sends no heartbeat within this window, a rebalance is triggered)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# By default, startup fails if a topic the consumer listens to does not exist (turn that off)
spring.kafka.listener.missing-topics-fatal=false

# Producers use serializers, not deserializers (and have no group id)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must confirm the write before the producer gets an ack (0, 1, all/-1)
spring.kafka.producer.acks=1
# Batch size in bytes
spring.kafka.producer.batch-size=16384
# Send delay
spring.kafka.producer.properties.linger.ms=0
# The producer sends a batch to Kafka once it has accumulated batch-size bytes or linger.ms has elapsed.
# With linger.ms=0 every message is sent as soon as it arrives, so batch-size effectively has no effect.

# Producer buffer size in bytes
spring.kafka.producer.buffer-memory=33554432
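These .properties settings map one-to-one onto application.yml if you prefer YAML; a partial sketch with the same values:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test1
      enable-auto-commit: false
      auto-offset-reset: latest
    producer:
      retries: 0
      acks: 1
      batch-size: 16384
      buffer-memory: 33554432
    listener:
      ack-mode: manual
      missing-topics-fatal: false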

Configure the producer

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * @date 2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    // Property keys aligned with the application.properties shown above
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    @Value("${spring.kafka.producer.properties.linger.ms}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        // The controller below serializes entities to JSON strings itself,
        // so a plain StringSerializer is enough; a JsonSerializer here would
        // double-encode the already-serialized payload.
        return new DefaultKafkaProducerFactory<>(producerConfigs(),
                new StringSerializer(), new StringSerializer());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
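With the kafkaTemplate bean in place, any Spring component can inject it and publish. A minimal smoke test, assuming the topic test already exists (the method name is mine, for illustration):

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void smokeTest() {
    // send(topic, value) returns a ListenableFuture for async result handling
    kafkaTemplate.send("test", "hello kafka");
}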

Configure the consumer

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @date 2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Property keys aligned with the application.properties shown above;
    // the last two fall back to defaults because they are not set there
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private String sessionTimeout;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.properties.auto.commit.interval.ms:10000}")
    private String autoCommitInterval;
    @Value("${spring.kafka.listener.concurrency:3}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        // Values arrive as plain JSON strings, so StringDeserializer on both sides
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(), new StringDeserializer());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

Configure the producer listener

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

/**
 * @date 2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener<String, String> {

    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
        log.info("Producer listener: push succeeded, serialized value size: {} bytes, payload: {}",
                recordMetadata.serializedValueSize(), producerRecord.value());
    }

    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.error("Producer listener: push failed for {}, cause: {}",
                producerRecord.value(), exception.getMessage());
    }
}

Configure the consumer listener

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @date 2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Component
@Slf4j
public class KafkaConsumerListener {

    // containerFactory must match the bean name defined in KafkaConsumerConfig
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void listenConsumer(ConsumerRecord<?, ?> record) {
        log.info("Consumer listener: value = {}", record.value());
    }
}

Try sending data to the Kafka topic

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
 * Here I just run a full query of the data in the MySQL database
 * and push it to Kafka one record at a time.
 * @return a completion message
 */
@RequestMapping(value = "/query", method = RequestMethod.GET)
public String sendEdith() {
    List<User> userList = userDataService.overviewQuery();
    int num = 0;
    try {
        for (User user : userList) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test", new ObjectMapper().writeValueAsString(user));
            kafkaTemplate.send(record);
            log.info("Successfully pushed record #{}", ++num);
            try {
                // A sleep here can be used to throttle the push rate
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    return "All data transferred!";
}
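A small refinement to the loop above: ObjectMapper is costly to construct and thread-safe once configured, so it is better created once and reused rather than instantiated per record. A sketch (the field name is mine, not from the original post):

// shared, reusable mapper declared once on the controller
private static final ObjectMapper MAPPER = new ObjectMapper();

// inside the loop:
String json = MAPPER.writeValueAsString(user);
kafkaTemplate.send("test", json);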
Only my controller-layer code is shown here; the parts left out are just simple query operations. Adapt them to your own business and you're set. And hey, likes, follows, comments, and bookmarks are free, so leave a few, arigato!!!