0


Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  4. <version>3.1.6</version>
  5. </dependency>

2、配置application.yml

加入Kafka的配置

  1. spring
  2. kafka:
  3. #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
  4. bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095
  5. producer:
  6. # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
  7. acks: 1
  8. # 发送失败时的重试次数,0表示不重试
  9. retries: 0
  10. # 批量发送时的批次大小(字节)
  11. batch-size: 30720000 # 30MB
  12. # 生产者的内存缓冲区大小(字节)
  13. buffer-memory: 33554432 # 32MB
  14. # Key的序列化器类
  15. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  16. # Value的序列化器类
  17. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  18. consumer:
  19. # 消费者所属的组ID
  20. group-id: test-kafka
  21. # 禁用自动提交offset,改为手动提交
  22. enable-auto-commit: false
  23. # 偏移量重置策略:
  24. # earliest:从最早的记录开始消费
  25. # latest:从最新的记录开始消费
  26. auto-offset-reset: earliest
  27. # Key的反序列化器类
  28. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  29. # Value的反序列化器类
  30. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  31. # 每次poll()调用返回的最大消息条数
  32. max-poll-records: 2
  33. session:
  34. # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
  35. timeout:
  36. ms: 300000 # 5分钟
  37. listener:
  38. # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
  39. missing-topics-fatal: false
  40. # 消费模式:single=单条消息,batch=批量消费
  41. type: single
  42. # 消费确认模式:
  43. # manual_immediate:手动确认消息,立即提交offset
  44. ack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

  1. import lombok.AllArgsConstructor;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @Builder
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public class User {
  10. private int id;
  11. private String name;
  12. }

3.2、Task.java

  1. import lombok.AllArgsConstructor;
  2. import lombok.Builder;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. @Data
  6. @Builder
  7. @NoArgsConstructor
  8. @AllArgsConstructor
  9. public
  10. class Task {
  11. private int id;
  12. private String description;
  13. private User assignedUser;
  14. }

模拟嵌套类

3.3、KafkaConfig.java

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.kafka.annotation.EnableKafka;
  4. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  5. import org.springframework.kafka.core.ConsumerFactory;
  6. @EnableKafka
  7. @Configuration
  8. public class KafkaConfig {
  9. // 单条消费监听器工厂,手动提交offset
  10. @Bean
  11. public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
  12. ConsumerFactory<String, String> consumerFactory) {
  13. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  14. new ConcurrentKafkaListenerContainerFactory<>();
  15. factory.setConsumerFactory(consumerFactory);
  16. factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  17. return factory;
  18. }
  19. }

3.4、KafkaProducer.java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.springframework.boot.CommandLineRunner;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. @SpringBootApplication
  8. public class KafkaProducer {
  9. public static void main(String[] args) {
  10. SpringApplication.run(KafkaProducer.class, args);
  11. }
  12. @Bean
  13. CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
  14. return args -> {
  15. String topic = "task-topic";
  16. ObjectMapper objectMapper = new ObjectMapper();
  17. for (int i = 1; i <= 5; i++) {
  18. // 定义一个对象实例
  19. User user = User.builder().id(1).name("Alice").build();
  20. Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();
  21. //JAVA对象转化为JSON字符串
  22. String message = objectMapper.writeValueAsString(task);
  23. kafkaTemplate.send(topic, message);
  24. System.out.println("Sent: " + message);
  25. Thread.sleep(500); // 模拟消息发送间隔
  26. }
  27. };
  28. }
  29. }

序列化:使用 Jackson 的

  1. ObjectMapper

  1. Task

对象转化为 JSON 字符串,方法

  1. writeValueAsString()

将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

  1. import com.fasterxml.jackson.core.JsonProcessingException;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.support.Acknowledgment;
  6. import org.springframework.stereotype.Service;
  7. @Service
  8. public class SingleConsumer {
  9. @KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
  10. public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {
  11. String message = record.value();
  12. ObjectMapper objectMapper = new ObjectMapper();
  13. Task task = objectMapper.readValue(message,Task.class);
  14. // 取出
  15. System.out.println("User - Received: " + task.getAssignedUser());
  16. // 手动提交offset
  17. acknowledgment.acknowledge();
  18. }
  19. }

反序列化: 使用

  1. ObjectMapper

将 JSON 字符串

  1. message

转换回

  1. Task

对象,方法

  1. readValue()

可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

成功!

标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/weixin_48968553/article/details/143889081
版权归原作者 求积分不加C 所有, 如有侵权,请联系我们删除。

“Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例”的评论:

还没有评论