0


springboot 集成kafka 详细教程,看这一篇就够了

废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看整体架构
在这里插入图片描述

先贴yml吧,这个毕竟是项目一创建就需要的

  1. spring:
  2. application:
  3. admin: apache-kafka
  4. kafka:
  5. bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
  6. template:
  7. default-topic: demo #将消息发送到的默认主题,KafkaTemplate.sendDefault
  8. listener:
  9. type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)# kafka 生产者配置
  10. producer:
  11. key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
  12. value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
  13. batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
  14. buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
  15. retries: 3#发送失败时的重试次数,当大于零时,允许重试失败的发送。# 在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)# 使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
  16. acks: -1
  17. properties:
  18. #消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。#当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。#linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
  19. linger.ms: 0
  20. partitioner:
  21. class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
  22. transaction-id-prefix: tx_kafka.
  23. # kafka 消费者配置
  24. consumer:
  25. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
  26. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
  27. group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改#消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录#不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突#client-id: wangmx1
  28. enable-auto-commit: true#消费者的偏移量是否在后台自动提交,默认为 true
  29. auto-commit-interval: 5000#如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000# 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest# latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
  30. auto-offset-reset: latest
  31. properties:
  32. session.timeout.ms: 180000#消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
  33. request.timeout.ms: 120000#消费请求超时时间
  34. max-poll-records: 5#一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.

自定义分区MyKafkaPartitioner:

  1. package com.zy.apachekafka.beans;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;
  2. /**
  3. * kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
  4. *
  5. * @author zy
  6. */
  7. public class MyKafkaPartitioner implements Partitioner {
  8. /**
  9. * 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
  10. *
  11. * @param topic :主题名称
  12. * @param key :要分区的键(如果没有键,则为null)
  13. * @param keyBytes :要分区的序列化键(如果没有键,则为null)
  14. * @param value :要分区的值或null,健可以有可无,值才是真正的消息内容
  15. * @param valueBytes :要分区的序列化值或null
  16. * @param cluster :当前集群信息
  17. */
  18. @Override
  19. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
  20. // 返回的整数值就是表示生产者将消息发送到的分区
  21. // 具体的规则可以根据自身需要设置
  22. System.out.println("发送消息:" + value);
  23. System.out.println("指定分区为:" + 0);return0;}
  24. /**
  25. * 在分区程序关闭时调用
  26. */
  27. @Override
  28. public void close(){}
  29. /**
  30. * 使用给定的键值对配置此类
  31. *
  32. * @param configs
  33. */
  34. @Override
  35. public void configure(Map<String, ?> configs){}}

