0


spring boot 之 整合 kafka

spring boot

版本:

2.3.12.RELEASE
MySQL

版本:

8.0
Kafka

版本:

kafka_2.13-2.5.0

本次示例流程:

MySQL

中有数据表一张

student

,两个字段(

id、name

),随机插入100条数据,然后从

MySQL

查询数据,发送数据到

Kafka

,并消费

Kafka

.

pom

依赖

<!--spring-boot-starter--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring-boot-starter-web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><!--fastjson--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!--HuTool-all--><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.26</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.2.0-jre</version><scope>compile</scope></dependency><!-- gson --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.10.1</version></dependency><!--Mybatis-Plus 注意版本--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.16</version></dependency><!-- mysql-connector-j --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!--钉钉推送依赖--><dependency><groupId>com.aliyun</groupId><artifactId>alibaba-dingtalk-service-sdk</artifactId><version>2.0.0</version></dependency>

application.yml

配置

server:port:8899spring:application:name: kafkaDemo
  # `kafka`的大部分配置写在了配置类里,可按个人需求写在`application.yml`,然后注入配置类。kafka:bootstrap-servers: localhost:9092listener:type: batch
  datasource:type: com.alibaba.druid.pool.DruidDataSource
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/demo?allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=falseusername: root
    password:123456mybatis-plus:# 实体类Mapper.xml文件所在包mapper-locations: classpath:mapper/*Mapper.xml# 指定 MyBatis 别名包扫描路径,用于给包中的类注册别名。注册后,在 Mapper 对应的 XML 文件中可以直接使用类名,无需使用全限定类名。type-aliases-package: com.sun.pojo
  configuration:# 开启驼峰映射,A_COLUMN > aColumnmap-underscore-to-camel-case:true# 是否允许单条sql 返回多个数据集multiple-result-sets-enabled:true# mybatis-plus的日志输出到控制台log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

配置类

kafka

的大部分配置写在了配置类里,可按个人需求写在

application.yml

,然后注入配置类。
线程池配置,可参考之前文章,点击跳转
发送消息钉钉,模拟异常通知,可参考之前文章,点击跳转

kafka

常量

/**
 * topic、消费组 常量
 */publicclassKafkaConstant{/**
     *  测试topic
     */publicstaticfinalStringKAFKA_TOPIC="student_topic";/**
     * 消费者ID
     */publicstaticfinalStringID="student_group_id";/**
     * 消费组ID
     */publicstaticfinalStringGROUP_ID="student_group";}

kafka

生产者配置

importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.SpringBootConfiguration;importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.support.ProducerListener;importorg.springframework.kafka.support.serializer.JsonSerializer;importjava.util.HashMap;importjava.util.Map;@Slf4j@SpringBootConfigurationpublicclassKafkaProducerConfig{@Value("${spring.kafka.bootstrap-servers}")privateString bootstrapServers;@BeanpublicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<String,Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);/*
        acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
        acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
        acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
        开启事务必须设为all
         */
        props.put(ProducerConfig.ACKS_CONFIG,"all");/*
        发生错误后,消息重发的次数,开启事务必须大于0
         */
        props.put(ProducerConfig.RETRIES_CONFIG,10);/*
        当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送
        批次的大小可以通过batch.size 参数设置默认是16KB
        较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。
        比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟
        实测batchSize这个参数没有用
         */
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);/*
        有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,
        所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到16KB,也将这个批次发送出去
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG,50);/*
        生产者内存缓冲区的大小
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);/*
        生产者空间不足时,send()被阻塞的时间,默认60s
         */
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,6000);/*
        消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
         */
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);/*
        压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
         */
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"none");/*
        反序列化,和生产者的序列化方式对应
         */
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);return props;}/**
     * Kafka 提供了 ProducerListener 监听器来异步监听生产者消息是否发送成功,
     * 我们可以自定义一个 kafkaTemplate 添加 ProducerListener,
     * 当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试或者以发送钉钉的形式提醒
     * 注意:当发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。
     */@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){KafkaTemplate<String,Object> kafkaTemplate =newKafkaTemplate<String,Object>(producerFactory());
        kafkaTemplate.setProducerListener(newProducerListener<String,Object>(){@OverridepublicvoidonSuccess(ProducerRecord<String,Object> producerRecord,RecordMetadata recordMetadata){
                log.info("发送成功 -- topic:{}, partition:{}, key:{}, value:{}, timestamp:{}, headers:{}", producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), producerRecord.timestamp(), producerRecord.headers());}@OverridepublicvoidonError(ProducerRecord<String,Object> producerRecord,Exception exception){
                log.error("发送失败 -- topic:{}, partition:{}, key:{}, value:{}, timestamp:{}, headers:{}", producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), producerRecord.timestamp(), producerRecord.headers());
                log.error("发送失败 -- exception:{}", exception.getMessage());}});return kafkaTemplate;}@BeanpublicProducerFactory<String,Object>producerFactory(){DefaultKafkaProducerFactory<String,Object> factory =newDefaultKafkaProducerFactory<>(producerConfigs());return factory;}}

