1. Overview
The previous article, "SpringBoot集成Kafka消息队列" (integrating Kafka with Spring Boot), showed how to wire Kafka into a Spring Boot application, with the producer and consumer sending and receiving plain strings. This article shows how to serialize and deserialize objects as JSON, so that the producer sends an object instance and the consumer receives an object instance.
2. Dependencies
Add the Spring Kafka dependency to pom.xml. The complete pom is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.yuwen.spring</groupId>
        <artifactId>MessageQueue</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>kafka-json</artifactId>
    <description>Spring Boot with the spring-kafka message queue, using JSON to serialize and deserialize objects</description>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
</project>
The concrete version numbers are best managed through spring-boot-dependencies:
<properties>
    <spring-boot.version>2.3.1.RELEASE</spring-boot.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
3. Configuration
Create application.yml and add the Kafka-related settings below. The complete application.yml:
server:
  port: 8028

spring:
  kafka:
    # Kafka broker address(es)
    bootstrap-servers: localhost:9092
    # Producer configuration
    producer:
      # Serializer class for keys
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for values
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    # Consumer configuration
    consumer:
      # Deserializer class for keys
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for values
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # Consumer group this consumer belongs to
      group-id: testGroup
      # Start from the earliest offset (takes effect for a new group id)
      auto-offset-reset: earliest
      # Trust classes from any package for deserialization; this can also be limited to specific packages
      properties:
        spring:
          json:
            trusted:
              packages: '*'
Note the spring.json.trusted.packages setting: it lists the packages whose classes are trusted for deserialization, i.e. the package(s) that contain the object classes being sent and received.
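For example, instead of trusting every package, the setting can be narrowed to the package that holds the transported classes. A minimal sketch of such a variant (not the configuration used in the rest of this article, which keeps '*'):

spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.yuwen.spring.kafka.entity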
4. Code
Create the startup class KafkaMQApplication.java; note the @EnableKafka annotation:
package com.yuwen.spring.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
@EnableKafka
public class KafkaMQApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaMQApplication.class, args);
    }
}
User info class

Create UserInfo.java as the object class that will be transported through Kafka:
package com.yuwen.spring.kafka.entity;

public class UserInfo {
    private String name;
    private int age;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserInfo [name=" + name + ", age=" + age + "]";
    }
}
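For reference, JsonSerializer uses Jackson under the hood, so the UserInfo above travels across Kafka as a small JSON document. The following standalone sketch (a hypothetical helper class, not part of the project, assuming Jackson is already on the classpath via spring-boot-starter-web) prints that JSON:

package com.yuwen.spring.kafka.entity;

import com.fasterxml.jackson.databind.ObjectMapper;

public class UserInfoJsonDemo {

    public static void main(String[] args) throws Exception {
        UserInfo user = new UserInfo();
        user.setName("zhangsan");
        user.setAge(1);
        // Prints something like: {"name":"zhangsan","age":1}
        System.out.println(new ObjectMapper().writeValueAsString(user));
    }
}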
Producer: sending messages

Spring Kafka provides the KafkaTemplate class for sending messages; inject it wherever it is needed. Create the producer service class ProviderService.java:
package com.yuwen.spring.kafka.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.entity.UserInfo;

@Service
public class ProviderService {
    public static final String TOPIC = "userTopic";

    @Autowired
    private KafkaTemplate<?, UserInfo> kafkaTemplate;

    public void send(UserInfo user) {
        // Send the message to Kafka
        kafkaTemplate.send(TOPIC, user);
        System.out.println("Provider= " + user);
    }
}
Note that the topic is specified and that the payload being sent is a UserInfo instance.
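If delivery needs to be confirmed, KafkaTemplate.send() returns a ListenableFuture on which a callback can be registered. The sketch below is a hypothetical helper, not part of the original project, and assumes a template typed with a String key:

package com.yuwen.spring.kafka.provider;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;

import com.yuwen.spring.kafka.entity.UserInfo;

public class CallbackSendExample {

    private final KafkaTemplate<String, UserInfo> kafkaTemplate;

    public CallbackSendExample(KafkaTemplate<String, UserInfo> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithCallback(UserInfo user) {
        // send() is asynchronous; the callback reports whether the broker accepted the record
        ListenableFuture<SendResult<String, UserInfo>> future =
                kafkaTemplate.send(ProviderService.TOPIC, user);
        future.addCallback(
                result -> System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                        + ", offset " + result.getRecordMetadata().offset()),
                ex -> System.err.println("Send failed: " + ex.getMessage()));
    }
}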
Consumer: receiving messages

Create ConsumerService.java; note the @KafkaListener annotation and that the parameter of the receiving method is of type UserInfo:
package com.yuwen.spring.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.entity.UserInfo;
import com.yuwen.spring.kafka.provider.ProviderService;

@Service
public class ConsumerService {

    @KafkaListener(topics = ProviderService.TOPIC)
    public void receive(UserInfo user) {
        System.out.println("Consumer= " + user);
    }
}
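A listener method can also take the whole ConsumerRecord when the partition, offset, or key is needed alongside the payload. A sketch of such a variant (a hypothetical class, not part of the original article; it uses its own groupId, so it would receive the same messages independently of ConsumerService):

package com.yuwen.spring.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.yuwen.spring.kafka.entity.UserInfo;
import com.yuwen.spring.kafka.provider.ProviderService;

@Service
public class RecordConsumerExample {

    @KafkaListener(topics = ProviderService.TOPIC, groupId = "recordGroup")
    public void receive(ConsumerRecord<String, UserInfo> record) {
        // record.value() is the deserialized UserInfo; the record also carries metadata
        System.out.println("Consumer= " + record.value()
                + " (partition=" + record.partition() + ", offset=" + record.offset() + ")");
    }
}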
5. Generating Messages Automatically

To exercise the producer, AutoGenerate.java periodically creates UserInfo instances and sends them to Kafka through the producer service:
package com.yuwen.spring.kafka.provider;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.yuwen.spring.kafka.entity.UserInfo;

@Component
public class AutoGenerate implements InitializingBean {

    @Autowired
    private ProviderService providerService;

    @Override
    public void afterPropertiesSet() throws Exception {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                int age = 0;
                while (true) {
                    UserInfo user = new UserInfo();
                    user.setName("zhangsan");
                    user.setAge(++age);
                    providerService.send(user);
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}
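As an aside, the hand-rolled thread above could also be replaced with Spring's scheduling support. A sketch under the assumption that @EnableScheduling is added to a configuration class (the ScheduledGenerate class is hypothetical, not part of the original article):

package com.yuwen.spring.kafka.provider;

import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.yuwen.spring.kafka.entity.UserInfo;

@Component
public class ScheduledGenerate {

    private final AtomicInteger age = new AtomicInteger();

    @Autowired
    private ProviderService providerService;

    // Send one UserInfo every 5 seconds, mirroring what AutoGenerate does
    @Scheduled(fixedRate = 5000)
    public void sendUser() {
        UserInfo user = new UserInfo();
        user.setName("zhangsan");
        user.setAge(age.incrementAndGet());
        providerService.send(user);
    }
}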
6. Running the Service

Run the KafkaMQApplication.java startup class. In the log output below, the UserInfo objects produced by the producer are correctly received by the consumer:
:: Spring Boot :: (v2.3.1.RELEASE)
2022-05-06 09:58:35.090 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Starting KafkaMQApplication on yuwen-asiainfo with PID 11564 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json)
2022-05-06 09:58:35.092 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : No active profile set, falling back to default profiles: default
2022-05-06 09:58:36.585 INFO 11564 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8029 (http)
2022-05-06 09:58:36.593 INFO 11564 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-05-06 09:58:36.593 INFO 11564 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.36]
2022-05-06 09:58:36.676 INFO 11564 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-05-06 09:58:36.677 INFO 11564 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1550 ms
2022-05-06 09:58:36.805 INFO 11564 --- [ Thread-119] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [10.21.13.14:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
2022-05-06 09:58:36.877 INFO 11564 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2022-05-06 09:58:36.903 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2022-05-06 09:58:36.904 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-05-06 09:58:36.904 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651802316901
2022-05-06 09:58:37.138 INFO 11564 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.21.13.14:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = testGroup
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651802317175
2022-05-06 09:58:37.177 INFO 11564 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): userTopic
2022-05-06 09:58:37.179 INFO 11564 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2022-05-06 09:58:37.197 INFO 11564 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8029 (http) with context path ''
2022-05-06 09:58:37.207 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Started KafkaMQApplication in 2.453 seconds (JVM running for 2.824)
2022-05-06 09:58:37.608 INFO 11564 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
2022-05-06 09:58:37.659 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
2022-05-06 09:58:37.660 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
2022-05-06 09:58:37.844 INFO 11564 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
Provider= UserInfo [name=zhangsan, age=1]
2022-05-06 09:58:38.141 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 15: {consumer-testGroup-1-511d5998-e82a-4b8c-a338-5bccf28c92a6=Assignment(partitions=[userTopic-0])}
2022-05-06 09:58:38.270 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 15
2022-05-06 09:58:38.273 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: userTopic-0
2022-05-06 09:58:38.423 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition userTopic-0 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
2022-05-06 09:58:38.424 INFO 11564 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroup: partitions assigned: [userTopic-0]
Consumer= UserInfo [name=zhangsan, age=1]
Provider= UserInfo [name=zhangsan, age=2]
Consumer= UserInfo [name=zhangsan, age=2]
Provider= UserInfo [name=zhangsan, age=3]
Consumer= UserInfo [name=zhangsan, age=3]
7. References

Spring Boot Kafka – Serializing and Deserializing JSON