消费者定时器ConsumerTimer:

  1. package com.zy.apachekafka.component;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import java.util.Set;
  2. /**
  3. * 消费者定时器——定时开关消费者消费功能
  4. * 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
  5. *
  6. * @author zy
  7. */
  8. @Component
  9. @EnableScheduling
  10. @EnableAsync
  11. public class ConsumerTimer {
  12. /**
  13. * 1、{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
  14. * 2、{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
  15. */
  16. @Autowired
  17. private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  18. /**
  19. * 定时启动消费者监听器
  20. * <p>
  21. * MessageListenerContainer getListenerContainer(String id)
  22. * * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
  23. * * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
  24. * Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
  25. * Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
  26. */
  27. @Scheduled(cron ="0 52 20 * * ? ")
  28. public void startListener(){
  29. Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
  30. containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));
  31. //boolean isRunning():检查此组件当前是否正在运行
  32. //void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
  33. //void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
  34. kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
  35. //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
  36. System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");}
  37. /**
  38. * 定时关闭/暂停消费者监听器
  39. * void pause():在下次轮询之前暂停此容器,配合 resume
  40. * void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
  41. */
  42. @Scheduled(cron ="0 50 20 * * ? ")
  43. public void shutDownListener(){
  44. kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
  45. //kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
  46. System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");}}

下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:

  1. package com.zy.apachekafka.controller;import cn.hutool.core.exceptions.ExceptionUtil;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.header.Headers;import org.apache.kafka.common.record.TimestampType;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;import java.util.List;
  2. /**
  3. * Kafka 消费者 · 接收消息.
  4. * 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics ={"${my.kafka.topic-name}"})
  5. * 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
  6. * 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
  7. *
  8. * @author zy
  9. */
  10. @Component
  11. @ConditionalOnProperty(prefix ="spring.kafka.consumer", name ="group-id")
  12. public class KafkaConsumer {
  13. private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
  14. /**
  15. * 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
  16. * * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
  17. * * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics ={"${my.kafka.topic-name}"})
  18. *
  19. * @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
  20. */
  21. @KafkaListener(id ="basicConsumer", topics ={"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
  22. public void messageListener1(ConsumerRecord<?, ?> record){
  23. /**
  24. * headers:消息头信息
  25. * offset:此记录在相应的 Kafka 分区中的位置。
  26. * partition:记录所在的分区
  27. * serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
  28. * serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
  29. * timestamp:记录的时间戳
  30. * TimestampType:记录的时间戳类型
  31. * topic:接收此记录的主题
  32. * value:消息内容
  33. */
  34. Headers headers = record.headers();
  35. long offset = record.offset();
  36. int partition = record.partition();
  37. int serializedKeySize = record.serializedKeySize();
  38. int serializedValueSize = record.serializedValueSize();
  39. long timestamp = record.timestamp();
  40. TimestampType timestampType = record.timestampType();
  41. String topic = record.topic();
  42. Object value = record.value();
  43. System.out.println("收到消息:");
  44. System.out.println("\theaders=" + headers);
  45. System.out.println("\toffset=" + offset);
  46. System.out.println("\tpartition=" + partition);
  47. System.out.println("\tserializedKeySize=" + serializedKeySize);
  48. System.out.println("\tserializedValueSize=" + serializedValueSize);
  49. System.out.println("\ttimestamp=" + timestamp);
  50. System.out.println("\ttimestampType=" + timestampType);
  51. System.out.println("\ttopic=" + topic);
  52. System.out.println("\tvalue=" + value);}
  53. /**
  54. * 批量消费时,必须使用 List 接收,否则会抛异常。
  55. * 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
  56. * 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
  57. *
  58. * @param records
  59. */
  60. @KafkaListener(topics ="batch-msg")
  61. public void messageListener2(List<ConsumerRecord<?, ?>> records){
  62. System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
  63. int count =0;for(ConsumerRecord<?, ?> record : records){
  64. System.out.println("\t消息" + (++count) + ":" + record.value());}}
  65. /**
  66. * 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
  67. *
  68. * @param record
  69. * @return
  70. */
  71. @KafkaListener(topics ={"sendTo"})
  72. @SendTo("car-infos")
  73. public String messageListener3(ConsumerRecord<?, ?> record){
  74. System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());return record.value().toString();}
  75. /**
  76. * 单位一体化编码与名称更正消息监听
  77. * 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
  78. *
  79. * @param recordList
  80. */
  81. @KafkaListener(topics ={"${app.kafka.topics.agency:topic3}"})
  82. public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList){for(ConsumerRecord<String, String> record : recordList){
  83. log.info("监听单位一体化编码与名称更正消息:{}", record);
  84. try {
  85. System.out.println("消息处理.....");} catch (Exception e){
  86. log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));}}}}