kafka

消费者配置

importcn.hutool.core.util.ObjectUtil;importcom.alibaba.fastjson.JSONObject;importcom.sun.config.SendMessageToDing;importcom.sun.kafka.constant.KafkaConstant;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.Consumer;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.boot.SpringBootConfiguration;importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;importorg.springframework.kafka.listener.ContainerProperties;importorg.springframework.kafka.listener.ListenerExecutionFailedException;importorg.springframework.kafka.listener.adapter.RecordFilterStrategy;importorg.springframework.kafka.support.serializer.JsonDeserializer;importorg.springframework.messaging.Message;importjava.util.HashMap;importjava.util.Map;/**
 * kafka配置,也可以写在yml,这个文件会覆盖yml
 */@Slf4j@SpringBootConfigurationpublicclassKafkaConsumerConfig{@Value("${spring.kafka.bootstrap-servers}")privateString bootstrapServers;@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object> propsMap =newHashMap<String,Object>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,KafkaConstant.GROUP_ID);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//自动提交的时间间隔,自动提交开启时生效
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理://earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)//none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,"60000");//这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。//这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。//要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数//注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"120000");// 消费请求的超时时间
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,"18000");//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);return propsMap;}@BeanpublicConsumerFactory<Object,Object>consumerFactory(){//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try(JsonDeserializer<Object> deserializer =newJsonDeserializer<>()){
            deserializer.trustedPackages("*");returnnewDefaultKafkaConsumerFactory<>(consumerConfigs(),newJsonDeserializer<>(), deserializer);}}/**
     * 异常处理器,当监听抛出异常的时候,则会自动调用异常处理器
     */@BeanpublicConsumerAwareListenerErrorHandlerconsumerAwareErrorHandler(){returnnewConsumerAwareListenerErrorHandler(){@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException exception,Consumer<?,?> consumer){SendMessageToDing.sendMessage(String.format("消息消费异常,message: %s,exception: %s", message.getPayload(), exception.getMessage()));
                log.error("消息消费异常,message:{},exception:{}", message.getPayload(), exception.getMessage());returnnull;}};}/**
     * 批量消费 消息监听
     * 消息过滤器,在消息抵达consumer之前被拦截
     */@Bean("concurrentKafkaListenerContainerFactory")publicConcurrentKafkaListenerContainerFactory<String,Object>concurrentKafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<String,Object>();
        factory.setConsumerFactory(this.consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数
        factory.setConcurrency(3);//设置为批量监听,需要用List接收
        factory.setBatchListener(true);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
        factory.setMissingTopicsFatal(false);//自动提交关闭,需要设置手动消息确认
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        factory.getContainerProperties().setPollTimeout(600000);//被过滤的消息将被丢弃
        factory.setAckDiscarded(true);//消息过滤策略,此处模拟id为奇数的消息过滤掉
        factory.setRecordFilterStrategy(newRecordFilterStrategy(){@Overridepublicbooleanfilter(ConsumerRecord consumerRecord){JSONObject json =JSONObject.parseObject(consumerRecord.value().toString());if(ObjectUtil.isNotEmpty(json)){Integer id = json.getInteger("id");return id %2!=0;}returntrue;}});return factory;}/**
     * 逐条消费 消息监听
     * 消息过滤器,在消息抵达consumer之前被拦截
     */@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<String,Object>();
        factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数
        factory.setConcurrency(3);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误
        factory.setMissingTopicsFatal(false);//自动提交关闭,需要设置手动消息确认
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        factory.getContainerProperties().setPollTimeout(600000);//设置为批量监听,需要用List接收
        factory.setBatchListener(false);//被过滤的消息将被丢弃
        factory.setAckDiscarded(true);//消息过滤策略,此处模拟id为奇数的消息过滤掉
        factory.setRecordFilterStrategy(newRecordFilterStrategy(){@Overridepublicbooleanfilter(ConsumerRecord consumerRecord){JSONObject json =JSONObject.parseObject(consumerRecord.value().toString());if(ObjectUtil.isNotEmpty(json)){Integer id = json.getInteger("id");return id %2!=0;}returntrue;}});return factory;}}

