0


使用Spring Boot集成中间件:Kafka的高级使用案例讲解

使用Spring Boot集成中间件:Kafka的具体使用案例讲解

导言

在实际应用中,Kafka作为一种强大的分布式消息系统,广泛应用于实时数据处理和消息传递。本文将通过一个全面的使用案例,详细介绍如何使用Spring Boot集成Kafka,并展示其在实际场景中的应用。

1. 准备工作

在开始之前,我们需要确保已经完成以下准备工作:

  • 安装并启动Kafka集群
  • 创建Kafka主题(Topic)用于消息的发布与订阅

2. 生产者示例

首先,我们来创建一个简单的生产者,将消息发送到Kafka主题。

@RestControllerpublicclassKafkaProducerController{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@PostMapping("/produce/{message}")publicResponseEntity<String>produceMessage(@PathVariableString message){
        kafkaTemplate.send("my-topic", message);returnResponseEntity.ok("Message sent to Kafka: "+ message);}}

在上述代码中,我们使用了Spring Boot提供的

KafkaTemplate

,通过调用

send

方法将消息发送到名为

my-topic

的Kafka主题。

3. 消费者示例

接下来,我们创建一个简单的消费者,订阅并处理来自Kafka主题的消息。

@ServicepublicclassKafkaConsumerService{@KafkaListener(topics ="my-topic", groupId ="my-group")publicvoidconsumeMessage(String message){System.out.println("Received message from Kafka: "+ message);// 进行消息处理逻辑}}

通过

@KafkaListener

注解,我们指定了要监听的主题为

my-topic

,同时指定了消费者组的ID为

my-group

。当有新消息到达时,

consumeMessage

方法将被触发,进行消息处理逻辑。

4. 配置文件

application.properties

application.yml

中配置Kafka相关属性。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group

这里我们配置了Kafka的地址和消费者组的ID。

5. 运行和测试

启动Spring Boot应用程序,通过POST请求发送消息:

curl-X POST http://localhost:8080/produce/HelloKafka

在控制台或日志中,可以看到消费者输出了接收到的消息。
##############################################################################################################

一些其他的使用场景

使用Spring Boot集成中间件:Kafka高级使用案例

在这个高级使用案例中,我们将深入展示Spring Boot集成Kafka的一些高级功能,包括多分区、事务、自定义分区策略以及消息过滤。这将使我们更好地适应复杂的业务场景。

1. 配置多分区和自定义分区策略

首先,我们在Kafka配置中设置多分区以提高并发处理能力,并实现自定义分区策略。

@ConfigurationpublicclassKafkaConfig{@BeanpublicNewTopicmyTopic(){returnTopicBuilder.name("my-topic").partitions(5)// 设置为5个分区.replicas(1).build();}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(ProducerFactory<String,String> producerFactory){KafkaTemplate<String,String> template =newKafkaTemplate<>(producerFactory);
        template.setDefaultTopic("my-topic");return template;}@BeanpublicProducerListener<String,String>producerListener(){returnnewMyProducerListener();}}

在上述配置中,我们将主题

my-topic

配置为5个分区,并设置了生产者的默认主题。同时,我们实现了一个自定义的生产者监听器

MyProducerListener

,可以在消息发送前后执行额外的逻辑。

2. 事务支持和幂等性配置

接下来,我们配置生产者启用事务,并设置消费者为幂等性消费。

@ConfigurationpublicclassKafkaConfig{// ... 上述配置 ...@BeanpublicKafkaTransactionManager<String,String>kafkaTransactionManager(ProducerFactory<String,String> producerFactory){returnnewKafkaTransactionManager<>(producerFactory);}@BeanpublicConsumerFactory<String,String>consumerFactory(){Map<String,Object> props =newHashMap<>();// ... 其他配置 ...
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);returnnewDefaultKafkaConsumerFactory<>(props);}@BeanpublicConcurrentKafkaListenerContainerFactory<String,String>kafkaListenerContainerFactory(ConsumerFactory<String,String> consumerFactory,ConcurrentKafkaListenerContainerFactoryConfigurer configurer){ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        factory.setBatchListener(true);return factory;}}

