0


在Spring Boot 中使用Kafka

在Spring Boot 中使用Kafka :

1. 添加 Kafka 相关依赖

首先,在

  1. pom.xml

文件中添加 Kafka 相关的依赖。通常使用

  1. spring-kafka

提供的依赖来集成 Kafka。

  1. <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version></dependency>

请确保

  1. ${spring-kafka.version}

是你所需的 Spring Kafka 版本。

2. 配置 Kafka 连接信息

  1. application.properties

  1. application.yml

中配置 Kafka 的连接信息:

  1. spring.kafka.bootstrap-servers=your-kafka-server:9092
  2. spring.kafka.consumer.group-id=my-group-id

可以根据你的需要配置更多的 Kafka 相关属性,如序列化器、反序列化器等。

3. 创建 Kafka 生产者

使用 Spring Boot 创建 Kafka 生产者的步骤如下:

3.1. 创建 Kafka 生产者配置
  1. importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonSerializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object> configProps =newHashMap<>();
  2. configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
  3. configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
  4. configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);// 如果发送 JSON 对象returnnewDefaultKafkaProducerFactory<>(configProps);}@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}

在上述配置中,

  1. producerFactory

方法设置了 Kafka 生产者的配置,包括序列化器。

  1. kafkaTemplate

方法创建了一个 KafkaTemplate 用于发送消息。

3.2. 发送消息到 Kafka
  1. importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaProducerService{privatestaticfinalStringTOPIC="your-topic-name";@AutowiredprivateKafkaTemplate<String,Object> kafkaTemplate;publicvoidsendMessage(Object message){
  2. kafkaTemplate.send(TOPIC, message);}}

  1. KafkaProducerService

中,通过

  1. KafkaTemplate

发送消息到指定的

  1. TOPIC

4. 创建 Kafka 消费者

使用 Spring Boot 创建 Kafka 消费者的步骤如下:

4.1. 创建 Kafka 消费者配置
  1. importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.config.ContainerProperties;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonDeserializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,Object>consumerFactory(){Map<String,Object> props =newHashMap<>();
  2. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
  3. props.put(ConsumerConfig.GROUP_ID_CONFIG,"my-group-id");
  4. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
  5. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);// 如果消费 JSON 对象returnnewDefaultKafkaConsumerFactory<>(props,newStringDeserializer(),newErrorHandlingDeserializer<>(newJsonDeserializer<>(Object.class)));}@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<>();
  6. factory.setConsumerFactory(consumerFactory());return factory;}}

在上述配置中,

  1. consumerFactory

方法设置了 Kafka 消费者的配置,包括反序列化器。

  1. kafkaListenerContainerFactory

方法创建了一个

  1. ConcurrentKafkaListenerContainerFactory

用于监听 Kafka 消息。

4.2. 创建 Kafka 消费者监听器
  1. importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="your-topic-name", groupId ="my-group-id")publicvoidlisten(Object message){System.out.println("Received Message: "+ message.toString());// 在这里处理接收到的消息逻辑}}

  1. KafkaConsumerService

中,通过

  1. @KafkaListener

注解监听指定的

  1. TOPIC

,并处理接收到的消息。

5. 使用 Kafka

现在,你可以在你的 Spring Boot 应用程序中通过

  1. KafkaProducerService

发送消息,通过

  1. KafkaConsumerService

接收和处理消息了。确保在启动应用程序时 Kafka 已经正常运行,并且配置了正确的连接信息和主题名称。

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_38096989/article/details/140489578
版权归原作者 苟且. 所有, 如有侵权,请联系我们删除。

“在Spring Boot 中使用Kafka”的评论:

还没有评论