发送消息封装

importcn.hutool.core.util.ObjectUtil;importcom.sun.config.SendMessageToDing;importcom.sun.kafka.constant.KafkaConstant;importlombok.extern.slf4j.Slf4j;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaOperations;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Component;importorg.springframework.util.concurrent.ListenableFutureCallback;/**
 * 两种发送消息后带有回调的写法:addCallback 和 ListenableFutureCallback
 * 且分别提供了:
 * 1、直接发送消息;
 * 2、发送消息时带上key;
 */@Slf4j@ComponentpublicclassMessageProducer{@AutowiredprivateKafkaTemplate<String,Object> kafkaTemplate;publicvoidsendSuccessCallbackMessage(Object message){
        kafkaTemplate.send(KafkaConstant.KAFKA_TOPIC, message).addCallback(success ->{if(ObjectUtil.isNotEmpty(success)){// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();
                log.info("回调发送成功,topic: {}, partition: {}, offset: {}", topic, partition, offset);}}, throwable ->{SendMessageToDing.sendMessage(String.format("回调发送失败: %s", throwable.getMessage()));
            log.error("回调发送失败:{}", throwable.getMessage());});}publicvoidsendSuccessCallbackMessage(String key,Object message){
        kafkaTemplate.send(KafkaConstant.KAFKA_TOPIC, key, message).addCallback(success ->{if(ObjectUtil.isNotEmpty(success)){// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();
                log.info("回调发送成功,topic: {}, partition: {}, offset: {}", topic, partition, offset);}}, throwable ->{SendMessageToDing.sendMessage(String.format("回调发送失败: %s", throwable.getMessage()));
            log.error("回调发送失败:{}", throwable.getMessage());});}publicvoidsendListenableFutureCallbackMessage(Object message){
        kafkaTemplate.send(KafkaConstant.KAFKA_TOPIC, message).addCallback(newListenableFutureCallback<SendResult<String,Object>>(){@OverridepublicvoidonSuccess(SendResult<String,Object> result){if(ObjectUtil.isNotEmpty(result)){// 消息发送到的topicString topic = result.getRecordMetadata().topic();// 消息发送到的分区int partition = result.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = result.getRecordMetadata().offset();
                    log.info("回调发送成功,topic: {}, partition: {}, offset: {}", topic, partition, offset);}}@OverridepublicvoidonFailure(Throwable throwable){SendMessageToDing.sendMessage(String.format("回调发送失败: %s", throwable.getMessage()));
                log.error("回调发送失败:{}", throwable.getMessage());}});}publicvoidsendListenableFutureCallbackMessage(String key,Object message){
        kafkaTemplate.send(KafkaConstant.KAFKA_TOPIC, key, message).addCallback(newListenableFutureCallback<SendResult<String,Object>>(){@OverridepublicvoidonSuccess(SendResult<String,Object> result){if(ObjectUtil.isNotEmpty(result)){// 消息发送到的topicString topic = result.getRecordMetadata().topic();// 消息发送到的分区int partition = result.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = result.getRecordMetadata().offset();
                    log.info("回调发送成功,topic: {}, partition: {}, offset: {}", topic, partition, offset);}}@OverridepublicvoidonFailure(Throwable throwable){SendMessageToDing.sendMessage(String.format("回调发送失败: %s", throwable.getMessage()));
                log.error("回调发送失败:{}", throwable.getMessage());}});}}

消息发送业务模拟

