0


掌握Kafka事务,看这篇就够了

先赞后看,南哥助你Java进阶一大半

Kafka事务实际上引入了

原子多分区写入

的概念,

Federico Valeri

播客画了以下流程图,展示了事务在分区级别如何工作。

在这里插入图片描述

我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。

⭐⭐⭐本文收录在《Java学习/进阶/面试指南》:https://github…JavaSouth

1. Kafka事务

1.1 Kafka事务是什么

面试官:Kafka事务你说说看?

Kafka的事务主要应用在以流式处理的应用程序中,流式处理?听起来都觉得很迷糊不知道是什么东西。

Kafka事务支持的流式处理过程一般是这样,A程序从一个A主题消费A消息,对A消息进行处理后,再把结果写入到B主题,后续B程序会对B主题的消息进行消费。也就是

消费 - 处理 - 生产

的过程。

这样的一个过程涉及了两个消息的消费、一个消息的生产,如何保证这整个过程的事务性,让这整个过程要么成功、要么都不成功,这就是Kafka事务要做的事情。

南哥画下流程图,帮助大家理解理解。

在这里插入图片描述

1.2 重复消费问题

面试官:你说的这个过程,不使用事务有什么问题?

流式处理程序的

消费 - 处理 - 生产

过程,如果没有事务的保证,可能会出现多种消息重复消费的问题,这就会产生各种奇奇怪怪的问题了。

特别是在金融、支付行业,整个支付过程涉及了多个流程,例如用户下单 -> 库存校验 -> 订单处理-> 实际扣费 -> 清算结算,这些业务场景采用的便是流式处理程序。涉及资金的业务场景,事务的保障就更重要了!!

我说说两个消息重复消费的场景。

还是举例上文的场景:A程序从一个A主题消费A消息,对A消息进行处理后,再把结果写入到B主题,后续B程序会对B主题的消息进行消费。

(1)程序崩溃造成的重复消费

如果A程序对A消息进行处理后,把结果写入到B主题。但在偏移量提交的时候崩溃了,此时Kafka会认为A消息还没有被消费,而A程序崩溃了Kafka会把该分区分配给新的消费者。

问题就来了,新的消费者会重新消费A消息,等于B主题被写入了两条相同的消息,A消息被消费了两次。

(2)僵尸程序造成的重复消费

如果一个消费者程序认为自己没有死亡,但因为停止向Kafka发送心跳一段时间后,Kafka认为它已经死亡了,这种程序叫做僵尸程序。

A程序从Kafka读取A消息后,它暂时挂起了,失去和Kafka的连接也不能提交偏移量。此时Kafka认为其死亡了,会把A消费分配给新的消费者消费。

但后续A程序恢复后,会继续把A消息写入B主题,仍然造成了A消费被消费了两次。

可能很多人会说,这个流程有重复消费的问题,那处理重复消费的问题不就可以了,不必引入Kafka事务这么复杂。但在金融、支付这么严谨、重要的业务场景,我们要的是整个流程哪怕有一丁点出错,整个处理流程全都要进行回滚。

1.3 Kafka事务不能处理的问题

面试官:Kafka事务有不能处理的问题吗?

当然在整个Kafka事务的过程中,会有某些操作是不能回滚的,Kafka事务并不支持处理,我们来看看。

(1)Kafka事务过程加入外部逻辑

例如A程序消费消息A的过程中,发送了一个通知邮件,那整个外部操作是不可逆的,不在事务的处理范围内。

(2)读取Kafka消息后写入数据库

这其实也可以当成一个外部处理逻辑,数据库的事务并不在Kafka事务的处理范围内。

1.4 SpringBoot使用Kafka事务

面试官:接触过SpringBoot发送Kafka事务消息吗?

在SpringBoot项目我们可以轻松使用Kafka事务,通过以下Kafka事务的支持,我们就可以保证消息的发送和偏移量的提交具有事务性,从而避免上述的重复消费问题。

(1)先引入

spring-kafka

依赖

<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>xxx</version></dependency></dependencies>

(2)配置Kafka事务管理器和生产者工厂

importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Bean;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.core.ProducerFactory;importorg.springframework.kafka.transaction.KafkaTransactionManager;importorg.springframework.kafka.support.serializer.JsonSerializer;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.kafka.core.DefaultKafkaProducerFactory;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassKafkaProducerConfig{@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object> configProps =newHashMap<>();
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tx-");DefaultKafkaProducerFactory<String,Object> factory =newDefaultKafkaProducerFactory<>(configProps);
        factory.setTransactionIdPrefix("tran-");return factory;}@BeanpublicKafkaTransactionManager<String,Object>transactionManager(ProducerFactory<String,Object> producerFactory){returnnewKafkaTransactionManager<>(producerFactory);}@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(ProducerFactory<String,Object> producerFactory){returnnewKafkaTemplate<>(producerFactory);}}

(3)使用

KafkaTemplate

发送事务性消息

importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;@EnableKafka@ServicepublicclassKafkaConsumerService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@Transactional@KafkaListener(topics ="A")publicvoidprocessMessage(String message){// 处理从主题A接收到的消息String processedMessage ="Processed "+ message;// 将处理后的消息发送到主题B
        kafkaTemplate.send("B", processedMessage);// 提交事务,确保消息发送和偏移量提交一起完成}}

戳这,《JavaSouth》作为一份涵盖Java程序员所需掌握核心知识、面试重点的《Java学习进阶指南》。

在这里插入图片描述

我是南哥,南就南在Get到你的有趣评论➕点赞➕关注。

创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️

标签: kafka 分布式 后端

本文转载自: https://blog.csdn.net/hdgaadd/article/details/142145974
版权归原作者 JavaSouth南哥 所有, 如有侵权,请联系我们删除。

“掌握Kafka事务,看这篇就够了”的评论:

还没有评论