在Spring Boot 中使用Kafka :
1. 添加 Kafka 相关依赖
首先,在
pom.xml
文件中添加 Kafka 相关的依赖。通常使用
spring-kafka
提供的依赖来集成 Kafka。
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version></dependency>
请确保
${spring-kafka.version}
是你所需的 Spring Kafka 版本。
2. 配置 Kafka 连接信息
在
application.properties
或
application.yml
中配置 Kafka 的连接信息:
spring.kafka.bootstrap-servers=your-kafka-server:9092
spring.kafka.consumer.group-id=my-group-id
可以根据你的需要配置更多的 Kafka 相关属性,如序列化器、反序列化器等。
3. 创建 Kafka 生产者
使用 Spring Boot 创建 Kafka 生产者的步骤如下:
3.1. 创建 Kafka 生产者配置
importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonSerializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object> configProps =newHashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);// 如果发送 JSON 对象returnnewDefaultKafkaProducerFactory<>(configProps);}@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}
在上述配置中,
producerFactory
方法设置了 Kafka 生产者的配置,包括序列化器。
kafkaTemplate
方法创建了一个 KafkaTemplate 用于发送消息。
3.2. 发送消息到 Kafka
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaProducerService{privatestaticfinalStringTOPIC="your-topic-name";@AutowiredprivateKafkaTemplate<String,Object> kafkaTemplate;publicvoidsendMessage(Object message){
kafkaTemplate.send(TOPIC, message);}}
在
KafkaProducerService
中,通过
KafkaTemplate
发送消息到指定的
TOPIC
。
4. 创建 Kafka 消费者
使用 Spring Boot 创建 Kafka 消费者的步骤如下:
4.1. 创建 Kafka 消费者配置
importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.ConsumerFactory;importorg.springframework.kafka.core.DefaultKafkaConsumerFactory;importorg.springframework.kafka.listener.ConcurrentMessageListenerContainer;importorg.springframework.kafka.listener.config.ContainerProperties;importorg.springframework.kafka.support.serializer.ErrorHandlingDeserializer;importorg.springframework.kafka.support.serializer.JsonDeserializer;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaConsumerConfig{@BeanpublicConsumerFactory<String,Object>consumerFactory(){Map<String,Object> props =newHashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"your-kafka-server:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"my-group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);// 如果消费 JSON 对象returnnewDefaultKafkaConsumerFactory<>(props,newStringDeserializer(),newErrorHandlingDeserializer<>(newJsonDeserializer<>(Object.class)));}@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());return factory;}}
在上述配置中,
consumerFactory
方法设置了 Kafka 消费者的配置,包括反序列化器。
kafkaListenerContainerFactory
方法创建了一个
ConcurrentKafkaListenerContainerFactory
用于监听 Kafka 消息。
4.2. 创建 Kafka 消费者监听器
importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="your-topic-name", groupId ="my-group-id")publicvoidlisten(Object message){System.out.println("Received Message: "+ message.toString());// 在这里处理接收到的消息逻辑}}
在
KafkaConsumerService
中,通过
@KafkaListener
注解监听指定的
TOPIC
,并处理接收到的消息。
5. 使用 Kafka
现在,你可以在你的 Spring Boot 应用程序中通过
KafkaProducerService
发送消息,通过
KafkaConsumerService
接收和处理消息了。确保在启动应用程序时 Kafka 已经正常运行,并且配置了正确的连接信息和主题名称。
版权归原作者 苟且. 所有, 如有侵权,请联系我们删除。