zookeeper与kafka
Zookeeper
Zookeeper是一个高性能,分布式的,开源分布式应用协调服务。它提供了简单原始的功能,分布式应用可以基于它实现更高级的服务(如Dubbo基于Zookeeper),比如,配置管理,集群管理,名空间。它被设计为易于编程,使用文件系统目录树作为数据模型。
Zookeeper不适合用于大容量存储。对于大容量存储,完全可以考虑使用数据库或者分布式文件系统等。
Zookeeper集群数量最好为奇数台。
Zookeeper设计目的
- 最终一致性:client不论连接到哪个Server,展示给它都是同一个视图。
- 可靠性:具有简单,健壮,良好的性能,如果消费m被另一台服务器接受,那么它将所有的服务器接受。
- 实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。
- 等待无关:慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待。
- 原子性:更新只有成功或者失败,没有中间状态
- 顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。
Zookeeper做了什么?
- 命名服务(唯一路径)
- 配置管理(集群公用一个配置文件或者信息) (offset)
- 集群管理(leader选举 机器的增加和退出) (Is/brokers/ids)
- 分布式锁(保持独占,控制时序)
- 队列管理(先进先出FIFO)
命名服务
在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等-----这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。
配置管理
程序总是需要配置的,如果程序分散部署在多台机器上,要逐个改变配置就变得困难。现在把这些配置全部放到zookeeper上去,保存在Zookeeper的某个目录节点中,然后所有相关应用程序对这个目录节点进行监听,一旦配置信息发生变化,每个应用程序就会收到Zookeeper的通知,然后从Zookeeper获取新的配置信息应用到系统中就可以了。(offset)
集群管理
所谓集群管理无在乎两点:是否有机器退出和加入,选举master.
对于第一点,所有机器约定在父目录GroupMembers下创建临时目录节点,然后监听父目录节点的子节点变化消息。一旦有机器挂掉,该机器与zookeeper的连接断开,其所创建的临时目录节点被删除,所有其他机器都收到通知。
分布式锁
有了zookeeper的一致性文件系统,锁的问题变得容易。锁服务可以分为两类,一个是保持独占,另一个是控制时序。
对于第一类,我们将zookeeper上的一个znode看作是一把锁,通过createznode的方式来实现。所有客户端都去创建/distribute_lock节点,最终成功创建的那个客户端也即拥有了这把锁。用完删除掉自己创建的distribute_lock节点就释放出锁。
对于第二类,/distribute_lock已经预先存在,所有客户端在它下面创建临时顺序编号目录节点,编号最小的获得锁,用完删除,一次方便。
队列管理
两种类型的队列:
- 同步队列:当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达
- 队列按照FIFO方式进行入队和出队操作 第一类,在约定目录下创建临时目录节点,监听节点数目是否是我们要求的数目 第二类,和分布式锁服务中的控制时序场景基本原理一致,入列有编号,出列按编号
Zookeeper中的基本概念
集群角色
leader:
事务请求的唯一调度者和处理者,保证集群中事务处理顺序
集群内部各服务器的调度者
Follower:
处理客户端非事务请求,转发事务请求给leader服务器
参与事务请求proposal的投票
参与leader选举投票
Observer:(可伸缩性)
充当一个观察者的角色,和Follower类似,对非事务请求,都可以进行独立处理。而对于事务请求,则交给leader来处理。和follower的区别是,Observer不参与任何形式的投票。通常用于不影响集群事务处理能力的情况下增加集群非事务处理能力
Leader
- 恢复数据;
- 维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型
- Learner的消息类型主要有PING消息,REQUEST消息,ACK消息,REVALIDATE消息,根据不同的消息类型,进行不同 的处理。 PING消息是指Learner的心跳信息; REQUEST消息是Follower发送的提议信息,包括写请求及同步请求; ACK消息是Follower的对提议的回复,超过半数的Follower通过,则commit该提议; REVALIDATE消息是用来延长SESSION有效时间
Follower
1.向Leader发送请求(PING消息,REQUEST消息,ACK消息,REVALIDATE消息);
2.接收Leader消息并进行处理;
3.接收Client的请求,如果为写请求,发送给Leader进行投票
4.返回Client结果
Observer
- Zab协议规定:来自Client的所有写请求,都要转发给ZK服务中唯一的Server-Leader,由Leader根据该请求发起一个提议。然后,其他的Server对该提议进行投票。之后,Leader对投票结果进行收集,当投票数量过半时Leader会向所有的Server发送一个通知消息。最后,当Client所连接的Server收到消息时,会把该操作更新到内存中并对Client的写请求做出回应。
- zookeeper扮演了两个职能。它们一方面从客户端接收连接与操作请求,另一方面对操作结果进行投票。 这两个职能在Zookeeper集群扩展的时候彼此制约。
- 当我们希望增加ZK服务中Client数量的时候,那么我们就需要增加Server的数量,来支持这么多的客户端。然而,从Zab协议对写请求的处理过程中我们可以发现,增加服务器的数量,则增加了对协议中投票过程的压力。因为Leader节点必须等待集群中过半Server响应投票,于是节点的增加使得部分计算机运行较慢,从而拖慢整个投票过程的可能性也随之提高,写操作也会随之下降。这正是我们在实际操作中看到的问题—随着Zookeeper集群变大,写操作的吞吐量会下降。
Zookeeper数据结构
Zookeeper采用树形层次结构,树中的每个节点被称为—Znode
Znode,兼具文件和目录两种特点。既像文件一样维护着数据,元信息,ACL,时间戳等数据结构,又像目录一样可以作为路径标识的一部分。
Znode结构
每个Znode由3部分组成:
- stat:此为状态信息,描述该Znode的版本,权限等信息
- data:与该Znode关联的数据
- children:该Znode下的子节点
- cZxid = 0x100000009-------------节点创建时的zxid
- ctime = Mon Sep 18 10:55:36 CST 2017--------节点创建时的时间
- mZxid = 0x100000009-------------节点最新一次更新发生时的zxid
- mtime = Mon Sep 18 10:55:36 CST 2017 ------ 节点最新一次更新发生时的时间
- pZxid = 0x100000009-------------- 子节点最新一次更新发生时的时间戳
- cversion = 0---------------------------其子节点的更新次数
- dataVersion = 0 ---------------------节点数据的更新次数
- aclVersion = 0 -----------------------节点ACL(授权信息)的更新次数
- ephemeralOwner = 0x0 -----------ephemeralOwner值表示与该节点绑定的session id
- dataLength = 7-----------------------节点数据的字节数
- numChildren = 0---------------------子节点个数
节点类型
1.PERSISTENT—持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2.PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
3.EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4.EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
ZAB协议
- ZAB协议包括两种基本的模式,分别是崩溃恢复和消息广播
- 当整个服务框架在启动过程中,或是当Leader服务器出现网络中断,崩溃退出与重启等异常情况时,ZAB协议就会进入恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式。
- 当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进入消息广播模式了。当一台同样遵守ZAB协议的服务器启动后加入到集群中时,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么新加入的服务器就会自觉地进入数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,然后一起参与消息广播流程中去。
Leader选举(FastLeaderElection)
当leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的server都恢复到一个正确的状态
- 每个Server启动以后都询问其它的Server它要投票给谁
- 对于其他server的询问,server每次根据自己的状态都回复自己推荐的leader的id和上一次处理事务的zxid(系统启动时每个server都会推荐自己)
- 收到所有Server回复以后,就计算出zxid最大的哪个Server,并将这个Server相关信息设置成下一次要投票的Server,计算这过程中获得票数最多的server为获胜者,如果获胜者的票数超过半数,则改server被选为leader,否则,继续这个过程,直到leader被选举出来 。
Leader选举带来的一些问题
- 老Leader在COMMIT前Crash(已经提交到本地)
- 老Leader在COMMIT后Crash,但有部分Follower接收到了Commit请求
数据同步
Leader服务器会为每一个Follower服务器都准备一个队列,并将那些没有被各Follower服务器同步的事务以Proposal消息的形式逐个发送给Commit消息,以表示该事务已经被提交。等到Follower服务器将所有其尚未同步的事务Proposal都从Leader服务器上同步过来并成功应用到本地数据库中后,Leader服务器就会将Follower服务器加入到真正的可用Follower列表中,并开始之后的其他流程
Zookeeper配置文件
- tickTime=2000:时间单元,ZK中时间都是以此基础进行整数倍配置的
- initLimit=10:同步数据的时间
- syncLimit=5:心跳机制的时间
- dataDir=/tmp/zookeeper:日志和快照文件保存目录
- clientPort=2181:对外服务的端口号
- maxClientCnxns=60:客户端与服务器之间的连接限制(单台)
- autopurge.snapRetainCount=3:清理保留的日志文件数量
- autopurge.snapRetainCount=3:清理保留的日志文件数量
- autopurge.purgeInterval=1:清理频率
Zookeeper常用命令
**bin目录下的脚本**
- zkCleanup:情路Zookeeper历史数据,包括事务日志文件和快照数据文件
- zkCli:Zookeeper的一个简易客户端
- zkEnv:设置Zookeeper的环境变量
- zkServer:Zookeeper服务器的启动,停止和重启脚本
创建节点create[-s][-e]path data acl
- 创建顺序节点:使用create -s /zk-test 123
- 创建临时节点:使用create -e /zk-temp 123
- 创建永久节点:使用create /zk-permanent 123
- 创建永久顺序节点:使用create -s /node_1/node_1_2 12
修改操作
- 命令 set path data [version]
path:节点路径
data:新数据
- set /node_1 newdataOfNode_1
- set /node_1 newdataOfNode_1 2
删除操作
- delete path [version]
- path:要删除的节点的路径
- delete /zk-permanent 123
- 若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点
快照文件
- 快照是ZK的data tree的一份拷贝。每一个server每隔一段时间会序列化data tree的所有数据并写入一个文件
- server金总快照时不需要进行协调,也不用暂停处理请求。因为server在进行快照时还会处理请求,所以当快照完成时,data tree可能会变化。我们称这样的快照是模糊的(fuzzy),因为它们不需要反映出(reflect)在任意给点的时间点data tree确切的状态
- 在任意的时间点上,data tree的数据都不会跟快照一样。这不是问题,因为server会重放(replay)事务。每一个快照文件被打上一个标记(tag),这个标记是快照开始的时候最后一个被commit的事务的时间戳,称之为TS。如果server最后加载快照,它会重放在ts之后的所有事务日志中的事务
Kafka
基本结构和基础概念
Kafka一开始是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统。Kafka是一个分布式流式处理平台。具有高吞吐、可持久化、可水平扩展、支持流数据处理的特点。
功能
消息系统:系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性、消息顺序型保障、回溯消费。
存储系统:消息持久化到磁盘,多副本机制。即可把Kafka作为长期的数据存储系统来使用。
流式处理平台:有一个完整的流式处理类库。(窗口、连接、变换和聚合)。
Kafka相关名词解释如下:
1.zookeeper:是kafka用来负责集群元数据的管理,控制器的选举等操作的
2.producer:将消息发送到Broker
消息生产者,发送消息的一方,负责创建消息,发布消息到kafka集群的终端或服务,
3.Broker:负责将收到的消息存储到磁盘中
服务代理节点:对于kafka而言,Broker可以简单地看作一个独立的kafka服务节点或kafka服务实例。
大多数情况下 也可以将Broker看作一台kafka服务器,前提是这台服务器上只部署了一个kafka实例。
一个或多个Broker组成一个kafka集群。
4.Consumer:负责从Broker订阅并消费消息
消费者:接收消息的一方,消费者连接到kafka上接收消息,进而进行相应的业务逻辑处理
5.主题和分区
- 主题(Topic):每条发布到kafka集群的消息属于的类别,即kafka是面向topic的 1)kafka中的消息以主题为单位进行归类 2)生产者负责将消息发送到特定的主题,而消费者负责订阅主题进行消费
- 分区(Partition): 1)主题是一个逻辑上的概念,主题可以细分为多个分区,一个分区只属于单个主题 2)同一个主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件, 消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset) 3)offset是消息在分区中的唯一标识,kafka通过它来保证消息在分区内的顺序性, kafka保证的是分区有序而不是主题有序
- 主题和分区的关系: 如下图。主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。kafka中的分区可以分布在不同的服务器(broker)上,一个主题(topic)可以横跨每个broker,以此来提供比单个broker更强大的性能。 每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定的合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
6.分区的多副本(Replica)机制
通过增加副本数量可以提升容灾能力
同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是"一主多从"的关系,其中leader副本负责处理读写请求,foller副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follwer副本中重新选举新的leader副本对外提供服务。kafka通过多副本机制实现了故障的自动转移,当kafka集群中某个broker失效时仍然能保证服务可用。
如下,kafka集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数)也为3,如此每个分区便有1个leader副本和2个follower副本。生产者和消费者只有leader副本进行交互,而follower副本只负责消息的同步,很多时候followerr副本中的消息相对leader副本而言会有一定的滞后。
kafka消费端也具备一定的容灾能力。Consumer使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
Producer发布消息
写入方式:
producer采用push模式将消息发布到broker,每条消息都被append到partition中,
属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)
分区策略(默认):
1.指定了partition,则直接使用;
2.未指定partition但指定key,通过对key的value进行hash选出一个partition
3.partition和key都未指定,使用轮询选出一个partition
写入流程:
1.Producer从zookeeper的/broker/…/state节点找到该partition的leader
2.Producer将消息发送给leader
3.Leader将消息写入本地log
4.followers从leader fetch消息,写入本地log后leader发送ACK
5.leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK
producer向leader发送数据的可靠性
可以通过request.required.acks参数来设置数据可靠性的级别:
1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message.如果leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,
但是数据可靠性确是最低的。
-1:producer需要等待ISR中所有follower都确认接收到数据后才算一次发送完成,可靠性最高。
但是这样也不能保证数据不丢失,比如当ISR中只有leader时这样就变成了acks=1的情况。为了避免这种情况出现,
可以设置min.insync.replicas这个参数。
消息的发送模式
Kafka消息的发送模式由producer端的配置参数producer.type来设置,这个参数指定了在后台线程中消息的发送方式是同步的
还是异步的,默认是同步的方式,即producer.type=sync。如果设置成异步的模式,即priducer.type=async。
可以是producer以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。如果需要确保消息的
可靠性,必须要将producer.type设置为sync。
Broker保存消息
存储方式:物理上把topic分成一个或多个partition
存储策略:无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1.基于时间;2.基于大小
存储机制:
kafka中消息是以topic进行分类的,topic在物理层面又能以partition为分组,一个topic可以分成若干个partition。
每个partition为一个目录,partition的名称规则为:topic名称+有序序号
**partition是实际物理上的概念,而topic是逻辑上的概念**
概念:每个partition相当于一个巨型文件被平均分配到多个相等的segment数据文件中(每个segment文件中消息数量不一定相等)
作用:这种特性方便了old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行
组成:segment文件由三部分组成,分别为“.index”文件、“.log”文件和”.timeindex”文件(引入版本,0.10.1.0 )
复制原理和同步方式:
1.多副本:
为了提高消息的可靠性,kafka每个topic的partition有n个副本(replicas),其中n为复制因子,可配置。选举一个副本作为leader,
其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被定期地去复制leader上的数据
2.ISR(In-Sync Replicas):
包括:leader和follower
判断条件:延迟时间replica.lag.time.max.ms
延迟条数replica.lag.max.message
任意一个超过阈值都会把follower剔除出ISR
kafka的ISR的管理最终都会反馈到Zookeeper节点上。
Consumer消费消息
常用的kafka操作
Kafka环境的搭建
安装环境
linux:
Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
kafka:kafka_2.11-2.3.0.tgz
zookeeper:apache-zookeeper-3.5.5-bin.tar.gz
zookerper安装
- 创建安装目录:mkdir /usr/local/zookeeper
- 解压zookeeper:tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz
- cd /usr/local/zookeeper/apache-zookeeper-3.5.5-bin/configcp zoo_sample.cfg zoo.cfg
- vim zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
server.1=206.206.127.191:2888:3888
admin.serverPort=8099
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
You have mail in /var/spool/mail/root
- tickTime:CS通信心跳时间(tickTime=2000)
Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,
也就是每个tickTime时间就会发送一个心跳。tickTime以毫秒为单位
- initLimit:LF初始通信时限(initLimit=10)集群中的follower服务器(F)与leader服务器(L)之间初始化连接时能容忍的最多心跳数(tickTime的数量)
- syncLimit:LF同步通信时限(syncLimit=5)
集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)
- dataDir:数据文件目录Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里例:dataDir=/tmp/zookeeper/data dataLogDir=/tmp/zookeeper/log
- clientPort:客户端连接端口客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求。例:clientPort=2181
- 服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)例:server.N=YYY:A:B server.1=206.206.127.191:2888:38881. 创建tmpmkdir /tmp/zookeeper/datamkfir /tmp/zookeeper/log
- 创建myid文件
1)cd /tmp/zookeeper
2)touch myid
3)vim myid
例:集群环境zoo.cfg中:server.0=127.0.0.1:5555:6666
server.1=127.0.0.1:5556:6667
server.2=127.0.0.1:5557:6668
分别在zk、zk2、zk3、的datadir中新建myid文件, 写入一个数字, 该数字表示这是第几号server. 该数字必须和zoo.cfg文件中的server.x中的x一一对应.
/opt/zk/zk/data/myid文件中写入0
/opt/zk/zk2/data/myid文件中写入1
/opt/zk/zk3/data/myid文件中写入2
- 配置环境变量,1)vim /etc/profile 添加内容: export ZOOKEEPER_INSTALL=/usr/local/zookeeper/apache-zookeeper-3.5.5-bin export PATH= P A T H : PATH: PATH:ZOOKEEPER_INSTALL/bin2)source /etc/profile
- zookeeper操作命令:
启动:./usr/local/zookeeper/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start
查看状态:./usr/local/zookeeper/apache-zookeeper-3.5.5-bin/bin/zkServer.sh status
状态为Starting zookeeper … STARTED表示启动成功
kafka安装
- 创建kafka安装目录mkdir /usr/local/kafka
- 将安装包:kafka_2.11-2.3.0.tgz,放置在/usr/local/kafka目录下解压安装包:tar -zxvf kafka_2.11-2.3.0.tgz
- 配置server.properties./usr/local/kafka/kafka_2.11-2.3.0/config/server.properties
# broker的全局唯一编号,不能重复,只能是数字broker.id=0# 集群中必须写ip地址,不然客户端访问kafka链接不上listeners=PLAINTEXT://206.206.127.191:9092# 处理网络请求的线程数量num.network.threads=3# 用来处理磁盘IO的线程数量num.io.threads=8# 发送套接字的缓冲区大小socket.send.buffer.bytes=102400# 接收套接字的缓冲区大小socket.receive.buffer.bytes=102400# 请求套接字的缓冲区大小socket.request.max.bytes=104857600# kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分割log.dirs=/tmp/kafka/log# topic在当前broker上的分区个数num.partitions=1# 用来恢复和清理data下数据的线程数量num.recovery.threads.per.data.dir=1# 每个topic创建时副本数,默认时1个副本offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1# segment文件保留的最长时间,超时将被删除log.retention.hours=168# 每个segment文件的大小。默认最大1Glog.segment.bytes=1073741824# 检查过期数据的时间,默认5分钟检查一次是否数据过期log.retention.check.interval.ms=300000# 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)zookeeper.connect=206.206.127.191:2181(单机)#zookeeper.connect=192.168.3.51:2181,192.168.3.52:2181,192.168.3.53:2181/kafka(集群)# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0
- kafka启动命令:sh kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.11-2.3.0/config/server.properties
- 查看kafka状态:jps
- 创建topic:
sh kafka-topics.sh --create --zookeeper 206.206.129.201:2181 --replication-factor 1 --partitions 6 --topic frms_face_info
- 查看topic
sh kafka-topics.sh --list --zookeeper 206.206.129.201:2181
- 删除topic
sh /usr/local/kafka/kafka_2.11-2.3.0/bin/kafka-topics.sh --delete --zookeeper 206.206.129.201 --topic frms_face_info
- 生产者,消费者
./kafka-console-producer.sh --broker-list 192.168.20.91:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.20.91:9092 --topic test --from-beginning
JAVA消费kafka
<!--kafka依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
生产者:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;··
public class KafkaProducerTest implements Runnable {
private final KafkaProducer<String, String> producer;
private final String topic;
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
//bootstrap.servers:kafka的地址
props.put("bootstrap.servers", "206.206.127.191:9092");
/*acks:消息的确认机制,默认值是0
* acks=0:如果设置为0,生产者不会等待kafka的响应
* acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
* acks=all:这个配置意味着leader会等待所有的follower同步完成,这个确保消息不会丢失,除非kafka集群中
* 所有机器挂掉。这是最强的可用性保证
* */
props.put("acks", "all");
//retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送
props.put("retries", 0);
//batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
props.put("batch.size", 16384);
//key.serializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<String, String>(props);
this.topic = topicName;
}
@Override
public void run() {
int messageNo = 1;
try {
for(;;) {
String messageStr="aaaa";
producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
//生产了100条就打印
if(messageNo%100==0){
System.out.println("发送的信息:" + messageStr);
}
//生产1000条就退出
if(messageNo%1000==0){
System.out.println("成功发送了"+messageNo+"条");
break;
}
messageNo++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerTest test = new KafkaProducerTest("test");
Thread thread = new Thread(test);
thread.start();
}
}
消费者:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest implements Runnable {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerTest(String topicName) {
Properties props = new Properties();
//bootstrap.servers:kafka的地址
props.put("bootstrap.servers", "206.206.127.191:9092");
//group.id:组名不同可以重复消费。例如先使用了组名A消费了kafka的1000条数据,但是你还想再次
//进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了
props.put("group.id", GROUPID);
//enable.auto.commit:是否自动提交,默认为true
props.put("enable.auto.commit", "true");
//auto.commit.interval.ms:从poll(拉)的回话处理时长
props.put("auto.commit.interval.ms", "1000");
//session.timeout.ms:超时时间
props.put("session.timeout.ms", "30000");
/*auto.offset.reset:消费规则,默认earliest
* earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
* latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
* none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
* */
props.put("auto.offset.reset", "earliest");
//key.deserializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
int messageNo = 1;
System.out.println("---------开始消费---------");
try {
for (; ; ) {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
//消费100条就打印 ,但打印的数据不一定是这个规律的
if (messageNo % 100 == 0) {
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
}
//当消费了1000条就退出
if (messageNo % 1000 == 0) {
break;
}
messageNo++;
}
} else {
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerTest test1 = new KafkaConsumerTest("test");
Thread thread1 = new Thread(test1);
thread1.start();
}
}
Queues.drain()
Google Guava库提供的一个方法,用于将一个BlockingQueue中的所有元素一次性转移或者处理。该方法具有以下特点:
1.该方法是线程安全的,可以在多个并发线程之间安全的调用
2.该方法支持批量操作,可以一次性地将BlockingQueue中的所有元素转移到目标集合或进行某种操作
3.该方法可以阻塞等待BlockingQueue中有可用的元素
Queues.drain()方法的签名如下所示:
public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer,
int numElements, long timeout, TimeUnit unit)
throws InterruptedException
其中,参数含义如下:
q:需要转移或处理的BlockingQueue对象
buffer:目标集合,用于存储从BlockingQueue中取出的元素
numElements:要取出的元素数量,如果 numElements为0,则取出所有元素
timeout:如果指定了timeout,表示在等待BlockingQueue中有可用元素时的最长等待时间
unit:timeout参数的时间单位
使用示例:
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
for (int i = 0; i < 10; i++) {
queue.put("item" + i);
}
List<String> list = new ArrayList<>();
int count = Queues.drain(queue, list, 5, 5000, TimeUnit.MILLISECONDS);
System.out.println("Drained " + count + " elements from queue");
System.out.println("List size: " + list.size());
for (String s : list) {
System.out.println(s);
}
}
}
上述代码中,首先创建了一个长度为10的ArrayBlockingQueue,并往其中添加了10个元素。然后,使用Queues.drain()方法一次性地从队列中取出5个元素,并存储到List集合中,如果在等待过程中超时,则会抛出异常InterruptedException,最后,打印出取出的元素数量和List集合中的元素
总之,Queues.drain()是Google Guava库提供的一个方法,用于将一个BlockingQueue中的所有元素一次性转移或者处理。该方法支持批量操作,可以一次性地将BlockingQueue中的所有元素转移到目标集合或进行某种操作。
注:
使用此方法时,如果队列中的元素数目不足要取的数量,那么这个方法会阻塞等待直到队列中有足够的元素
具体来说,Queues.drain()方法会调用队列的take()方法来取出元素,如果队列中没有足够的元素,take()方法会一直阻塞等待直到队列中有新的元素插入,而drain()方法将会不断地调用take()方法,直到取出了指定数量的元素或者队列已经被关闭。
因此,当队列中的元素数量不足要取的数量时,drain()方法会阻塞等待直到队列中有足够的元素可以取出。当然,在等待的过程中,如果队列被关闭了,drain()方法也会停止并返回当前已经取出的元素数量。
kafka与Zookeeper
kafka和Zookeeper是两个独立的开源项目,它们之间有着密切的联系,在kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量信息等,同时还用于协调kafka的分区分配和Leader选举等操作。
Kafka和Zookeeper的版本之间需要保持兼容性,否则可能会导致不兼容或错误。
具体以下有一些常见的kafka和Zookeeper版本之间的配套关系:
- 在kafka0.8x版本中,Zookeeper依然是必需的,并且需要安装Zookeeper3.3x或3.4x版本
- 在kafka0.9.0.0及更高版本中,引入了内置的_consumer_offsets主题来存储消费者的偏移量信息,因此不再强制要求使用Zookeeper存储偏移量信息,但是,仍然需要与Kafka集群中运行的Zookeeper版本保持兼容性,建议使用zookeeper3.4.6或更高版本
- 在kafka2.0.0及更高版本中,Zookeeper的最低版本要求为3.4.9,因为该版本修复了某些安全漏洞
总之,kafka与zookeeper之间需要保持兼容性,这意味应该使用与kafka集群中运行的zookeeper版本相兼容的版本。建议查阅相关文档以了解具体版本之间的兼容性情况,并进行必要的测试和验证。
注:
在kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量信息等重要数据
- 元数据是描述kafka集群中各broker,分区,副本以及消费者等组件的信息。包括但不限于:每个主题(topic)的分区数,每个分区(partition)的副本数,每个分区的ISR(in-syncreplicas),Leader副本所在的broker等等。kafka生产者和消费者需要访问这些元数据来进行读写操作,进行负载均衡和故障恢复等
- 消费者的偏移量是指已经被消费者处理的消息的位置信息。为了避免重复读取消息,kafka消费者需要记录下已经消费过的消息的位置信息,这个位置被称为偏移量(offset),消费者定期提交偏移量信息到Zookeeper中,以便在下次启动时从上一次停止的位置继续消费消息。
由于这些数据都是非常重要的,它们需要被可靠的存储和保护。Zookeeper提供了一个分布式协调服务,它可以帮助kafka存储这些数据,并确保它们在整个集群中是一致的和可靠的。因此,Zookeeper在kafka集群中扮演着至关重要的角色,它的稳定性和高可用性直接影响着kafka集群的稳定性和可靠性
Zookeeper协调kafka的分区分配和Leader选举等操作
- Broker注册:每个Kafka broker启动时会在Zookeeper上注册一个节点。这样,Kafka集群中的所有broker可以从同一个地方获取到其他broker的信息,例如其ID,主机名,端口号等。
- 分区分配:当新的主题(topic)被创建,增加或删除分区时,Zookeeper会协调进行分区分配。此时Zookeeper会维护一个/brokers/topics节点。其中包含了所有主题,分区和副本的信息。当有新的Broker加入集群后,Zookeeper会通知所有Brokers进行重新分配,确保所有的分区和副本得到了合理的分配。
- Leader选举:当某个分区的Leader副本失效后,Zookeeper会协调进行Leader选举。此时,Zookeeper会维护一个/controller节点,用于存储当前负责Leader选举的Controller节点的ID。每个Kafka Broker都可以成为Controller节点。但只有一个Controller节点是活跃的。当一个Broker检测到某个分区的Leader副本失效时,它会向Controller节点发送一个请求参与Leader选举。Controller节点收到请求后,负责指定新的Leader。
- 消费者偏移量管理:Zookeeper还用于协调消费者端的偏移量(offset)信息。Kafka消费者需要记录下已经消费过的消息的位置信息以避免重复读取消息。消费者在处理完一些消息后,将偏移量提交到Zookeeper上以持久化保存。Zookeeper可以帮助kafka维护消费者偏移量并确保在不同的消费者客户端之间共享
@KafkaListener
@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
public void listen(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
topics = Constants.TOPIC)
public void listen2(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen3(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen4(String msgData) {
LOGGER.info("收到消息" + msgData);
}
- id:默认是每个Listener实例的重要标识对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性idIsGroup=false关闭,默认是true。
- groupId:每个消费者所属的组每个消费者都有自己所属的组。一个组中可以有多个消费者一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是Subscribed订阅模式,不是手动的assign模式
- clientIdPrefix:消费者clientId前缀如下图,共有4个消费者。每个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的“prefix”开头。如果没有配置,该实例的clientId默认为“consumer”。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0,1,2,3
- autoStartup是否自动启动,如果是false,默认不生效,需要手动唤醒看源码上作者给的注释:该注解指定的值优先级比工厂里指定的高另外可以使用${}占位符的形式,支持配置。
application.yaml:listener: auto: startup: true java : @KafkaListener(... containerFactory = "batchContainerFactory", autoStartup = "${listener.auto.startup}") public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...
注:每个消费者实例对象内部持有两个属性boolean runningboolean paused有几个改变状态的方法:调用start()方法后,running转为true调用stop()方法后,running转为false调用pause()方法后,paused转为true调用resume()方法后,paused转为false只有running=true 、 paused=false 的消费者实例才能正常消费数据。注解上的autoStartup改变的是running属性
kafka消费失败原因
这个错误表示 Kafka 消费者组的 session.timeout.ms 属性设置值不在 Broker 配置的允许范围内。
Kafka Broker 有两个相关的属性来限制消费者组的 session timeout 时间范围。这两个属性分别是
group.min.session.timeout.ms
和
group.max.session.timeout.ms
。其中,
group.min.session.timeout.ms
表示消费者组的 session timeout 最小值,
group.max.session.timeout.ms
表示消费者组的 session timeout 最大值。
当消费者组的 session.timeout.ms 属性设置值小于
group.min.session.timeout.ms
或大于
group.max.session.timeout.ms
时,就会出现上述错误。
要解决这个问题,可以在 Kafka 消费者端将 session.timeout.ms 属性的值改为在 Broker 端设置的允许范围内。或者,也可以在 Kafka Broker 配置文件中修改
group.min.session.timeout.ms
和
group.max.session.timeout.ms
的值。需要注意的是,修改这两个属性会影响所有消费者组。
以下是一个示例消费者配置,将 session.timeout.ms 属性设置为 30000 毫秒(30 秒):
spring.kafka.consumer.session.timeout.ms=30000
如果在使用 Kafka 命令行工具时遇到了这个问题,可以在启动消费者时添加
--group-property
参数,指定
session.timeout.ms
属性的值。例如:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-group --topic test-topic --from-beginning --group-property session.timeout.ms=30000
以上示例将 session.timeout.ms 属性设置为 30000 毫秒。
**修改kafka的
session.timeout.ms
**
在 Kafka Broker 配置文件中修改
group.min.session.timeout.ms
和
group.max.session.timeout.ms
的值,需要编辑
server.properties
文件。
server.properties
文件是 Kafka Broker 的主配置文件,包含了很多 Broker 相关的配置项。
打开
server.properties
文件,在其中找到
group.min.session.timeout.ms
和
group.max.session.timeout.ms
两个属性,将它们的值修改为期望的数值。例如:
# 设置最小的 session timeout 值为 5000(即 5 秒)
group.min.session.timeout.ms=5000
# 设置最大的 session timeout 值为 180000(即 3 分钟)
group.max.session.timeout.ms=180000
修改完
server.properties
文件后,需要重新启动 Kafka Broker 才能使配置生效。在 Kafka 安装目录下,使用以下命令来启动/停止/重启 Kafka Broker:
./bin/kafka-server-start.sh -daemon config/server.properties # 启动
./bin/kafka-server-stop.sh # 停止
./bin/kafka-server-stop.sh && ./bin/kafka-server-start.sh -daemon config/server.properties # 重启
以上示例中,
-daemon
参数是让 Kafka Broker 在后台运行的选项,可以根据自己的需要选择是否使用。
注意,修改
group.min.session.timeout.ms
和
group.max.session.timeout.ms
会影响所有消费者组。因此,在修改之前需要仔细考虑是否有必要修改这些值,以免不必要的影响。在实际的生产环境中,建议使用合适的 session timeout 值,以保证消费者组的稳定性和健壮性。
@KafkaListener的默认配置值
@KafkaListener
并没有默认的
session.timeout.ms
配置值。
@KafkaListener
是 Spring Kafka 提供的一个注解,它能够将一个方法标记为 Kafka 消费者,当监听的 Kafka 主题中有新消息到达时,这个方法就会被触发。
session.timeout.ms
是 Kafka 客户端配置的一个属性,用于设置消费者组的心跳超时时间。它与
max.poll.interval.ms
一起控制着消费者是否被认为已经死亡并离开消费者组。如果消费者没有在
session.timeout.ms
的时间内发送心跳给 Kafka 服务端,那么它将被认为已经死亡,并从消费者组中移除。
对于 Spring Kafka 中的
@KafkaListener
,你需要在配置文件中设置
session.timeout.ms
的值。例如:
spring.kafka.consumer.session.timeout.ms=30000
上述示例将
session.timeout.ms
的值设置为 30000 毫秒(30 秒)。这个值可以根据具体需求进行调整,但是需要注意设置合理的值,以提高消费者组的稳定性和健壮性。
如果不去显式配置
session.timeout.ms
值,Spring Kafka 就会使用默认的值
60000
毫秒(60 秒)。这意味着如果消费者在 60 秒内没有向 Kafka 服务端发送心跳,那么它将被认为已经死亡,并从消费者组中移除。这种情况可能导致如下两个问题。
首先,如果
session.timeout.ms
设置得太短,那么即使消费者实际上是存活的,也有可能被错误地从消费者组中移除,导致消息无法被正确消费。
其次,如果
session.timeout.ms
设置得太长,那么当一个消费者真正死亡时,其他消费者可能会被卡住。因为 Kafka 只有等待
session.timeout.ms
的时间,才能判断消费者是否真的死亡了。如果等待时间太长,那么也就意味着其他消费者需要等待更长的时间才能接替该消费者,开始消费它的分配消息。
因此,在使用
@KafkaListener
时,建议显式配置
session.timeout.ms
值,并根据实际情况进行适当调整,以确保消费者组的稳定性和健壮性。
这段日志显示了该消费者正在尝试离开消费者组,因此 Kafka 服务端撤回了之前分配给它的所有分区。可以看到消费者的
clientId
为
consumer-groupA-1
,
groupId
为
groupA
。
常见的离开消费者组的情况有:
- 消费者进程被结束或崩溃导致消费者失去与 Kafka 的连接,从而被认为已经死亡,因此其他消费者需要接管它消费的分区。
- 消费者主动调用
consumer.close()
或KafkaListenerEndpointRegistry.stop()
方法关闭消费者,从而离开消费者组。
在这段日志中,第一条语句显示了 Kafka 正在撤回之前分配给消费者的所有分区,另外几条语句则显示该消费者正在关闭,销毁一些资源。
默认情况下,Kafka 客户端的
session.timeout.ms
值是
10000
毫秒(10 秒)。这个值指定了心跳检测和分区再平衡之间的超时时间。也就是说,如果消费者在
session.timeout.ms
时间内没有发送心跳到 Kafka 服务端,或者没有参与分区再平衡,则被认为已经死亡。
需要注意的是,如果使用
@KafkaListener
启动消费者,则 Spring Kafka 会默认设置
session.timeout.ms
为
60000
毫秒(60 秒),这个时间值比 Kafka 客户端的默认值大得多。如果你在使用
@KafkaListener
的时候没有显式地设置
session.timeout.ms
,则需要注意配置文件中 Kafka 服务端的相应参数,以保持一致。
手动创建 Kafka 消费者和 @KafkaListener 注解都是用来消费 Kafka 消息的方式,但它们有以下几点区别:
- 编程模型:手动创建 Kafka 消费者需要开发者自己编写代码来实现消息的订阅和处理,而使用 @KafkaListener 注解则是借助 Spring Kafka 库来实现消息的消费和处理,简化了开发者的代码实现。
- 注册方式:手动创建 Kafka 消费者需要开发者自己将消费者注册到 Kafka 集群中,而使用 @KafkaListener 注解则是通过在 Spring 应用上下文中注册 Bean 的方式来自动创建消费者并注册到 Kafka 集群中。
- 配置管理:手动创建 Kafka 消费者需要开发者自己管理 Kafka 相关的配置参数,如消费者的 group id、topic、partition 等配置参数,而使用 @KafkaListener 注解则是通过在应用程序中配置 Kafka 相关的属性来管理消费者的配置。
总的来说,使用 @KafkaListener 注解可以让开发者更方便地实现 Kafka 消息的消费和处理,避免了手动编写 Kafka 消费者的繁琐操作,同时也简化了 Kafka 配置的管理。但如果您对 Kafka 消费者的细节操作有更高的要求,手动创建 Kafka 消费者也是一种可行的选择。
手动创建 Kafka 消费者可能会导致以下问题:
- 难以维护和扩展:手动创建 Kafka 消费者需要编写大量的初始化和配置逻辑,而且如果要增加、删除或修改消费者组等操作时,需要手动更新代码。
- 容易出错:手动创建 Kafka 消费者容易出现错误,例如,如果在消费者代码中没有正确处理异常或错误,可能会导致消息重复或丢失等问题。
- 性能问题:对 Kafka 消费者进行手动管理可能会导致性能问题,例如,如果处理消息的速度比消费者获取消息的速度慢,可能会导致队列阻塞,从而影响应用程序的性能。
- 安全问题:手动创建 Kafka 消费者可能会有安全风险,例如,在认证和授权方面存在漏洞,可能会导致未经授权的用户访问 Kafka 消息队列,并获取敏感信息。
版权归原作者 qq_47386653 所有, 如有侵权,请联系我们删除。