文章目录
JMS消息模型
JMS即
Java Message Service
是Java平台的消息传递标准,用于实现消息中间件的通信。JMS提供了两种消息模型:点对点模型和发布订阅模型。
- 点对点模型:消息发送者发送消息,消息代理将其放入消息队列中,消息接受者从队列中获取消息,消息读取后被移除消息队列。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。虽然可能有多个客户端在队列中侦听消息,但只有一个可以读取到消息,之后消息将不存在,其他消费者将无法读取。也就是说消息队列只有唯一一个发送者和接受者,但是并不能说只有一个接收者。它的特点是每个消息只有一个消费者,即消息一旦被消费,消息就不在消息队列中;发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列;接收者在成功接收消息之后需向队列应答成功;
- 发布订阅模型:发布者将消息发送到主题
Topic
中,多个订阅者订阅这个主题,订阅者不断的去轮询监听消息队列中的消息,那么就会在消息到达的同时接收消息。它的特点是每个消息可以有多个消费者,消费完消息之后消息不会清除;发布者和订阅者之间有时间上的依赖性:针对某个主题的订阅者,它必须创建一个订阅之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。当然了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样即使订阅者没有运行,它也能接收到发布者的消息;
MQ概览
JMS定义了消息传递的标准接口。它并不是一个消息中间件,而是一个规范,用于与各种实现消息中间件,如
ActiveMQ
、
RabbitMQ
、
RocketMQ
、
Kafka
进行交互。
MQ即
messagequeue
消息队列,是分布式系统的重要组件,主要解决异步消息,应用解耦,流量消峰等问题。从而实现高可用,高性能,可伸缩和最终一致性的架构。使用较多的MQ有,
activeMQ
,
rabbitMQ
,
Kafka
。
- 异步消息处理:可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验;
- 应用解耦:商品服务和订单服务之间。用户下单后,订单服务会通知商品服务。不使用MQ的情况是订单服务调用商品服务的接口,这样订单服务和商品服务之间是耦合的;使用MQ,订单服务完成持久化处理,将消息写入MQ消息队列中,返回用户订单下单成功,商品服务来订阅这个下单的消息,采用拉或推的方式获得下单信息,商品服务根据商品下单信息进行商品库存信息修改,这样当下单时商品服务不可用时,也不影响正常下单,这就完成了订单服务和商品服务之间的解耦;
- 流量消峰:秒杀活动流量过大,导致流量暴增,最终可能导致应用挂掉。一般会在应用前端加入消息队列来控制活动人数,假如消息队列超过最大数量,应该直接抛弃用户请求或者跳转到错误页面。秒杀业务根据消息队列中的请求信息在做后续的业务处理。比如在抢购时,可能一下子过来了10万个请求,但MQ只接受前100个用户的请求,超过100个不接收了。这样就成功限制了用户请求;
特性ActiveMQRabbitMQRocketMQKafka单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景Topic 数量对吞吐量的影响吞吐量基本稳定吞吐量基本稳定Topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 TopicTopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 Topic 数量不要过多,如果要支撑大规模的 Topic,需要增加更多的机器资源时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ功能支持MQ 领域的功能极其完备基于 Erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用消息模型点对点和发布订阅点对点和发布订阅点对点和发布/订阅发布/订阅
一般的业务系统要引入MQ,最早大家都用
,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。ActiveMQ
后来大家开始用
RabbitMQ
,但是确实Erlang语言阻止了大量的Java工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。
不过现在确实越来越多的公司会去用
RocketMQ
,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险,目前
RocketMQ
已捐给
Apache
,但
GitHub
上的活跃度其实不算高,对自己公司技术实力有绝对自信的,推荐用
RocketMQ
,否则回去老老实实用
RabbitMQ
吧,人家有活跃的开源社区,绝对不会黄。
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用
RabbitMQ
是不错的选择。大型公司,基础架构研发实力较强,用
RocketMQ
是很好的选择。如果是大数据领域的实时计算、日志采集等场景,用
Kafka
是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
Kafka
Kafka
是一个分布式的基于发布订阅模式的消息队列,由
Apache
软件基金会开发。它主要用于处理大规模的实时数据流,适用于各种数据管道、流处理和日志收集场景。
基础架构
Broker
:Broker
是Kafka
集群中的服务器节点,一台Kafka
服务器就是一个Broker
。每个Broker
负责存储和管理消息,处理生产者的写入请求和消费者的读取请求。Kafka
集群可以通过增加更多的Brokers
来扩展处理能力和存储容量,一个Broker
可以容纳多个Topic
。Topic
:Topic
是Kafka
中的消息分类标准。消息被发送到特定的主题,消费者从主题中读取消息。每个主题可以分为多个Partition
,用来提高处理能力和扩展性。Partition
:Partition
是Topic
的一个子集,是Kafka
存储消息的实际单位。每个Partition
是一个有序的、不可变的消息序列,存储在磁盘上的日志文件中。每个Partition
有一个领导者(Leader
)和多个副本(Replicas
)。Leader
: 每个Partition
有一个Leader
,负责处理所有读写请求,写入消息到分区。Follower
: 每个Partition
的副本从Leader
复制消息,用于提供高可用性和数据备份。副本可以在Leader
失败时接管。Producer
:Producer
是负责将消息发送到Kafka``Topic
的客户端应用程序。生产者将消息发布到一个或多个Topic
中,可以选择Partition
策略决定消息写入到哪个分区。Consumer
:Consumer
是从Kafka``Topic
中读取消息的客户端应用程序。消费者可以独立工作,也可以作为Consumer Group
的一部分,订阅一个或多个Topic
,从主题的一个或多个Partition
中读取消息。Consumer Group
:Consumer Group
是一组消费者组成的集合,用于共同处理一个或多个主题中的消息。消费者组中的每个消费者负责处理主题的不同Partition
,确保消息的负载均衡和避免重复消费。Zookeeper
:Zookeeper
是一个分布式协调服务,用于Kafka
集群的管理。它维护Kafka
集群的元数据,如Broker
信息、Topic
和Partition
信息,进行分区的领导者选举,监控Kafka
集群的健康状态。
当消费者宕机后,再次启动的时候会继续消费消息,而不是从头消费消息。因为这个特性所以消费者会保存一些消费的进度信息,被称为
offset
,在
Kafka 0.9
之前保存在
Zookeeper
当中,在此之后保存在
Kafka
本地。即最终
Kafka
会将消息保存在本地磁盘中,默认保留 168 个小时,即 7 天。
Kafka
中消息是以
Topic
进行分类的,
Producer
生产消息,
Consumer
消费消息,都是面向
Topic
的。
Topic
是逻辑上的概念,而
Partition
是物理上的概念,每个
Partition
对应于一个
log
文件,该
log
文件中存储的就是
Producer
生产的数据。
Producer
生产的数据会被不断追加到该
log
文件末端,且每条数据都有自己的
offset
。
Consumer
组中的每个
Consumer
,都会实时记录自己消费到了哪个
offset
,以便出错恢复时,从上次的位置继续消费。
发布订阅工作流程
- 生产者发布消息 - 消息创建:生产者创建消息,并指定消息内容及相关元数据。- 选择主题和分区:生产者决定将消息发送到哪个
Topic
。如果未指定分区策略,Kafka
会根据配置选择一个分区。分区策略可能包括按键分配或轮询。- 消息发送:消息被发送到指定的Topic
的分区中。每个分区在本地磁盘上有一个日志文件,生产者将消息追加到日志文件末端。- 消息确认:生产者等待Kafka
确认消息已成功写入,确认可以是来自单个副本或所有副本,取决于acks
配置。 - 消息存储 - 消息存储在分区的日志文件中,每条消息都有一个唯一的
offset
,表示在该分区中的位置。消息存储在磁盘上,Kafka
默认保留消息 168 小时(即 7 天),可以通过配置调整保留时间。 - 消费者订阅消息 - 消费者组:消费者可以独立工作,也可以作为消费者组的一部分。每个消费者组由一个或多个消费者组成,共同处理来自同一
Topic
的消息。- 主题订阅:消费者订阅一个或多个Topic
,Kafka
会将相应Topic
的分区分配给消费者组中的消费者。 - 消息消费 - 拉取消息:消费者从分区中拉取消息,消费者可以定期或按需拉取消息。- 处理消息:消费者处理拉取到的消息。处理完成后,消费者可以选择提交当前的
offset
,记录处理进度。- 提交offset
:消费者将当前的offset
提交到Kafka
,以便在恢复或重新启动时从正确的位置继续消费。offset
提交到Kafka
(在Kafka 0.9
及之后版本)或Zookeeper
(在Kafka 0.8
及之前版本)。 - 消息确认和备份 - 消息确认:消费者可以选择自动或手动提交
offset
。自动提交是Kafka
定期提交消费者的offset
,手动提交是消费者显式提交。- 容错和恢复:如果消费者宕机或失败,Kafka
会根据提交的offset
进行消息恢复,确保消息不会丢失。Kafka
使用Leader
和Follower
模型来提供高可用性和数据备份。
生产者
生产者是
Kafka
中负责将消息发送到
Topic
的客户端应用程序。它将消息发布到指定的
Topic
,
Kafka
中的消息是按
Topic
进行分类的。
生产者文件存储
Kafka
中的消息存储在
Broker
服务器上的磁盘中,每个
Topic
被分为一个或多个
Partition
。每个
Partition
在磁盘上对应一个日志文件,生产者发布的消息被追加到该日志文件的末尾。由于生产者生产的消息会不断追加到
log
文件末尾,为防止
log
文件过大,导致数据定位效率低下,
Kafka
采取了分片和索引机制,将每个
Partition
分为多个
Segment
。每个
Segment
对应两个文件,“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为,
Topic
名称+分区序号。例如,
first
这个Topic有三个分区,则其对应的文件夹为
first-0
,
first-1
,
first-2
。其中,每个
Segment
中的日志数据文件大小均相等。“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中
message
的物理偏移地址。其中文件的命名是以第一个数据的偏移量来命名的。
该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的“log.segment.bytes”进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件。
Kafka
如何通过
index
文件快速找到
log
文件中的数据?
根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的
position
即实际物理位置。
根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。由于
index
文件中的每条对应
log
文件中存储内容大小都相同,所以想要找到指定的消息,只需要用
index
文件中的该条的大小加上该条的偏移量即可得出
log
文件中指定消息的位置。
生产者分区策略
分区是
Kafka
中的物理存储单位,每个
Topic
可以有多个
Partition
。消息在
Partition
中按顺序存储,每个消息有一个唯一的
offset
。
分区策略决定了消息如何分配到不同的
Partition
中,直接影响到
Kafka
的负载均衡和性能。合理的分区策略可以提高并发处理能力,优化资源利用。
分区策略类型:
- 轮询:轮询分区策略是
Kafka
的默认策略。生产者将消息按顺序轮询地发送到各个分区,确保负载均衡。优点是实现简单,能够均匀分配负载,适用于消息内容无关的场景。但无法控制消息在分区中的顺序。 - 按键分配:在这种策略中,生产者使用消息的
Key
来决定消息发送到哪个分区。消息的Key
被哈希,确保具有相同Key
的消息总是发送到同一个分区。优点是保证具有相同Key
的消息顺序一致,适用于需要顺序处理的场景。但可能导致某些分区负载过重,从而造成负载不均衡。 - 自定义分区器:允许开发者实现自定义的分区策略。通过实现
Partitioner
接口,生产者可以定义复杂的分区逻辑以满足特定的业务需求。优点是灵活性高,可以根据具体业务需求实现自定义分区规则。但需要开发者实现和维护自定义分区器类。
每种分区策略有其特定的应用场景和优缺点。根据具体的业务需求和系统性能要求,选择合适的分区策略可以优化
Kafka
的负载均衡和消息处理效率。
生产者数据可靠性保证
为保证生产者发送的数据,能可靠的发送到指定的
Topic
,
Topic
的每个分区收到生产者发送的数据后,都需要向生产者发送
ACK
(
acknowledgement
确认收到)。如果生产者收到
ACK
,就会进行下一轮的发送,否则重新发送数据。
Kafka
确保消息在
Leader
副本和
Follower
副本中都得到确认。消息在
Leader
副本写入成功后,
Leader
会将消息复制到其他
Follower
副本。副本确认机制保证了即使某个副本出现故障,数据也不会丢失。
ACK
配置项决定了生产者需要等待多少个副本确认消息已写入才能认为请求成功。可以设置为以下值:
0
:生产者不会等待任何确认,可能导致消息丢失。1
:生产者等待领导者副本的确认,提供基本的可靠性。all
或-1
:生产者等待所有副本的确认,提供最高级别的数据可靠性。
同步方案优点缺点半数以上完成同步,就发送ACK延迟低选举新的Leader时,容忍n台 节点的故障,需要2n+1个副本全部完成同步,才发送ACK选举新的Leader时,容忍n台节点的故障,需要n+1个副本延迟高理解 2n+1:
半数以上完成同步才可以发ACK,如果挂了n台有副本的服务器,那么就需要有另外n台正常发送(这样正常发送的刚好是总数(挂的和没挂的)的一半(n(挂的)+n(正常的)=2n)),因为是半数以上所以2n+1。所以总数2n+1的时候最多只能容忍n台有故障。即如果挂了n台有副本的服务器,那么存在副本的服务器的总和为 2n+1
Kafka
选择了第二种方案,原因是,同样为了容忍n台节点的故障,第一种方案需要
2n+1
个副本,而第二种方案只需要
n+1
个副本,而
Kafka
的每个分区都有大量的数据,第一种方案会造成大量数据的冗余;虽然第二种方案的网络延迟会比较高,但网络延迟对
Kafka
的影响较小。
采用第二种方案之后,设想以下情景,
Leader
收到数据,所有
Follower
都开始同步数据,但有一个
Follower
,因为某种故障,迟迟不能与
Leader
进行同步,那
Leader
就要一直等下去,直到它完成同步,才能发送
ACK
。这个问题怎么解决呢?
Leader
维护了一个动态的
in-sync replica set
即
ISR
。当和
Leader
保持同步的
Follower
集合。当
ISR
中的
Follower
完成数据的同步之后,就会给
Leader
发送
ACK
。如果
Follower
长时间未向
Leader
同步数据,则该
Follower
将被踢出
ISR
,该时间阈值由
replica.lag.time.max.ms
参数设定。
Leader
发生故障之后,就会从
ISR
中选举新的
Leader
。
生产者数据一致性保证
LEO
:Log End offset
每个副本的最后一个offset
;HW
:High Watermark
高水位,指的是消费者能见到的最大的offset
,ISR
队列中最小的LEO
;
Follower
发生故障后会被临时踢出
ISR
,待该
Follower
恢复后,
Follower
会读取本地磁盘记录的上次的
HW
,并将
log
文件高于
HW
的部分截取掉,从
HW
开始向
Leader
进行同步。等该
Follower
的
LEO
大于等于该
Partition
的
HW
,即 Follower 追上
Leader
之后,就可以重新加入
ISR
了。当
Leader
发生故障之后,会从
ISR
中选出一个新的
Leader
,之后,为保证多个副本之间的数据一致性, 其余的
Follower
会先将各自的
log
文件高于
HW
的部分截掉,然后从新的
Leader
同步数据;如果少于
Leader
中的数据则会从
Leader
中进行同步。
生产者发送消息流程
Kafka
的生产者发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程
main
线程和
Sender
线程,以及一个线程共享变量即
RecordAccumulator
。
在生产者发送消息时,
main
线程将消息发送给
RecordAccumulator
,当数据积累到
batch.size
之后,
sender
线程才会不断从
RecordAccumulator
中拉取消息发送到
Kafka
的
Broker
;如果数据迟迟未达到
batch.size
,
sender
线程等待
linger.time
之后就会发送数据。
消费者
Kafka
消费者是用于从
Kafka
主题中读取消息的客户端应用程序。
消费者分区分配策略
一个消费者组中有多个消费者,一个主题有多个分区,所以必然会涉及到分区的分配问题,即确定哪个分区由哪个消费者来消费。消费分配策略:
round-robin
:采用轮询的方式将当前所有的分区依次分配给所有的消费者,这种方法确保每个消费者都能获得一个相对均匀的分区负载;range
:首先会计算每个消费者可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个消费者;sticky
:这种分区策略是最新版本中新增的一种策略,将现有的分区尽可能均衡的分配给各个消费者,存在此目的的原因在于round-robin
和range
分配策略实际上都会导致某几个消费者承载过多的分区,从而导致消费压力不均衡;
range
按范围分配,先将所有的分区放到一起然后排序,按照平均分配的方式计算每个消费者会得到多少个分区,如果没有除尽,则会将多出来的分区依次计算到前面几个消费者。比如这里是三个分区和两个消费者,那么每个消费者至少会得到1个分区,而3除以2后还余1,那么就会将多余的部分依次算到前面几个消费者,也就是这里的1会分配给第一个消费者。如果按照
range
分区方式进行分配,其本质上是依次遍历每个
Topic
,然后将这些
Topic
的分区按照其所订阅的消费者数量进行平均的范围分配。这种方式从计算原理上就会导致排序在前面的消费者分配到更多的分区,从而导致各个消费者的压力不均衡。
消费者重复消费
消费者在消费的时候,需要维护一个
offset
,用于记录消费的位置。当提交的
offset
小于当前程序处理的最后一条消息的
offset
,会造成重复消费。就是先消费,后提交
offset
,如果消费成功、提交失败,消费者下次获取的
offset
还是以前的,所以会造成重复消费。
解决重复消费:
- 将接口设计具有幂等性。处理消息时使用唯一标识符来检测是否已经处理过该消息,从而避免重复处理。
Kafka
支持事务机制,允许将消息处理和偏移量提交放在同一个事务中,确保操作的原子性。使用事务可以减少因消费者失败导致的消息重复处理。- 将
offset
保存在数据库中,使当前业务与offset
提交绑定起来,这样可以一定程度避免重复消费问题。
消费者漏消费
消费者在消费时,当提交的
offset
大于当前程序处理的最后一条消息的
offset
,会造成漏消费。就是先提交
offset
,后消费,如果提交成功、消费失败,消费者下次获取的
offset
已经是新的,所以会造成漏消费。
解决漏消费:
- 在处理完消息并确保处理成功后,再提交偏移量。可以使用
commitSync
方法来保证偏移量的同步提交,从而确保只有在消息处理成功后才提交偏移量。try { processMessage(message); consumer.commitSync(Collections.singletonMap(new TopicPartition(topic, partition), new OffsetAndMetadata(lastOffset + 1))); } catch (Exception e) { // 处理异常情况 }
- 使用事务来保证消息处理和偏移量提交的原子性。
Kafka
提供了事务机制来确保消息处理和偏移量提交的可靠性。consumer.beginTransaction(); try { processMessage(message); consumer.commitTransaction(); } catch (Exception e) { consumer.abortTransaction(); }
消费者消息积压
消费者消息积压是指消费者处理消息的速度跟不上生产者发送消息的速度,导致消息在
Kafka
的分区中堆积,从而造成系统的延迟增加和资源消耗增加。如果线上遇到大量消息积压,那就是线上故障了,最可能的原因是消费者出现故障。
一般这个时候,只能临时紧急扩容了。先修复消费者的问题,确保其恢复消费速度,然后将现有消费者都停掉。新建一个
Topic
,
Partition
是原来的 10 倍,临时建立好原先10倍的
queue
数量。然后写一个临时的分发数据的
Consumer
程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的
queue
。接着临时征用10倍的机器来部署
Consumer
,每一批
Consumer
消费一个临时
queue
的数据。这种做法相当于是临时将
queue
资源和
Consumer
资源扩大 10 倍,以正常的 10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的
Consumer
机器来消费消息。
如果没有消费者没有出现问题,而出现了消费积压的情况可以参考以下思路:
- 提高消费并行度;
- 批量方式消费;
- 跳过非重要方式消费;
- 优化消息消费业务处理过程,简化过程;
Kafka事务
Kafka
从
0.11
版本开始引入了事务支持。
Kafka
的事务机制允许生产者以原子性方式写入消息到多个分区和主题中。这意味着生产者可以将一组消息作为一个事务提交,要么全部成功,要么全部失败。这种机制对于保证消息的一致性和防止数据丢失至关重要。
为了管理 Transaction,Kafka引入了一个新的组件 Transaction Coordinator。 Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。 Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
生产者事务工作流程:
- 初始化事务:生产者需要通过
initTransactions()
方法初始化事务资源,为事务操作做准备。 - 开始事务:通过
beginTransaction()
方法启动一个事务。这个事务会有一个全局唯一的事务ID
,和生产者的PID
绑定。如果生产者重启,能够通过这个事务ID
找到原来的PID
,从而继续事务。 - 发送消息:在事务进行中,可以将消息发送到
Kafka
主题。消息会被标记为事务的一部分,直到事务完成。 - 提交事务:使用
commitTransaction()
方法来提交事务。如果所有消息都成功发送,事务被提交,消息对消费者可见。 - 中止事务:如果在事务期间出现问题,可以调用
abortTransaction()
方法来中止事务。这样未提交的消息不会对消费者可见。
对于消费者而言,事务的保证就会相对较弱,尤其时无法保证提交的信息被精确消费。这是由于消费者可以通过
offset
访问任意信息,而且不同的
Segment
File
生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
消费者事务工作流程:
- 配置隔离级别:设置
isolation.level=read_committed
,让消费者只读取已经提交的消息。 - 消费消息:消费者从
Kafka
读取消息。由于配置了事务隔离级别,只会处理已经提交的消息,未提交的消息不会被读取。
版权归原作者 _whitepure 所有, 如有侵权,请联系我们删除。