0


在Spring Boot 中使用Kafka

在Spring Boot 中使用Kafka :

1. 添加 Kafka 相关依赖

首先,在

pom.xml

文件中添加 Kafka 相关的依赖。通常使用

spring-kafka

提供的依赖来集成 Kafka。

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version></dependency>

请确保

${spring-kafka.version}

是你所需的 Spring Kafka 版本。

2. 配置 Kafka 连接信息

application.properties

application.yml

中配置 Kafka 的连接信息:

spring.kafka.bootstrap-servers=your-kafka-server:9092
spring.kafka.consumer.group-id=my-group-id

可以根据你的需要配置更多的 Kafka 相关属性,如序列化器、反序列化器等。

3. 创建 Kafka 生产者

使用 Spring Boot 创建 Kafka 生产者的步骤如下:

3.1. 创建 Kafka 生产者配置
importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonSerializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object> configProps =newHashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);// 如果发送 JSON 对象returnnewDefaultKafkaProducerFactory<>(configProps);}@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}

在上述配置中,

producerFactory

方法设置了 Kafka 生产者的配置,包括序列化器。

kafkaTemplate

方法创建了一个 KafkaTemplate 用于发送消息。

3.2. 发送消息到 Kafka
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaProducerService{privatestaticfinalStringTOPIC="your-topic-name";@AutowiredprivateKafkaTemplate<String,Object> kafkaTemplate;publicvoidsendMessage(Object message){
        kafkaTemplate.send(TOPIC, message);}}

KafkaProducerService

中,通过

KafkaTemplate

发送消息到指定的

TOPIC

4. 创建 Kafka 消费者

使用 Spring Boot 创建 Kafka 消费者的步骤如下:

4.1. 创建 Kafka 消费者配置
importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.config.ContainerProperties;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonDeserializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,Object>consumerFactory(){Map<String,Object> props =newHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"my-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);// 如果消费 JSON 对象returnnewDefaultKafkaConsumerFactory<>(props,newStringDeserializer(),newErrorHandlingDeserializer<>(newJsonDeserializer<>(Object.class)));}@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());return factory;}}

在上述配置中,

consumerFactory

方法设置了 Kafka 消费者的配置,包括反序列化器。

kafkaListenerContainerFactory

方法创建了一个

ConcurrentKafkaListenerContainerFactory

用于监听 Kafka 消息。

4.2. 创建 Kafka 消费者监听器
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="your-topic-name", groupId ="my-group-id")publicvoidlisten(Object message){System.out.println("Received Message: "+ message.toString());// 在这里处理接收到的消息逻辑}}

KafkaConsumerService

中,通过

@KafkaListener

注解监听指定的

TOPIC

,并处理接收到的消息。

5. 使用 Kafka

现在,你可以在你的 Spring Boot 应用程序中通过

KafkaProducerService

发送消息,通过

KafkaConsumerService

接收和处理消息了。确保在启动应用程序时 Kafka 已经正常运行,并且配置了正确的连接信息和主题名称。

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_38096989/article/details/140489578
版权归原作者 苟且. 所有, 如有侵权,请联系我们删除。

“在Spring Boot 中使用Kafka”的评论:

还没有评论