在上述配置中,我们使用了

KafkaTransactionManager

配置事务管理器,同时设置了消费者的隔离级别为

read_committed

,启用了批量监听。

3. 自定义分区策略

为了更灵活地控制消息的分布,我们可以实现自定义的分区策略。

publicclassCustomPartitionerimplementsPartitioner{@Overridepublicintpartition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);int numPartitions = partitions.size();if(key ==null||!(key instanceofString)){thrownewInvalidRecordException("Invalid key");}String keyValue =(String) key;int hashCode = keyValue.hashCode();returnMath.abs(hashCode % numPartitions);}@Overridepublicvoidclose(){// 关闭资源逻辑}@Overridepublicvoidconfigure(Map<String,?> configs){// 配置初始化逻辑}}

在上述分区器中,我们使用了消息的字符串形式的

key

进行哈希计算,然后取绝对值得到分区数。这使得具有相同

key

的消息始终被分发到同一个分区。

4. 运行和测试

通过上述配置,我们可以启动Spring Boot应用程序,观察多分区、事务支持和自定义分区策略在消息生产和消费中的效果。

curl-X POST http://localhost:8080/produce/HelloKafka

在Kafka消费者日志中,可以看到消息被正确地分配到了指定的分区,并且事务操作生效,确保消息的一致性。

Kafka获取文件流的具体案例讲解


在许多实际应用场景中,我们需要处理文件数据,并将文件流传输到Kafka中进行进一步的处理。下面将通过一个具体的案例来演示如何使用Spring Boot和Kafka实现文件流的生产和消费。

1. 文件流生产者

首先,我们创建一个文件流生产者,读取本地文件并将文件内容发送到Kafka主题。

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;importjava.io.IOException;importjava.nio.file.Files;importjava.nio.file.Path;@ServicepublicclassFileProducerService{privatefinalKafkaTemplate<String,byte[]> kafkaTemplate;@AutowiredpublicFileProducerService(KafkaTemplate<String,byte[]> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}publicvoidproduceFile(String topic,String filePath){try{byte[] fileBytes =Files.readAllBytes(Path.of(filePath));
            kafkaTemplate.send(topic, fileBytes);}catch(IOException e){// 处理文件读取异常
            e.printStackTrace();}}}

在上述代码中,我们注入了

KafkaTemplate

,通过

Files.readAllBytes

读取文件内容并通过

kafkaTemplate.send

发送到指定的Kafka主题。

2. 文件流消费者

接下来,我们创建一个文件流消费者,监听Kafka主题并将接收到的文件流保存到本地。

importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Service;importjava.io.FileOutputStream;importjava.io.IOException;@ServicepublicclassFileConsumerService{@KafkaListener(topics ="file-topic")publicvoidconsumeFile(byte[] fileBytes){try{// 保存文件到本地String fileName ="received-file.txt";FileOutputStream outputStream =newFileOutputStream(fileName);
            outputStream.write(fileBytes);
            outputStream.close();}catch(IOException e){// 处理文件保存异常
            e.printStackTrace();}}}

通过

@KafkaListener

注解,我们监听名为

file-topic

的Kafka主题,接收文件流并保存到本地文件。

3. 配置文件

application.properties

application.yml

中配置Kafka相关属性。

spring.kafka.bootstrap-servers=localhost:9092

4. 运行和测试

在Spring Boot应用程序中运行文件流生产者和消费者,通过调用生产者的方法,将文件内容发送到Kafka主题:

fileProducerService.produceFile("file-topic","path/to/your/file.txt");

消费者将接收到文件流,并将其保存到本地文件。你可以通过查看消费者的日志或检查保存的文件来验证流程是否正常运行。

结语

通过对kafka的一些常用使用案例代码分析,希望这个能够帮助大家更深入地理解和使用Spring Boot集成Kafka的高级功能。感谢阅读!

标签: java spring boot 后端

本文转载自: https://blog.csdn.net/m0_68856746/article/details/135580936
版权归原作者 KingDol_MIni 所有, 如有侵权,请联系我们删除。

“使用Spring Boot集成中间件:Kafka的高级使用案例讲解”的评论:

还没有评论