Spring Boot是一个用于构建独立的、生产级的Java应用程序的框架,而Kafka是一种高吞吐量的分布式发布订阅消息系统。在本文中,我们将详细解释如何在Spring Boot项目中集成Kafka。
1. 添加依赖
首先,我们需要在项目的pom.xml文件中添加Spring Boot和Kafka的依赖。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
2. 配置Kafka
接下来,我们需要在application.properties文件中配置Kafka的相关参数。
# Kafka服务器地址
spring.kafka.bootstrap-servers=localhost:9092
# 消费者组ID
spring.kafka.consumer.group-id=myGroup
# 消费者自动提交偏移量
spring.kafka.consumer.enable-auto-commit=true
3. 创建Kafka生产者
现在我们可以创建一个Kafka生产者来发送消息。首先,我们需要注入KafkaTemplate。
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
然后,我们可以创建一个方法来发送消息。
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
4. 创建Kafka消费者
同样,我们也可以创建一个Kafka消费者来接收消息。首先,我们需要注入ConsumerFactory和ConsumerConfig。
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Autowired
private ConsumerConfig<String, String> consumerConfig;
然后,我们可以创建一个方法来接收消息。
public void consumeMessages() {
this.consumer = consumerFactory.createConsumer(consumerConfig);
this.consumer.subscribe(Arrays.asList("myTopic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
5. 测试Kafka集成
最后,我们可以创建一个主方法来测试我们的Kafka集成。首先,我们需要发送一条消息,然后消费这条消息。
@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ConsumerFactory<String, String> consumerFactory;
@Autowired
private ConsumerConfig<String, String> consumerConfig;
private final AtomicInteger count = new AtomicInteger(); //记录每个分区的消息数,用于后续统计各分区的消息总数。
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// 向主题"test"发送消息
sendMessage("test", "hello");
// 启动消费者
consumeMessages();
}
/**
* 发送消息到Kafka
*/
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
count.incrementAndGet();
}
/**
* 从Kafka消费消息
*/
public void consumeMessages() {
this.consumer = consumerFactory.createConsumer(consumerConfig);
this.consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
这是Spring Boot集成Kafka栗子,在开发中根据实际情况调整。
版权归原作者 hope笔记 所有, 如有侵权,请联系我们删除。