

Spring Boot Integration with the Kafka Message Queue (JSON Serialization and Deserialization of Objects)

1. Overview

The previous article, "Spring Boot Integration with the Kafka Message Queue", showed how to integrate Kafka with Spring Boot; there, the producer and consumer sent and received plain strings. This article shows how to serialize and deserialize objects as JSON, so that the producer sends an object instance and the consumer receives an object instance.

2. Adding Dependencies

Add the spring-kafka dependency to pom.xml. The complete pom is as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.yuwen.spring</groupId>
            <artifactId>MessageQueue</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <artifactId>kafka-json</artifactId>
        <description>Spring Boot with the spring-kafka message queue, using JSON serialization and deserialization of objects</description>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    </project>

The concrete version numbers are best managed through spring-boot-dependencies:

    <properties>
        <spring-boot.version>2.3.1.RELEASE</spring-boot.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

3. Configuration

Create application.yml and add the Kafka-related settings. The complete application.yml:

    server:
      port: 8028
    spring:
      kafka:
        # Kafka broker address
        bootstrap-servers: localhost:9092
        # Producer configuration
        producer:
          # Serializer class for message keys
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # Serializer class for message values
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        # Consumer configuration
        consumer:
          # Deserializer class for message keys
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          # Deserializer class for message values
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          # Consumer group this consumer belongs to
          group-id: testGroup
          # Consume from the beginning; takes effect when used with a new group id
          auto-offset-reset: earliest
          # Trust any package when deserializing; can be limited to specific package paths
          properties:
            spring:
              json:
                trusted:
                  packages: '*'

Note the spring.json.trusted.packages setting, which lists the trusted package paths, i.e. the packages containing the object classes to be sent and received.
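Accepting any package with '*' is convenient but permissive. As a tighter alternative (a sketch, assuming the entity package used in this article), the trusted packages can be limited to the package containing UserInfo:

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              # Only classes under this package may be deserialized
              packages: com.yuwen.spring.kafka.entity
```

With this in place, a record whose type header points outside the listed package is rejected by the JsonDeserializer instead of being instantiated.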

4. Code

Create the startup class KafkaMQApplication.java, making sure to add the @EnableKafka annotation:

    package com.yuwen.spring.kafka;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.EnableKafka;

    @SpringBootApplication
    @EnableKafka
    public class KafkaMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(KafkaMQApplication.class, args);
        }
    }

User info class

Create UserInfo.java as the class of the objects transferred through Kafka:

    package com.yuwen.spring.kafka.entity;

    public class UserInfo {
        private String name;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        @Override
        public String toString() {
            return "UserInfo [name=" + name + ", age=" + age + "]";
        }
    }
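To make the wire format concrete: JsonSerializer is backed by Jackson, which serializes the bean's getter properties, so each UserInfo lands on the topic as a small JSON object. The stdlib-only sketch below hand-rolls that same shape purely for illustration; the class and its helper are hypothetical and not part of the project.

```java
// Illustrates the JSON payload shape that JsonSerializer (via Jackson) would
// produce for a UserInfo; toJson is a hand-rolled stand-in, not Jackson itself.
public class UserInfoJsonSketch {
    static String toJson(String name, int age) {
        // Bean properties appear in declaration order: name, then age
        return String.format("{\"name\":\"%s\",\"age\":%d}", name, age);
    }

    public static void main(String[] args) {
        System.out.println(toJson("zhangsan", 1)); // {"name":"zhangsan","age":1}
    }
}
```

The consumer-side JsonDeserializer parses this JSON back into a UserInfo through its no-argument constructor and setters, which is why the entity class keeps both.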

Producer: sending messages

Spring Kafka provides the KafkaTemplate class for sending messages; inject it wherever it is needed. Create the producer service class ProviderService.java:

    package com.yuwen.spring.kafka.provider;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    import com.yuwen.spring.kafka.entity.UserInfo;

    @Service
    public class ProviderService {
        public static final String TOPIC = "userTopic";

        @Autowired
        private KafkaTemplate<?, UserInfo> kafkaTemplate;

        public void send(UserInfo user) {
            // Send the message
            kafkaTemplate.send(TOPIC, user);
            System.out.println("Provider= " + user);
        }
    }

Note that the topic is specified explicitly, and the payload being sent is a UserInfo instance.

Consumer: receiving messages

Create ConsumerService.java, using the @KafkaListener annotation; the parameter of the receiving method is of type UserInfo:

    package com.yuwen.spring.kafka.consumer;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    import com.yuwen.spring.kafka.entity.UserInfo;
    import com.yuwen.spring.kafka.provider.ProviderService;

    @Service
    public class ConsumerService {
        @KafkaListener(topics = ProviderService.TOPIC)
        public void receive(UserInfo user) {
            System.out.println("Consumer= " + user);
        }
    }
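By default, the JsonDeserializer determines the target class from type headers that JsonSerializer attaches to each record. If the producer were not a Spring application and sent no such headers, the consumer could be pointed at a default target type instead. A sketch, assuming Spring Kafka's spring.json.value.default.type property:

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            value:
              default:
                # Fallback class to deserialize into when no type header is present
                type: com.yuwen.spring.kafka.entity.UserInfo
```

This keeps the consumer working with plain-JSON producers at the cost of fixing a single value type per listener.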

5. Generating Messages Automatically

To exercise the producer, AutoGenerate.java automatically creates UserInfo instances and sends them to Kafka through the producer service:

    package com.yuwen.spring.kafka.provider;

    import java.util.concurrent.TimeUnit;

    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import com.yuwen.spring.kafka.entity.UserInfo;

    @Component
    public class AutoGenerate implements InitializingBean {
        @Autowired
        private ProviderService providerService;

        @Override
        public void afterPropertiesSet() throws Exception {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    int age = 0;
                    while (true) {
                        UserInfo user = new UserInfo();
                        user.setName("zhangsan");
                        user.setAge(++age);
                        providerService.send(user);
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            t.start();
        }
    }
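As a design note, the raw Thread plus sleep() loop above can also be expressed with a ScheduledExecutorService from the JDK. The sketch below is stdlib-only and collects the generated values into a list instead of calling providerService.send(...), since no broker is assumed; the class name, the bounded count, and the short period are all hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Bounded variant of AutoGenerate's loop using a scheduler instead of a raw
// Thread; "sending" is simulated by appending to a list.
public class GeneratorSketch {
    public static List<String> generate(int count, long periodMillis) throws InterruptedException {
        List<String> sent = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final int[] age = {0};
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() == 0) {
                return; // already produced the requested number of messages
            }
            sent.add("UserInfo [name=zhangsan, age=" + (++age[0]) + "]");
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        done.await();          // wait until all messages are generated
        scheduler.shutdown();  // stop the periodic task
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        // Short period for demonstration; the article's version sleeps 5 seconds
        System.out.println(generate(3, 10));
    }
}
```

A scheduler avoids hand-written sleep bookkeeping and shuts down cleanly, which matters in a Spring bean that should stop its background work on context close.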

6. Running the Service

Run the startup class KafkaMQApplication.java. The log output below shows that the UserInfo objects created by the producer are correctly received by the consumer:

      . ____ _ __ _ _
     /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/ ___)| |_)| | | | | || (_| | ) ) ) )
      ' |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot :: (v2.3.1.RELEASE)
    2022-05-06 09:58:35.090 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Starting KafkaMQApplication on yuwen-asiainfo with PID 11564 (D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json\target\classes started by yuwen in D:\Code\Learn\SpringBoot\spring-boot-demo\MessageQueue\kafka-json)
    2022-05-06 09:58:35.092 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : No active profile set, falling back to default profiles: default
    2022-05-06 09:58:36.585 INFO 11564 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8029 (http)
    2022-05-06 09:58:36.593 INFO 11564 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
    2022-05-06 09:58:36.593 INFO 11564 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.36]
    2022-05-06 09:58:36.676 INFO 11564 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
    2022-05-06 09:58:36.677 INFO 11564 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1550 ms
    2022-05-06 09:58:36.805 INFO 11564 --- [ Thread-119] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [10.21.13.14:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
    2022-05-06 09:58:36.877 INFO 11564 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
    2022-05-06 09:58:36.903 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
    2022-05-06 09:58:36.904 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
    2022-05-06 09:58:36.904 INFO 11564 --- [ Thread-119] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651802316901
    2022-05-06 09:58:37.138 INFO 11564 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [10.21.13.14:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = testGroup
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
    2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
    2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
    2022-05-06 09:58:37.175 INFO 11564 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1651802317175
    2022-05-06 09:58:37.177 INFO 11564 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Subscribed to topic(s): userTopic
    2022-05-06 09:58:37.179 INFO 11564 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
    2022-05-06 09:58:37.197 INFO 11564 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8029 (http) with context path ''
    2022-05-06 09:58:37.207 INFO 11564 --- [ main] c.yuwen.spring.kafka.KafkaMQApplication : Started KafkaMQApplication in 2.453 seconds (JVM running for 2.824)
    2022-05-06 09:58:37.608 INFO 11564 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
    2022-05-06 09:58:37.659 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Discovered group coordinator 10.21.13.14:9092 (id: 2147483647 rack: null)
    2022-05-06 09:58:37.660 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] (Re-)joining group
    2022-05-06 09:58:37.844 INFO 11564 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: zdSPCGGvT8qBnM4LSjz9Hw
    Provider= UserInfo [name=zhangsan, age=1]
    2022-05-06 09:58:38.141 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Finished assignment for group at generation 15: {consumer-testGroup-1-511d5998-e82a-4b8c-a338-5bccf28c92a6=Assignment(partitions=[userTopic-0])}
    2022-05-06 09:58:38.270 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Successfully joined group with generation 15
    2022-05-06 09:58:38.273 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Adding newly assigned partitions: userTopic-0
    2022-05-06 09:58:38.423 INFO 11564 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-testGroup-1, groupId=testGroup] Setting offset for partition userTopic-0 to the committed offset FetchPosition{offset=20, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.21.13.14:9092 (id: 0 rack: null)], epoch=absent}}
    2022-05-06 09:58:38.424 INFO 11564 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : testGroup: partitions assigned: [userTopic-0]
    Consumer= UserInfo [name=zhangsan, age=1]
    Provider= UserInfo [name=zhangsan, age=2]
    Consumer= UserInfo [name=zhangsan, age=2]
    Provider= UserInfo [name=zhangsan, age=3]
    Consumer= UserInfo [name=zhangsan, age=3]

7. References

Spring Boot Kafka - Serializing and Deserializing JSON

Tags: kafka spring boot json

This article is reposted from: https://blog.csdn.net/bugzeroman/article/details/124636654
Copyright belongs to the original author, 木木与呆呆. If there is any infringement, please contact us for removal.