生产者KafkaProducer:

  1. package com.zy.apachekafka.controller;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;import org.springframework.boot.autoconfigure.kafka.KafkaProperties;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.concurrent.FailureCallback;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;import org.springframework.util.concurrent.SuccessCallback;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.util.Map;import java.util.concurrent.TimeUnit;
  2. /**
  3. * kafka 生产者 · 发送消息
  4. *
  5. * @author zy
  6. */
  7. @RestController
  8. public class KafkaProducer {
  9. private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
  10. /**
  11. * {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
  12. * 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
  13. */
  14. @Resource
  15. private KafkaTemplate<String, Object> kafkaTemplate;
  16. /**
  17. * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
  18. * <p>
  19. * 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
  20. * 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
  21. * 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
  22. * [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node-1 could not be established. Broker may not be available.
  23. * [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
  24. * 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
  25. * java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
  26. * Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  27. *
  28. * @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
  29. * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
  30. * @return
  31. */
  32. @PostMapping("kafka/sendMsg")
  33. @Transactional(rollbackFor = Exception.class)
  34. public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message){
  35. logger.info("向指定主题发送信息,topic={},message={}", topic, message);
  36. try {
  37. String valueAsString = new ObjectMapper().writeValueAsString(message);
  38. // 异步
  39. // kafkaTemplate.send(topic, valueAsString);
  40. // 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
  41. SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
  42. sendResult.toString();
  43. message.put("sendResult", sendResult.toString());
  44. // org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  45. } catch (Exception e){
  46. // 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
  47. e.printStackTrace();}return message;}
  48. /**
  49. * 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
  50. * 默认主题由 spring.kafka.template.default-topic 选项进行配置
  51. *
  52. * @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
  53. * @return
  54. */
  55. @PostMapping("kafka/sendMsgDefault")
  56. @Transactional(rollbackFor = Exception.class)
  57. public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message){
  58. logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
  59. try {
  60. String valueAsString = new ObjectMapper().writeValueAsString(message);
  61. kafkaTemplate.sendDefault(valueAsString);} catch (JsonProcessingException e){
  62. e.printStackTrace();}return message;}
  63. /**
  64. * 异步回调写法 1
  65. * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
  66. * http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
  67. * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
  68. *
  69. * @param topic :car-infos
  70. * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
  71. * @return
  72. */
  73. @PostMapping("kafka/sendMsgCallback")
  74. @Transactional(rollbackFor = Exception.class)
  75. public Map<String, Object> sendMessageCallback(@RequestParam String topic,
  76. @RequestBody Map<String, Object> message){
  77. try {
  78. String valueAsString = new ObjectMapper().writeValueAsString(message);
  79. /**
  80. * addCallback:添加成功或者失败的异步回调
  81. * {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
  82. * {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
  83. */
  84. kafkaTemplate.send(topic, valueAsString).addCallback(success ->{
  85. String topic2 = success.getRecordMetadata().topic();
  86. int partition = success.getRecordMetadata().partition();
  87. long offset = success.getRecordMetadata().offset();
  88. logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);}, failure ->{
  89. logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
  90. logger.warn("保存到数据库中,后期再做处理.");});} catch (JsonProcessingException e){
  91. e.printStackTrace();}
  92. logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);return message;}
  93. /**
  94. * 异步回调写法 2
  95. * 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
  96. * http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
  97. * 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
  98. *
  99. * @param topic :helloWorld
  100. * @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
  101. * @return
  102. */
  103. @PostMapping("kafka/sendMsgCallback2")
  104. @Transactional(rollbackFor = Exception.class)
  105. public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
  106. @RequestBody Map<String, Object> message){
  107. try {
  108. String valueAsString = new ObjectMapper().writeValueAsString(message);
  109. /**
  110. * ListenableFutureCallback 接口继承了 {@link SuccessCallback}、 {@link FailureCallback} 函数式接口
  111. * 重写方法即可
  112. */
  113. kafkaTemplate.send(topic, valueAsString).addCallback(
  114. new ListenableFutureCallback<SendResult<String, Object>>(){
  115. @Override
  116. public void onSuccess(SendResult<String, Object> success){
  117. int partition = success.getRecordMetadata().partition();
  118. long offset = success.getRecordMetadata().offset();
  119. String topic2 = success.getRecordMetadata().topic();
  120. logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);}
  121. @Override
  122. public void onFailure(Throwable failure){
  123. logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
  124. logger.warn("保存到数据库中,后期再做处理.");}});} catch (JsonProcessingException e){
  125. e.printStackTrace();}
  126. logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);return message;}
  127. /**
  128. * 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
  129. * 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
  130. *
  131. * @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
  132. * @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
  133. * @return
  134. */
  135. @PostMapping("kafka/sendMsgTransactional1")
  136. @Transactional(rollbackFor = Exception.class)
  137. public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
  138. @RequestBody Map<String, Object> message){
  139. try {
  140. logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
  141. String msg = new ObjectMapper().writeValueAsString(message);
  142. ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);if("110".equals(message.get("version").toString())){
  143. TimeUnit.SECONDS.sleep(3);
  144. System.out.println(1 / 0);}} catch (JsonProcessingException e){
  145. e.printStackTrace();} catch (InterruptedException e){
  146. e.printStackTrace();}return message;}
  147. /**
  148. * http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
  149. * 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
  150. * executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
  151. * 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
  152. *
  153. * @param topic
  154. * @param message
  155. * @return
  156. */
  157. @PostMapping("kafka/sendMsgTransactional2")
  158. public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
  159. @RequestBody Map<String, Object> message){
  160. try {
  161. logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
  162. String msg = new ObjectMapper().writeValueAsString(message);
  163. /**
  164. * executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
  165. * 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
  166. */
  167. kafkaTemplate.executeInTransaction(operations ->{
  168. operations.send(topic, msg);if("120".equals(message.get("version").toString())){
  169. System.out.println(1 / 0);}return null;});
  170. //如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
  171. } catch (JsonProcessingException e){
  172. e.printStackTrace();}return message;}}

贴一份pom文件吧,现在随着依赖的增加,很多时候会出现依赖之间出现问题,而且还很难排错,,有一个idea插件可以安排(maven helper)

pom.xml

  1. <?xml version="1.0"encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.wmx</groupId><artifactId>apache-kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>apache-kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring 整合的 apache kafka 消息队列依赖--><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.7</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

如果有不明白的联系作者,一起学习


本文转载自: https://blog.csdn.net/qq_41041630/article/details/140328012
版权归原作者 EdwardYange 所有, 如有侵权,请联系我们删除。

“springboot 集成kafka 详细教程,看这一篇就够了”的评论:

还没有评论