废话不多说,直接上代码
为啥这样说,现在大家都想先看大妈效果,再去看逻辑
先看整体架构
先贴yml吧,这个毕竟是项目一创建就需要的
spring:
application:
admin: apache-kafka
kafka:
bootstrap-servers: 这里是你自己的kafka地址 # kafka 服务器集群地址,默认为 localhost:9092
template:
default-topic: demo #将消息发送到的默认主题,KafkaTemplate.sendDefault
listener:
type: batch #监听器类型,可选值有:SINGLE(单条消费,默认)、BATCH(批量消息)# kafka 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 key 序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer #生产者 value 序列化方式
batch-size: 16KB #默认批处理大小,如果值太小,则可能降低吞吐量,为零将完全禁用批处理,当 linger.ms=0 时,此值无效
buffer-memory: 32MB #生产者可以用来缓冲等待发送到服务器的记录的总内存大小
retries: 3#发送失败时的重试次数,当大于零时,允许重试失败的发送。# 在考虑请求完成之前,生产者要求领导者已收到的确认数,可选值有:-1、0、1(默认为1)# 使用事务时,必须配置为 -1,表示领导者必须收到所有副本的确认消息。
acks: -1
properties:
#消息提交延时时间(单位毫秒),当生产者接收到消息 linger.ms 秒钟后,就会将消息提交给 kafka。#当生产端积累的消息达到 batch-size 大小后,也会将消息提交给 kafka。#linger.ms 默认为 0 ,表示每接收到一条消息就会立即提交给 kafka,此时 batch-size 无效。如果对实时性要求高,则建议设置为 0
linger.ms: 0
partitioner:
class: com.wmx.apachekafka.beans.MyKafkaPartitioner #kafka 自定义分区规则
transaction-id-prefix: tx_kafka.
# kafka 消费者配置
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 key 反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #消费者 value 反序列化方式
group-id: test-consumer-group #标识此消费者所属的消费者组的唯一字符串,这里只要你是默认安装,那就是这个,不用修改#消费者客户端 id,在消费者组需要唯一指定。发出请求时会传递给服务器,用于服务器端日志记录#不写时,会自动命名,比如:consumer-1、consumer-2...,原子性递增。通常不建议自定义,使用默认值即可,因为容易冲突#client-id: wangmx1
enable-auto-commit: true#消费者的偏移量是否在后台自动提交,默认为 true
auto-commit-interval: 5000#如果enable.auto.commit=true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为 5000# 当 Kafka 中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,可选的值有 latest、earliest、exception、none,默认值为 latest# latest:重置为分区中最新的 offset(消费分区中新产生的数据)、earliest:重置为分区中最小的 offset
auto-offset-reset: latest
properties:
session.timeout.ms: 180000#消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发 rebalance(重新平衡) 操作)
request.timeout.ms: 120000#消费请求超时时间
max-poll-records: 5#一次调用poll()时返回的最大记录数,即批量消费每次最多消费多少条消息,注意是最多,并不是必须满足数量后才消费.
自定义分区MyKafkaPartitioner:
package com.zy.apachekafka.beans;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;
/**
* kafka 自定义分区规则,一旦自定义了分区规则,就不会再走 kafka 默认的分区规则
*
* @author zy
*/
public class MyKafkaPartitioner implements Partitioner {
/**
* 计算给定记录的分区,发送消息到 kafka 服务器之前,都会先走这里进行计算目标分区,即将消息发送到具体的哪个分区
*
* @param topic :主题名称
* @param key :要分区的键(如果没有键,则为null)
* @param keyBytes :要分区的序列化键(如果没有键,则为null)
* @param value :要分区的值或null,健可以有可无,值才是真正的消息内容
* @param valueBytes :要分区的序列化值或null
* @param cluster :当前集群信息
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
// 返回的整数值就是表示生产者将消息发送到的分区
// 具体的规则可以根据自身需要设置
System.out.println("发送消息:" + value);
System.out.println("指定分区为:" + 0);return0;}
/**
* 在分区程序关闭时调用
*/
@Override
public void close(){}
/**
* 使用给定的键值对配置此类
*
* @param configs
*/
@Override
public void configure(Map<String, ?> configs){}}
消费者定时器ConsumerTimer:
package com.zy.apachekafka.component;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.scheduling.annotation.EnableAsync;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import java.util.Set;
/**
* 消费者定时器——定时开关消费者消费功能
* 1、本类使用 @EnableScheduling 定时任务的方式开关消费者监听器,同理可以自己提供控制层接口,通过 http 的方式来开关。
*
* @author zy
*/
@Component
@EnableScheduling
@EnableAsync
public class ConsumerTimer {
/**
* 1、{@link KafkaListener} 注解标注的方法会被注册在 KafkaListenerEndpointRegistry 中。
* 2、{@link KafkaListenerEndpointRegistry} 在 Spring IOC 容器中已经存在,可以直接取。
*/
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
/**
* 定时启动消费者监听器
* <p>
* MessageListenerContainer getListenerContainer(String id)
* * 1、返回具有指定id的{@link MessageListenerContainer},如果不存在此类容器,则返回 null。
* * 2、这个 id 就是 @KafkaListener 注解的 id 属性值
* Set<String> getListenerContainerIds():获取所有的 KafkaListener 监听器 id
* Collection<MessageListenerContainer> getListenerContainers():获取所有的 KafkaListener 监听器容器
*/
@Scheduled(cron ="0 52 20 * * ? ")
public void startListener(){
Set<String> containerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();
containerIds.stream().forEach(item -> System.out.println("KafkaListener 消费者监听器:" + item));
//boolean isRunning():检查此组件当前是否正在运行
//void start():启动此组件,如果组件已在运行,则不应引发异常,配合 stop 方法使用,
//void resume():如果暂停,在下一次轮询后恢复此容器,配合 pause 方法使用。
kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").resume();
//kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").start();
System.out.println(LocalDateTime.now() + " 启动 kafka 消费者监听器:basicConsumer");}
/**
* 定时关闭/暂停消费者监听器
* void pause():在下次轮询之前暂停此容器,配合 resume
* void stop():以同步方式停止此组件/容器,如果组件未运行(尚未启动),则不应引发异常。配合 start 方法重新启动
*/
@Scheduled(cron ="0 50 20 * * ? ")
public void shutDownListener(){
kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").pause();
//kafkaListenerEndpointRegistry.getListenerContainer("basicConsumer").stop();
System.out.println(LocalDateTime.now() + " 暂停 kafka 消费者监听器:basicConsumer");}}
下面该有消费和生产消息:
消费者 · 接收消息.KafkaConsumer:
package com.zy.apachekafka.controller;import cn.hutool.core.exceptions.ExceptionUtil;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.header.Headers;import org.apache.kafka.common.record.TimestampType;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component;import java.util.List;
/**
* Kafka 消费者 · 接收消息.
* 1、topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics ={"${my.kafka.topic-name}"})
* 2、系统中定义了消费者(@KafkaListener)时,启动服务后,如果连不上kafka服务器则会输出大量的警告日志,但是不会报错。
* 不是每个环境都启动了kafka服务,所以当没有配置消费者组id的时候,本类不交由Spring容器初始化,不再监听消息。
*
* @author zy
*/
@Component
@ConditionalOnProperty(prefix ="spring.kafka.consumer", name ="group-id")
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
/**
* 监听指定主题上的消息,topics 属性是一个字符串数组,可以监听多个主题。
* * id :用于唯一标识此消费者监听器,不同方法上此注解的id必须唯一,不设置时,会自动生成
* * topics:监听的主题,可以写死,也可以通过全局配置文件配置取值,如 @KafkaListener(topics ={"${my.kafka.topic-name}"})
*
* @param record :消息记录对象,包含消息正文、主题名、分区号、偏移量、时间戳等等
*/
@KafkaListener(id ="basicConsumer", topics ={"car-infos", "basic-info", "helloWorld", "bgt.basic.agency.frame.topic"})
public void messageListener1(ConsumerRecord<?, ?> record){
/**
* headers:消息头信息
* offset:此记录在相应的 Kafka 分区中的位置。
* partition:记录所在的分区
* serializedKeySize:序列化的未压缩密钥的大小(以字节为单位),如果 key为 null,则返回的大小为 -1
* serializedValueSize:序列化的未压缩值(消息正文)的大小(以字节为单位,record.value().getBytes().length)。如果值为 null,则返回的大小为 -1
* timestamp:记录的时间戳
* TimestampType:记录的时间戳类型
* topic:接收此记录的主题
* value:消息内容
*/
Headers headers = record.headers();
long offset = record.offset();
int partition = record.partition();
int serializedKeySize = record.serializedKeySize();
int serializedValueSize = record.serializedValueSize();
long timestamp = record.timestamp();
TimestampType timestampType = record.timestampType();
String topic = record.topic();
Object value = record.value();
System.out.println("收到消息:");
System.out.println("\theaders=" + headers);
System.out.println("\toffset=" + offset);
System.out.println("\tpartition=" + partition);
System.out.println("\tserializedKeySize=" + serializedKeySize);
System.out.println("\tserializedValueSize=" + serializedValueSize);
System.out.println("\ttimestamp=" + timestamp);
System.out.println("\ttimestampType=" + timestampType);
System.out.println("\ttopic=" + topic);
System.out.println("\tvalue=" + value);}
/**
* 批量消费时,必须使用 List 接收,否则会抛异常。
* 即如果配置文件配置的是批量消费(spring.kafka.listener.type=batch),则监听时必须使用 list 接收
* 反之如果配置是单条消息消费,则不能使用 list 接收,否则也会异常.
*
* @param records
*/
@KafkaListener(topics ="batch-msg")
public void messageListener2(List<ConsumerRecord<?, ?>> records){
System.out.println(">>>批量消费返回条数,records.size()=" + records.size());
int count =0;for(ConsumerRecord<?, ?> record : records){
System.out.println("\t消息" + (++count) + ":" + record.value());}}
/**
* 消费消息并转换。SendTo 可以标注在类上,此时对类中的所有方法有效,方法的返回值表示转发的消息内容。
*
* @param record
* @return
*/
@KafkaListener(topics ={"sendTo"})
@SendTo("car-infos")
public String messageListener3(ConsumerRecord<?, ?> record){
System.out.println("消费单条消费并转发:" + record.value() + "," + record.timestamp());return record.value().toString();}
/**
* 单位一体化编码与名称更正消息监听
* 约定更正接口返回结果监听的主题为:basic.kafka.syncAgencyStatInfo.reply
*
* @param recordList
*/
@KafkaListener(topics ={"${app.kafka.topics.agency:topic3}"})
public void syncAgencyStatInfoMsgListener(List<ConsumerRecord<String, String>> recordList){for(ConsumerRecord<String, String> record : recordList){
log.info("监听单位一体化编码与名称更正消息:{}", record);
try {
System.out.println("消息处理.....");} catch (Exception e){
log.error("单位一体化编码与名称更正消息消费失败:{}", ExceptionUtil.getMessage(e));}}}}
生产者KafkaProducer:
package com.zy.apachekafka.controller;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;import org.springframework.boot.autoconfigure.kafka.KafkaProperties;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.transaction.annotation.Transactional;import org.springframework.util.concurrent.FailureCallback;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;import org.springframework.util.concurrent.SuccessCallback;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;import java.util.Map;import java.util.concurrent.TimeUnit;
/**
* kafka 生产者 · 发送消息
*
* @author zy
*/
@RestController
public class KafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
/**
* {@link KafkaAutoConfiguration} 中会自动根据 {@link KafkaProperties} 配置属性读取配置,
* 然后将 {@link KafkaTemplate} 模板添加到 Spring 容器中,所以这里直接获取使用即可。
*/
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsg?topic=car-infos
* <p>
* 1、send(String topic, @Nullable V data):向指定主题发送消息,如果 topic 不存在,则自动创建,
* 但是创建的主题默认只有一个分区 - PartitionCount: 1、分区也没有副本 - ReplicationFactor: 1,1表示自身。
* 2、send 方法默认是异步的,主线程会直接继续向后运行,想要获取发送结果是否成功,请添加回调方法 addCallback。
* [WARN ][org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)]:[Producer clientId=producer-1] Connection to node-1 could not be established. Broker may not be available.
* [ERROR][org.springframework.kafka.support.LoggingProducerListener.onError(LoggingProducerListener.java:76)]:Exception thrown when sending a message with key='xxx' and payload='xxx' to topic bgt.basic.agency.frame.topic:
* 3、send().get() 可以同步阻塞主线程直到获取执行结果,或者执行超时抛出异常.
* java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException:
* Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
*
* @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
* @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
* @return
*/
@PostMapping("kafka/sendMsg")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessage(@RequestParam String topic, @RequestBody Map<String, Object> message){
logger.info("向指定主题发送信息,topic={},message={}", topic, message);
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
// 异步
// kafkaTemplate.send(topic, valueAsString);
// 同步:get() 获取执行结果,此时线程将阻塞,等待执行结果
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, valueAsString).get();
sendResult.toString();
message.put("sendResult", sendResult.toString());
// org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
} catch (Exception e){
// 异步发送时子线程中的异常是不会进入这里的,只有同步发送时,主线程阻塞,发送是吧,抛出异常时,才会进入这里。
e.printStackTrace();}return message;}
/**
* 向默认主题(default-topic)发送消息:http://localhost:8080/kafka/sendMsgDefault
* 默认主题由 spring.kafka.template.default-topic 选项进行配置
*
* @param message :待发送的消息,如:{"version":2,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgDefault")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMsgDefault(@RequestBody Map<String, Object> message){
logger.info("向默认主题发送信息,topic={},topic={}", kafkaTemplate.getDefaultTopic(), message);
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
kafkaTemplate.sendDefault(valueAsString);} catch (JsonProcessingException e){
e.printStackTrace();}return message;}
/**
* 异步回调写法 1
* 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
* http://localhost:8080/kafka/sendMsgCallback?topic=car-infos
* 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程。
*
* @param topic :car-infos
* @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgCallback")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageCallback(@RequestParam String topic,
@RequestBody Map<String, Object> message){
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
/**
* addCallback:添加成功或者失败的异步回调
* {@link SuccessCallback}:是发送成功回调,函数式接口,其中的方法参数为 {@link SendResult},表示发送结果
* {@link FailureCallback}:是发送失败回调,函数式接口,其中的方法参数为 Throwable,表示异常对象
*/
kafkaTemplate.send(topic, valueAsString).addCallback(success ->{
String topic2 = success.getRecordMetadata().topic();
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);}, failure ->{
logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
logger.warn("保存到数据库中,后期再做处理.");});} catch (JsonProcessingException e){
e.printStackTrace();}
logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);return message;}
/**
* 异步回调写法 2
* 发送信息,并添加异步回调方法,用于监控消息发送成功或者失败。发送成功可以记录日志,发送失败则应该有相应的措施,比如延期再发送等。
* http://localhost:8080/kafka/sendMsgCallback2?topic=helloWorld
* 1、addCallback 方法用于获取 send 发送的结果,成功或者失败,此时 send 方法不再阻塞线程,主线程会直接运行过去。
*
* @param topic :helloWorld
* @param message :{"version":223,"text":"后日凌晨三点执行任务,不得有误"}
* @return
*/
@PostMapping("kafka/sendMsgCallback2")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageCallback2(@RequestParam String topic,
@RequestBody Map<String, Object> message){
try {
String valueAsString = new ObjectMapper().writeValueAsString(message);
/**
* ListenableFutureCallback 接口继承了 {@link SuccessCallback}、 {@link FailureCallback} 函数式接口
* 重写方法即可
*/
kafkaTemplate.send(topic, valueAsString).addCallback(
new ListenableFutureCallback<SendResult<String, Object>>(){
@Override
public void onSuccess(SendResult<String, Object> success){
int partition = success.getRecordMetadata().partition();
long offset = success.getRecordMetadata().offset();
String topic2 = success.getRecordMetadata().topic();
logger.info("发送消息成功,topic={},partition={},offset={}", topic2, partition, offset);}
@Override
public void onFailure(Throwable failure){
logger.warn("消息发送失败:{},{}", failure.getMessage(), failure);
logger.warn("保存到数据库中,后期再做处理.");}});} catch (JsonProcessingException e){
e.printStackTrace();}
logger.info("向指定主题发送信息,回调,topic={},message={}", topic, message);return message;}
/**
* 向指定主题(topic)发送消息:http://localhost:8080/kafka/sendMsgTransactional1?topic=car-infos
* 与 springframework 框架的事务整合到一起,此时异常处理完全和平时一样.
*
* @param topic :主题名称,不存在时自动创建,默认1个分区,无副本。主题名称也可以通过配置文件配置,这里直接通过参数传入。
* @param message :待发送的消息,如:{"version":1,"text":"后日凌晨三点执行任务"}
* @return
*/
@PostMapping("kafka/sendMsgTransactional1")
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> sendMessageTransactional1(@RequestParam String topic,
@RequestBody Map<String, Object> message){
try {
logger.info("向指定主题发送信息,带事务管理,topic={},message={}", topic, message);
String msg = new ObjectMapper().writeValueAsString(message);
ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(topic, msg);if("110".equals(message.get("version").toString())){
TimeUnit.SECONDS.sleep(3);
System.out.println(1 / 0);}} catch (JsonProcessingException e){
e.printStackTrace();} catch (InterruptedException e){
e.printStackTrace();}return message;}
/**
* http://localhost:8080/kafka/sendMsgTransactional2?topic=car-infos
* 生成者发送消息事务管理方式2:使用 executeInTransaction(OperationsCallback<K, V, T> callback)
* executeInTransaction:表示执行本地事务,不参与全局事务(如果存在),即方法内部和外部是分离的,只要内部不
* 发生异常,消息就会发送,与外部无关,即使外部有 @Transactional 注解也不影响消息发送,此时外围有没有 @Transactional 都一样。
*
* @param topic
* @param message
* @return
*/
@PostMapping("kafka/sendMsgTransactional2")
public Map<String, Object> sendMessageTransactional2(@RequestParam String topic,
@RequestBody Map<String, Object> message){
try {
logger.info("向指定主题发送信息,带事务管理:topic={},message={}", topic, message);
String msg = new ObjectMapper().writeValueAsString(message);
/**
* executeInTransaction 表示这些操作在本地事务中调用,不参与全局事务(如果存在)
* 所以回调方法内部发生异常时,消息不会发生出去,但是方法外部发生异常不会回滚,即便是外围方法加了 @Transactional 也没用。
*/
kafkaTemplate.executeInTransaction(operations ->{
operations.send(topic, msg);if("120".equals(message.get("version").toString())){
System.out.println(1 / 0);}return null;});
//如果在这里发生异常,则只要 executeInTransaction 里面不发生异常,它仍旧会发生消息成功
} catch (JsonProcessingException e){
e.printStackTrace();}return message;}}
贴一份pom文件吧,现在随着依赖的增加,很多时候会出现依赖之间出现问题,而且还很难排错,,有一个idea插件可以安排(maven helper)
pom.xml
<?xml version="1.0"encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.3.RELEASE</version><relativePath/><!-- lookup parent from repository --></parent><groupId>com.wmx</groupId><artifactId>apache-kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>apache-kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- spring 整合的 apache kafka 消息队列依赖--><!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.5.7</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
如果有不明白的联系作者,一起学习
版权归原作者 EdwardYange 所有, 如有侵权,请联系我们删除。