SpringBoot 整合 Kafka
基本使用-简单的生产消费
项目的基本构建
新建一个 maven 项目,引入 kafka 依赖,pom 文件内容如下
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies>
编写配置文件 application.yml,进行 kafka 的基本配置
spring:# Kafka 基本配置kafka:# ===============Kafka 服务器配置===============bootstrap-servers: localhost:9092listener:# 消费者监听的topic不存在时,项目会报错,设置为falsemissing-topics-fatal:false# 设置批量消费消息(不批量处理时需要注释掉)#type: batch# ===============消费者配置===============consumer:# 是否自动提交偏移量offsetenable-auto-commit:true# 提交offset延时auto:commit:intervals:ms:1000# 当kafka中没有初始offset或offset超出范围时将自动重置offset# (1) earliest:重置为分区中最小的offset;# (2) latest:重置为分区中最新的offset(消费分区中新产生的数据);# (3) none:只要有一个分区不存在已提交的offset,就抛出异常;auto-offset-reset: latest
# 消息的反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 设置批量消费的消息数max-poll-records:10properties:# 消费会话超时时间session:timeout:ms:120000# 消费请求超时时间request:timeout:ms:180000# 默认的消费组idgroup:id: defaultConsumerGroup
# ===============生产者配置===============producer:# 重试次数retries:1# 应答级别:多少个分区副本完成备份后向生产者发送应答消息(可选0、1、all/-1)acks:-1# 批量大小batch-size:16384# 生产端缓冲区大小buffer-memory:33554432# 消息的序列化key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 设置生成事务的前缀(使用事务时开启,不使用则注释掉)#transaction-id-prefix: transaction_properties:# 提交延时# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka# linger.ms为0表示每接收到一条消息就提交给kafka,此时batch-size失效linger:ms:0# 自定义分区器#partitioner:#class: com.zqf.config.MyPartitioner # 配置自定义分区策略后所有消息都在 partition 0
创建启动类
@SpringBootApplicationpublicclassKafkaDemoApplication{publicstaticvoidmain(String[] args){SpringApplication.run(KafkaDemoApplication.class);}}
可以编写一个配置类,创建一个名为 testTopic 的主题,并将其分区数设置为 8(当我们发送消息到主题时,Kafka 会自动创建该主题,但此时仅存在一个分区)
@ConfigurationpublicclassKafkaInitialConfiguration{// 创建一个名为 testTopic 的 topic 并设置分区数为 8,分区副本数为 2@BeanpublicNewTopicinitialTopic(){returnnewNewTopic("testTopic",8,(short)2);}// 如果要修改分区数,只需修改配置值重启项目即可// 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小@BeanpublicNewTopicupdateTopic(){returnnewNewTopic("testTopic",10,(short)2);}}
编写生产者 Controller 控制类模拟消息的生产,引入 KafkaTemplate
@RestController@RequestMapping("/kafka/producer")publicclassProducerController{@ResourceprivateKafkaTemplate<Object,Object> kafkaTemplate;}
简单的生产消费
在 ProducerController 中模拟消息的生产
/**
* 简单生成消息
* @param topic 主题
* @param msg 消息
* @return 结果
*/@GetMapping("/simpleSend/{topic}/{msg}")publicStringsimpleSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){
kafkaTemplate.send(topic, msg);return"Kafka 生产消息成功~";}
创建 ConsumerService 作为消费者进行消息的消费,
@KafkaListener
的
topics
属性指定了监听的主题,可同时监听多个,用逗号隔开即可
/**
* 简单监听
* @param record 消息
*/@KafkaListener(topics ={"testTopic"})publicvoidsimpleGetMsg(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}
使用 postman 测试结果如下
消费结果
进阶使用-生产者
带回调的生产者
/**
* 带回调的发送消息
* @param topic 主题
* @param msg 消息
* @return 结果
*/@GetMapping("/callbackSend/{topic}/{msg}")publicStringcallbackSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){try{// 发送消息并设置成功和失败的逻辑
kafkaTemplate.send(topic, msg).addCallback(newListenableFutureCallback<SendResult<Object,Object>>(){// 发送失败处理逻辑@OverridepublicvoidonFailure(Throwable e){System.out.println("消息发送失败..."+ e.getMessage());}// 发送成功处理逻辑@OverridepublicvoidonSuccess(SendResult<Object,Object> result){System.out.println("消息发送成功..."+ result.getRecordMetadata().topic()+"-"+
result.getRecordMetadata().partition()+"-"+ result.getRecordMetadata().offset());}});}catch(Throwable throwable){return"kafka 发送消息失败~";}return"kafka 发送消息成功~";}
测试结果
发送消息成功回调打印信息
事务提交消息
/**
* kafka 事务发送消息
* @param topic 主题
* @param msg 消息
* @return 结果
*/@GetMapping("/transactionSend/{topic}/{msg}")publicStringtransactionSend(@PathVariable("topic")String topic,@PathVariable("msg")String msg){finalBoolean[] result ={true};
kafkaTemplate.executeInTransaction(newKafkaOperations.OperationsCallback<Object,Object,Object>(){@OverridepublicObjectdoInOperations(KafkaOperations<Object,Object> kafkaOperations){try{if("HelloKafka_transaction".equals(msg)){thrownewRuntimeException("消息异常,启动 Kafka 事务,不生产对应消息~");}try{
kafkaTemplate.send(topic, msg).get();}catch(ExecutionException e){
e.printStackTrace();}return"生产消息无异常,生产成功~";}catch(Exception e){
e.printStackTrace();
result[0]=false;return"生产消息有异常,生产失败~";}}});if(!result[0]){return"kafka 发送消息失败~";}return"kafka 发送消息成功~";}
使用事务发送消息时,需要在配置文件中配置
transaction-id-prefix
属性,即事务前缀,详见开篇的配置文件,打开相应的注释即可
生产异常处理,当有异常产生时,消息不会被发送,故消费者没有监听到信息
自定义分区器
Kafka 中 每一个 Topic 都可以划分成多个分区,而消息将被 append 到哪一个分区,则有对应的分区策略:
- 若我们在发送消息时制定了分区策略,则将消息按照策略 append 到相应分区;
- 若我们在发送消息时没有指定分区,但消息携带了 key,此时 kafka 将根据 key 将消息划分到对应分区,该策略将保证 key 相同的消息被 append 到同一分区;
- 若分区和 key 都没有指定,则使用 kafka 的默认分区策略,轮询得到一个分区;
编写自定义分区器
publicclassMyPartitionerimplementsPartitioner{@OverridepublicvoidonNewBatch(String topic,Cluster cluster,int prevPartition){Partitioner.super.onNewBatch(topic, cluster, prevPartition);}// 在此处自定义分区策略,目前该策略默认使用 partition 0@Overridepublicintpartition(String s,Object o,byte[] bytes,Object o1,byte[] bytes1,Cluster cluster){return0;}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> map){}}
配置好自定义的分区逻辑后,需要在配置文件中配置自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.zqf.config.MyPartitioner
详见开篇配置文件末尾,将注释去掉即可
测试发现,所有消息都会被分发进入分区 0
进阶使用-消费者
指定消费者监听主题、分区、偏移量
配置
@KafkaListener
的属性可指定消费者监听的主题、分区以及偏移量
/**
* 监听特定主题、特定分区
* @param record 消息
*/@KafkaListener(id ="consumer01", groupId ="testGroup", topicPartitions ={@TopicPartition(topic ="topic1", partitions ="0"),@TopicPartition(topic ="testTopic", partitions ={"1","3","5","7","9"})})publicvoidtargetGetMsg1(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("=======consumer01收到消息=======");System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}/**
* 监听特定主题、特定分区
* @param record 消息
*/@KafkaListener(id ="consumer02", groupId ="testGroup", topicPartitions ={@TopicPartition(topic ="topic2", partitions ="0"),@TopicPartition(topic ="testTopic", partitions ={"0","2","4","6","8"})})publicvoidtargetGetMsg2(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("=======consumer02收到消息=======");System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}
测试结果
消费者批量消费
修改配置文件,开启批量消费模式,并指定每一次消费的消息数量
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
详见开篇配置文件,将对应属性的注释去掉即可
编写生产者方法批量生产数据并发送
/**
* 批量消息处理
* @param topic 主题
* @param msg 消息
* @return 结果
*/@GetMapping("/multiAccept/{topic}/{msg}")publicStringmultiAccept(@PathVariable("topic")String topic,@PathVariable("msg")String msg){for(int i =1; i <=30; i++){
kafkaTemplate.send(topic, msg + i);}return"Kafka 发送消息成功~";}
消费者批量进行消息的消费
/**
* 批量消息处理
* @param records 消息
*/@KafkaListener(topics ={"multiAcceptTopic"})publicvoidmultiAccept(List<ConsumerRecord<Object,Object>> records){System.out.println("======= Kafka 开始批量消费 =======");System.out.println("消息数量 >>> "+ records.size());
records.forEach(consumer ->{System.out.println("msg:"+(String) consumer.value());});}
测试结果
消费者异常处理
消费者异常处理需要编写异常处理器,并配置到监听方法中
/**
* 消费异常处理器
*/@ConfigurationpublicclassMyConsumerAwareErrorHandler{@BeanpublicConsumerAwareListenerErrorHandlerconsumerAwareListenerErrorHandler(){returnnewConsumerAwareListenerErrorHandler(){@OverridepublicObjecthandleError(Message<?> message,ListenerExecutionFailedException e,Consumer<?,?> consumer){// 编写消费者异常处理逻辑System.out.println("消费发生异常...");System.out.println("异常信息:"+ e.getMessage());returnnull;}};}}
模拟消费者消费异常
/**
* 消费异常处理
* @param record 消息
*/@KafkaListener(topics ={"errorHandler"}, errorHandler ="consumerAwareListenerErrorHandler")publicvoiderrorHandlerListener(ConsumerRecord<Object,Object>record){thrownewRuntimeException("模拟接收消息异常...");}
测试结果
消费者消息过滤
消费者进行消息的过滤需要编写过滤器
/**
* 消息过滤器
*/@ComponentpublicclassMyKafkaFilter{@AutowiredConsumerFactory<Object,Object> consumerFactory;@BeanpublicConcurrentKafkaListenerContainerFactory<Object,Object>listenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Object,Object> factory =newConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);// 设置被过滤的消息将被丢弃
factory.setAckDiscarded(true);// 设置消息过滤的策略
factory.setRecordFilterStrategy(consumerRecord ->{if(consumerRecord.value().toString().length()<5){// 返回 false 则消息不被过滤returnfalse;}// 返回 true 则消息被过滤System.out.println("消息不合格~被过滤丢弃~");returntrue;});return factory;}}
编写消费者消费方法
/**
* 消费者消息过滤
* @param record 消息
*/@KafkaListener(topics ={"msgFilter"}, containerFactory ="listenerContainerFactory")publicvoidmsgFilterListener(ConsumerRecord<Object,Object>record){// 方法内定义消息的处理逻辑...System.out.println("topic:"+record.topic());System.out.println("partition:"+record.partition());System.out.println("msg:"+(String)record.value());}
测试结果
根据过滤逻辑,未被过滤的消息,正常被消费
被过滤掉的消息,执行过滤逻辑,消息未被消费
消费者消息转发
消费者可以在监听到消息后,接受消息并对消息进行处理,并将处理后的消息转发到另外的主题,只需要添加注解
@sendTo
即可
/**
* 消费者消息转发
* @param record 消息
*/@KafkaListener(topics ={"sendTo"})@SendTo("testTopic")publicStringsendToListener(ConsumerRecord<Object,Object>record){// 此处编写消息处理逻辑,处理完成后将处理后的消息转发至目标主题returnrecord.value()+"--我被处理了~";}
进行测试,发送消息内容为:“HiKafka”
消费者进行监听,接收到消息后进行处理并转发到主题
testTopic
,并被对应的消费者处理
版权归原作者 情绪大瓜皮丶 所有, 如有侵权,请联系我们删除。