importcom.alibaba.fastjson.JSONObject;importcom.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;importcom.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;importcom.google.gson.Gson;importcom.google.gson.GsonBuilder;importcom.sun.kafka.producer.MessageProducer;importcom.sun.mapper.StudentMapper;importcom.sun.pojo.Student;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjava.util.List;importjava.util.function.Consumer;@RestController@RequestMapping("student")publicclassStudentController{@AutowiredprivateStudentMapper studentMapper;@AutowiredprivateMessageProducer messageProducer;// json数据写入到kafkaprivatestaticfinalGson gson =newGsonBuilder().create();@GetMapping("send")publicvoidsendMessage(){LambdaQueryWrapper<Student> lambdaQueryWrapper =newLambdaQueryWrapper<>();List<Student> studentList = studentMapper.selectList(lambdaQueryWrapper);
        studentList.forEach(student ->{JSONObject json =(JSONObject)JSONObject.toJSON(student);
            messageProducer.sendSuccessCallbackMessage(gson.toJson(json));});}}

消息消费业务模拟

// service层接口:KafkaConsumerServiceimportorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.support.Acknowledgment;importjava.util.List;publicinterfaceKafkaConsumerService{// 逐条消费voidsimpleConsumer(ConsumerRecord<String,Object> records,Acknowledgment ack);// 批量消费voidconsumerTask(List<ConsumerRecord<String,Object>> records,Acknowledgment ack);}// 接口实现:KafkaConsumerServiceImplimportcn.hutool.core.collection.CollUtil;importcn.hutool.core.util.ObjectUtil;importcom.sun.service.KafkaConsumerService;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.scheduling.annotation.Async;importorg.springframework.stereotype.Service;importjava.util.List;@ServicepublicclassKafkaConsumerServiceImplimplementsKafkaConsumerService{/**
     * 逐条消费
     */@Async("kafkaThreadPool")@OverridepublicvoidsimpleConsumer(ConsumerRecord<String,Object> records,Acknowledgment ack){if(ObjectUtil.isNotEmpty(records)){try{// 模拟业务数据处理System.err.println(records.value().toString()+"\t"+Thread.currentThread().getName());}catch(Exception e){// 此处可换成自己定义的异常处理thrownewRuntimeException(e.getMessage());}finally{// 手动提交offset
                ack.acknowledge();}}}/**
     * 批量消费
     */@Async("kafkaThreadPool")@OverridepublicvoidconsumerTask(List<ConsumerRecord<String,Object>> records,Acknowledgment ack){if(CollUtil.isNotEmpty(records)){for(ConsumerRecord<String,Object> record : records){try{// 模拟业务数据处理System.err.println(record.value()+"\t"+Thread.currentThread().getName());}catch(Exception e){// 此处可换成自己定义的异常处理thrownewRuntimeException(e.getMessage());}finally{// 手动提交offset
                    ack.acknowledge();}}}}}

消费消息封装

importcom.sun.kafka.constant.KafkaConstant;importcom.sun.service.KafkaConsumerService;importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.kafka.support.Acknowledgment;importorg.springframework.stereotype.Component;importjava.util.List;@Slf4j@ComponentpublicclassMessageConsumer{@AutowiredprivateKafkaConsumerService kafkaConsumerService;// 逐条消费@KafkaListener(
            id ="test_id",
            topics ={KafkaConstant.KAFKA_TOPIC},
            groupId ="test_group",
            errorHandler ="consumerAwareErrorHandler",
            containerFactory ="kafkaListenerContainerFactory")publicvoidsimpleConsumer(ConsumerRecord<String,Object> records,Acknowledgment ack)throwsException{
        log.info("线程名称: {}",Thread.currentThread().getName());
        kafkaConsumerService.simpleConsumer(records, ack);}// 批量消费@KafkaListener(
            id =KafkaConstant.ID,
            topics ={KafkaConstant.KAFKA_TOPIC},
            groupId =KafkaConstant.GROUP_ID,
            errorHandler ="consumerAwareErrorHandler",
            containerFactory ="concurrentKafkaListenerContainerFactory")publicvoidonBatchMessage(List<ConsumerRecord<String,Object>> records,Acknowledgment ack)throwsException{
        log.info("线程名称: {} >>> 批量消费一次,消费数量,size: {}",Thread.currentThread().getName(), records.size());
        kafkaConsumerService.consumerTask(records, ack);}}
标签: spring boot kafka

本文转载自: https://blog.csdn.net/yang2717/article/details/139310756
版权归原作者 我叫晨曦啊 所有, 如有侵权,请联系我们删除。

“spring boot 之 整合 kafka”的评论:

还没有评论