Apache Kafka ---kafka官方文档
一、Kafka概述
1、简介
引例:某东购物网站,它会将所有用户什么时间,使用什么设备访问了什么内容都进行记录并保存在后台的日志文件log中。程序员可以使用Flume工具实时的从日志文件中采集信息,然后将这些信息存储在Hadoop集群中进行后续的分析统计工作。在平时网站的访问量不大的情况下,Hadoop的上传速度足够支撑Flume的采集(一般Hadoop的上传速度不大)。但是当出现双11等活动时,网站的访问量急剧增加,Flume的采集速度也会随之增加,此时Hadoop的上传速度不足以支撑上传海量的由Flume采集过来的数据。此时Kafka就诞生了,Kafka可以帮助我们先进行数据缓存,Kafka有极强的数据处理能力,可以先将Flume采集的数据存放到Kafka集群中,然后让Hadoop慢慢的从Kafka集群中上传数据。(Flume采集过来的数据如果不及时取走很快就没了)
Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。发布/订阅是指:消息的发布者Flume和Log不会将消息直接发送给特定的订阅者Hadoop(这只有发布过程),而是先将消息发布给Kafka集群,Kafka将消息分成不同的类别(进行分类整理),然后订阅者Hadoop只接收感兴趣的消息即可。有时发布者也称生产者,订阅者也称消费者。Kafka的本质就是解决发布者与订阅者处理数据不一致的问题。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道,流分析,数据集成和关键任务应用。
2、消息队列
(1)消息队列应用场景
传统消息队列的应用场景:缓存/消峰、解耦和异步通信。缓存/消峰就是上面的例子。
解耦:允许你独立的扩展或修改数据源和目的地的处理过程,只要保证它们遵循同样的接口约束,就和java中多态的作用一样。实际开发环境中数据的数据源分布很广mysql,sqlserver,Oracle,Flume...,如果没有MQ,那么我们的程序想要获取不同来源的数据就需要与不同数据源建立连接,通信,然后获取数据,关闭连接,这很麻烦。有了MQ可以将各个不同来源的数据全部存储到MQ中,我们的程序直接从MQ里获取即可,相当于完成了解耦,可以随意切换数据源。
异步通信:允许用户把一个消息放入队列,但并不立刻处理它,而是在需要的时候在去处理它。
上述例子告诉我们,登录网站注册信息,在写入到数据库以后,核心的任务已经完成了,至于发不发短信不重要,同步方式的原则是必须把所有的事情一步一步做完,不能少了某个环节。异步方式的原则是把核心的任务做然后立刻响应用户,让用户放心,后续无关紧要的事情放到消息队列中,让其它人员慢慢完成即可。相当于步骤124就是主线程,3以及发短信让子线程慢慢去做。
(2)消息队列的两种模式
点对点模式:消费者主动拉取数据,消息收到后删除数据(单主题)
发布/订阅模式:可以有多个topic主题(浏览,点赞,收藏,评论等),消费者消费数据之后,不删除数据。每个消费者相互独立,都可以消费到数据。
3、Kafka的基础架构
详细解释:Producer:消息的生产者,就是向Kafka cluster发消息的客户端比如Flume,Producer可以对接各种数据源。
Consumer:就是消费者,向Kafka cluster取消息的客户端,比如Hadoop(分布式的)。
Consumer Group(CG):消息者组,由多个consumer组成,Kafka会将来源数据分区存储在其集群中,因为数据量太大一台服务器无法处理,显然消费者想要处理这么庞大的数据也不是单单一台服务器可以处理的,比如Hadoop集群也是分布式的,spark,flink都是分布式的。
注意:A、消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,组内消费者并行消费;B、消费者组间互不影响;C、所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
Broker:表示一台Kafka服务器,一个Kafka cluster由多个broker组成,一个broker可以容纳多个topic。
Topic:就是一个队列用于存储具体的数据,不同的topic表示不同类型,不同的类型表示不同的数据源,mysql、oracle、sqlserver等等。
Partition:一个很大的数据源一台服务器存储不下,可以存储在多台服务器上,一个topic可以分为多个partition,每个partition是一个有序的队列。
Replica:副本,类似于Hadoop的副本机制,但还不完全相同,Hadoop的副本之间没有主备关系,使用的时候随便用即可。而在Kafka中一个topic的每个分区都有若干副本,其中有一个是leader和若干follower。
Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
Follower:每个分区多个副本的“从”,实时从leader中同步数据,保存与leader数据一致。leader发生故障时,利用zookeeper的leader选举机制,某个follower会成为新的leader。zk的工作是记录Kafka集群有那些服务器在线,记录各个分区谁是leader,以及完成leader选举机制。
注意:在Kafka2.8.0以前Kafka的启动必须依赖zk,在2.8.0以后的版本中可以不依赖zk,它自己就能完成原来zk的工作,这就是Kafka的kraft模式。
二、Kafka的安装与常见命令
1、Kafka的安装
集群规划:192.168.11.141(node11) : zk:kafka
192.168.11.142(node22) : zk:kafka
192.168.11.143(node33) : zk:kafka
注意kafka与zookeeper一样,不需要将每台服务器都配置,比如当前有100台服务器,只需要在11台服务器上配置zk和kafka即可。(zk服务器的台数与kafka服务器的台数不用必须相同,可以随意灵活的配置)
下载Kafka():Apache Kafka
在node11上执行:
解压:tar -zxvf kafka_2.12-3.0.0.tgz -C /export/server
ln -s /export/server/kafka_2.12-3.0.0 /export/server/kafka
cd /export/server/kafka
ll
cd config
vim server.properties
** broker.id=0**
** log.dirs=/export/server/kafka/datas ** : 设置log日志存储位置
# 设置连接的zk集群是什么,/kafka表示设置一个命名空间,后续所有的需要记录在zk中的数据全部放在了/kafka/*
** zookeeper.connect=node11:2181,node22:2181,node33:2181/kafka**
scp -r kafka_2.12-3.0.0 node22:pwd
/
scp -r kafka_2.12-3.0.0 node33:pwd
/
在node22、33上执行
ln -s /export/server/kafka_2.12-3.0.0 /export/server/kafka
vim /export/server/kafka/config/server.properties
修改**broker.id=1/2**
在node11,22,33上配置path环境变量:
vim /etc/profile
** export KAFKA_HOME=/export/server/kafka**
** export PATH=$PATH:$KAFKA_HOME/bin**
source /etc/profile
chown -R hadoop:hadoop /export
su hadoop
zkServer.sh start : 确保zookeeper集群已经启动
启动kafka每台服务器都需要执行:kafka-server-start.sh -daemon /export/server/kafka/config/server.properties(kafka默认对外提供的客户端端口为9092)
停止kafka每台服务器都需要执行(kafka的停止需要等一会儿):kafka-server.stop.sh
注意:服务器不用时必须先关kafka,然后等kafka的jps进程没了以后在关zk。如果先关了zk会导致kafka无法关闭,只能通过kill杀死。 (zk会存储kafka的数据,kafka在关闭前会向zk询问一些事情,如果这时zk关了,将导致kafka无法关闭)
至此kafka就配置好并成功启动了。但是上述配置是我们手动一个一个服务器的配置,如果服务器多了,那就太麻烦了,因此我们可以自行编写shell脚本完成:scp分发,jps命令查看,zk启动,kafka启动。最好把这些脚本放到同一个文件夹下面,并将该文件夹放入到PATH中。
创建一个/export/server/bin目录用于存储所有的shell脚本,然后配置PATH环境变量:
vim /etc/profile 填入export PATH=$PATH:/export/server/bin
source /etc/profile
chmod 755 shell脚本 编写完shell脚本后需要设置权限,使所有用户都可以执行。
scp分发shell:scpall 分发文件/目录(可以使用该命令将所有脚本也分发到其他服务器上,使所有服务器都可以使用)
#!/bin/bash
#1. 判断参数个数
# $#统计传入参数个数,-lt表示小于
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
# 至少输入了一个参数
#2. 遍历集群所有机器
for host in node11 node22 node33
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送 $@表示传入的参数列表kafka_2.12-3.0.0
for file in $@ # file = xxx目录,xxx文件,......
do
#4. 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录 cd -P:表示不走软连接,有没有-P都一样
pdir=$(cd -P $(dirname $file); pwd) # kafka_2.12-3.0.0
#6. 获取当前文件的名称
fname=$(basename $file) # bin
ssh $host "mkdir -p $pdir"
# rsync -av远程同步命令 -v, --verbose 详细模式输出
# -a, --archive 归档模式,表示以递归方式传输文件,并保持所有文件属性
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
zk启动shell:zk.sh
#!/bin/bash
case $1 in
"start"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i start ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh start"
done
}
;;
"stop"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i stop ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh stop"
done
}
;;
"status"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i status ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh status"
done
}
;;
"restart"){
for i in node11 node22 node33
do
echo ---------------zookeeper $i restart ---------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh restart"
done
}
;;
esac
jps查看shell:jpsall
#!/bin/bash
for i in node11 node22 node33
do
echo ---------------$i jps information ---------------
ssh $i "/export/server/jdk/bin/jps"
done
建议对于执行的程序写绝对路径,这样可以避免麻烦,如果我们这里写ssh $i jps,可能会出现:bash: jps: command not found的错误jps找不到,明明各个服务器jdk安装了,PATH也设置了,直接运行也可以运行,使用ssh就不行了:
这里需要注意:环境变量的配置有两个地方,一个是/etc/profile文件,一个是/.bashrc文件。如果直接执行jps会自动从/etc/profile中读取PATH,能正常执行。而以ssh方式直接执行jps实际读取的是/.bashrc这个文件里面的环境变量,/.bashrc文件中并没有进行任何配置因此jps找不到。/.bashrc文件设置的环境变量是针对某一个特定的用户,使用ssh远程连接执行jps隐含了有一个特定的用户使用ssh远程连接执行jps,此时读取的是~/.bashrc文件中的环境变量。
解决办法就是在/.bashrc文件中重新配置环境变量。把/etc/profile里面的环境变量追加到/.bashrc文件中:
[root@node11 bin]# cat /etc/profile >> ~/.bashrc
[root@node22 bin]# cat /etc/profile >> ~/.bashrc
[root@node33 bin]# cat /etc/profile >> ~/.bashrc
注意: /.bashrc文件是在目录下的,即每个用户在自己的home目录下都有这个文件。上述配置只配置了root home目录下的.bashrc文件,切换到hadoop用户下依旧无法使用。需要再次配置/home/hadoop/.bashrc文件。
建议执行的程序直接写绝对路径!!!!!
kafka启动shell:kf.sh
#!/bin/bash
case $1 in
"start"){
for i in node11 node22 node33
do
echo -------------------- $i start kafka --------------------
ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
done
}
;;
"stop"){
for i in node11 node22 node33
do
echo -------------------- $i stop kafka --------------------
ssh $i "/export/server/kafka/bin/kafka-server-stop.sh"
done
}
;;
esac
2、Kafka的命令行操作
Kafka有三种命令行操作,分别是针对Producer的kafka-console-producer.sh,针对Consumer的kafka-console-consumer.sh,针对Topic的kafka-topics.sh
(1)kafka-topics.sh
**kafka-topics.sh : **直接无法执行,它会显示该脚本的所有参数(帮助界面)
kafka-topics.sh用于对Topic进行操作,首先需要连接kafka集群(与zk一样只需要连接一个kafka broker服务器即可,所做的操作都是共享的,底层就是一个Kafka消息队列。实际生产环境中为了防止连接的Kafka broker挂掉通常连接两个,使用逗号隔开。目前我们学习只连一个即可),然后选择对那个topic进行操作,操作一般是增删改查。
(后续所有操作hadoop102就是node11,hadoop103是node22,hadoop104是node33等等)
注意分区数只能增加,不能减少。
注意命令行的方式无法修改副本数。后续可以通过其他方式进行修改。
(2)kafka-console-producer.sh和kafka-console-consumer.sh
kafka-console-producer.sh
**kafka-console-consumer.sh **
默认情况下消费者只能获取连接以后生产者发来的数据,历史的数据无法获取(增量获取)。此时需要添加**--from-beginning**从头开始消费(全量获取)。
三、Kafka的生产者
1、发送原理
在生产者向kafka集群发送数据时,涉及到两个线程--main线程和sender线程,main线程主要用于不断的从生产者哪里获取消息,并将消息发送给RecordAccumulator缓存。sender线程负责不断的从RecordAccumulator中拉去满足条件的消息(达到batch.size或linger.ms)发送到kafka集群。batch.size表示只有数据累计到batche.size之后sender才会去拉取并发送,默认16K。linger.ms表示如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了以后也会发送数据,单位ms,默认值为0ms,表示不等待来一个数据发一个数据,此时batch.size设多少也没用。在实际开发过程中这两个参数需要灵活调整。
当Kafka集群收到sender发来的消息时,它有3中应答方式分别是acks = 0,acks = 1, acks = -1/all。0表示不响应RecordAccumulator将数据发送以后就可以不管了,直接将数据清除,不用管数据是否发送成功。1表示生产者发来数据,leader收到数据后应答。-1表示生产者发来数据,leader和follower都收到数据后应答。
涉及参数:
send(ProducerRecord):ProducerRecord里面记录了发送的配置信息和发送的具体消息,比如发给kafka集群的那个topic,发给那个partition等等。
key和value:指定发送消息的类型。例如上述发的hello,只有value。key为空“”,都是String类型
key.serializer和value.serializer:指定发送消息的key和value的序列化类型,一般不使用jvm提供的序列化类型而是使用kafka自己提供的,如果消息的类型是String,那序列化类型就是StringSerializer。一定要写全类名(见后面代码)
buffer.memory:RecordAccumulator缓存队列总大小,默认32M
batch.size:默认16K。适当增加该值,可以提高吞吐量,但不能设置太大,会导致数据传输延迟增加。
linger.ms:默认0ms,一般建议设置为5-100ms之间
acks:默认值-1
max.in.flight.requests.per.connection:允许最多没有返回ack的次数,默认为5
enable.idempotence:是否开启幂等性(保证数据传输不重复),默认为true,开启幂等后max.in.flight.requests.per.connection必须设置为1-5之间的数字
retries:重试次数,默认int最大值为2147483647。如果设置了重试,还要保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1(保证最大重试次数为1,即当前消息重试时,其他消息的传输先等待),否则在重试当前失败消息时,其他消息可能发送成功了。
retry.backoff.ms:两次重试之间的时间间隔,默认100ms
compression.type:是否压缩数据,默认为none,支持gzip、snappy、lz4、zstd。
2、异步发送
显然上述发送过程是异步发送的,main线程和sender线程是并行执行的。main线程只负责不停的向RecordAccumulator缓存队列中存储消息,先把核心任务做了,至于后续将RecordAccumulator缓存队列中的消息再发送到kafka集群中,由sender线程以它自己的速度和逻辑慢慢执行即可。
接下来我们使用代码演示一下异步发送过程。首先下载kafka-client-3.0.0.jar依赖包:
下载地址:Central Repository: org/apache/kafka/kafka-clients/3.0.0
然后将其整合到项目中。创建包:com.zxy.kafka.producer,创建CustomProducer类:
**普通异步发送: **
注意:连接kafka集群只能通过:主机名:端口的方式,即使将主机名换成ip地址也不行。请确保自己的计算机配置了各linux主机的主机名映射。(kafka的配置文件必须要求只能通过主机名:端口方式连接)
如果出现hosts映射文件无法修改的情况(或修改后点击保持显示另存为界面),这是因为权限不足导致了,我们可以修改其权限:
我们在linux上开启一个消费者,然后运行上述代码,发现消费者确实收到数据了。
带回调函数的异步发送:
回调函数会在prodecer收到ack时自动调用,为异步调用。该方法有两个参数:RecordMetadata元数据信息(主题、分区等信息)和Exception异常信息,如果Exception为null表示发送成功,如果Exception不为null表示发送失败,消息发送失败后会自动重试,不需要我们在回调函数中收到重试。
目前first设置了3个分区(0,1,2),为什么这里会将数据发给分区1呢?后续讲解分区时会解释。
3、同步发送
默认情况下kafka就是异步发送方式,因为kafka的基本架构就是异步方式的。当然它也支持同步发送,即Kafka Producer生产者发来一批数据后,必须等到kafka集群收到数据且发来ack成功的消息后,Kafka Producer生产者才会再次发送下一批数据。
同步方式只需要在异步方式基础上调用一下get方法即可:
**为什么同步方式各个数据的分区不一样,而异步方式却都一样呢? **后续讲解分区时会解释。
4、生产者分区(Partitioner分区器)
分区可以合理的使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据(每块1G)存储在多台Broker上。合理控制分区任务,可以实现负载均衡。此外分区可以提高并行度,无论是生产者还是消费者都可以以分区为单位对数据进行发送/消费。
(1)默认分区器DefaultPartitioner
Partitioner分区器有默认分区器和自定义分区器两种,上述我们写代码时并没有设置分区器,系统使用了默认分区器DefaultPartitioner,在IDEA中 Ctrl + n,全局查找DefaultPartitioner
注意这里设置的分区器是控制RecordAccumulator中的分区,不同分区器的设置是在properties配置文件对象中配置,不写就是使用默认分区器。
在ProducerRecord中我们可以设置存储的分区:
上述同步异步发送演示代码中,使用的就是最后一种方式,没有指定分区,也没有key值,此时会使用黏性分区规则。但是同步和异步方式是有细微的差别的,默认情况下linger.ms为0ms,对于异步方式,由于send发送数据的过程是在内存中执行的,执行速度是极快的,上述循环发5次数据实质上0.000xx纳秒的时间就发过去了(系统认为没有超过0ms),因此执行过程中没有超时,是batch.size参数在生效,又由于传输的数据很小没有使当前batch存满,因此所有数据是作为一个整体传输的,都存储在一个分区上,随机取的分区为1(sender线程只执行了1次)。对于同步方式,必须按顺序一个一个发送直到ack确认后再发送下一个数据,每send一个数据就会执行一次sender线程,因此每发一个数据就重新选择一个分区,我们可以观察到每次选择的分区与上一次的分区不同。
总之:每执行一次sender线程相当于达到batch.size或linger.ms超时,就会重新随机选择一个分区。
测试:
生产环境中使用key的情况还是很多的,比如当前mysql中有很多表,要求将mysql中的student表放到kafka集群的某个分区上,这里只需要将student表的表名作为key传输即可。
在这里每次发送一条数据停止2ms,此时就是linger.ms = 0ms在生效。每发一个数据都要重新随机选择新分区,达到了数据打散的效果。但是发现重新选择的分区存在与上一次分区相同的情况,这是为什么呢?经过测试发现当线程等待的时间比较小时,重新选择的分区存在与上一次分区相同的情况;当线程等待时间比较大时,重新选择的分区与上一次分区不同(线程等待时间越长,数据打散程度越高)。原因不明
(2)自定义分区器
在实际生产环境中默认分区器是无法满足所有的业务需求的,因此需要进行自定义分区器。
特殊需求:自定义一个分区器实现:如果发送来的数据包含”atguigu“就发往0号分区,如果不包含”atguigui“就发往1号分区。
首先自己定义一个分区器MyPartitioner类,实现Partitioner接口:
5、生产经验
(1)常见参数的经验配置
linger.ms:一般修改为5-100ms
batch.size:一般使用默认值,或者设置为32k
compression.type :一般使用snappy压缩方式
RecordAccumulator:一般使用默认,或设置为64M
(2)如何保证数据传输的完全可靠
acks应答的级别有三种分别是0,1,-1。
0就是生产者发来数据之后不需要应答,不用管数据发成功没,直接不停的发就行了,这种方式效率肯定是最高的,但其可靠性是最差的,生产环境中一般不使用。
1表示生产者发来的数据,leader收到后就给生产者应答。比0的可靠性高。
-1表示生产者发来的数据,leader收到并且所有follower也同步完后才给生产者应答。可靠性最高,效率最低。
思考:leader收到数据,所有follower都开始同步数据,-1级别要求:必须所有follower同步完成才应答。但有一个follower因为某种故障,迟迟不能与leader进行同步,此时会导致无法向生产者应答,某个请求被卡死,影响系统性能。显然在实际的生产环境中不可能因为某个follower故障而影响到整个集群的运行。在kafka集群中每个Leader都维护了一个动态的ISR来解决上述问题。
ISR是一个存储了leader + 与leader保持正常同步的所有follower的集合(leader :0, isr:0,1,2)。如果某个follower长时间未向leader发送通信请求或同步数据,则系统会认为该follower死掉,并将其从isr中踢出。超时阈值由replica.lag.time.max.ms设置,默认为30s。
有了这个机制,ack应答的时候就不会死等长期联系不上或者已经故障的节点,它只会等待isr集合中记录的没问题的follower完成同步。
思考:添加完isr机制后就能保证完整的数据可靠传输呢?不能,因为当分区副本数设为1,或ISR里应答的最小副本数量(min.insync.replicas默认为1)设置为1,此时就和ack=1的情况一样了,仍然有丢失数据的风险(leader :0,isr:0)
完整的数据可靠传输 = ACK级别设为-1 + 分区副本数>=2 + ISR里应答的最小副本数量>=2
** 总结:**
思考: ACK级别设为-1,能得到可靠性,那它有没有别的问题呢(除了效率低之外)?可能会出现数据重复:
思考:数据重复问题如何解决呢?
(3)数据去重
进行数据去重之前先了解以下几个概念(kafka三种消息传输语义):
A:至少一次(At Least Once,生产者将数据发送给kafka集群后,kafka集群至少保证能正常收到1份数据)= 完整的数据可靠传输 = ACK级别设为-1 + 分区副本数>=2 + ISR里应答的最小副本数量>=2
B、最多一次(At Most Once)= ACK级别设为0(至少一次可能会发生数据重复问题,最多一次一份数据最多发一次,可以保证数据不重复)
思考:与钱相关的业务能使用At Least Once或At Most Once吗?显然不能,如果使用At Least Once可能会发生银行多给我转了一次钱。如果使用At Most Once可能会发生自己卡里的钱莫名奇妙没了。
C、精确一次(Exactly Once,对于一些非常重要的信息,比如与钱相关的数据,要求数据既不能重复也不能丢失)= 幂等性 + 至少一次
什么是幂等性:指Producer无论向broker发送多少次重复数据,broker端都只会持久化一条,保证了数据不重复。
思考:kafka集群如何知道当前传过来的数据是不是重复数据呢?
重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,broker只会持久化一条。其中Kafka每次重新启动都会分配一个新的PID(生产者id号);Partition表示分区编号;SeqNumber是单调递增的。我们可以看出幂等性只能保证在单分区单会话内不重复。
思考:幂等性并不能跨越多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。对多个分区的写入操作要么全部成功要么全部失败。****Producer在使用事务功能前,必须先手动设置一个唯一的tansactional.id,有了事务id即使客户端挂掉了,重启后也能继续处理未完成的事务。
注意:一旦使用了事务功能,那么每个broker上的Transaction Coordinator事务协调器就启动了,事务协调器主要负责处理当前事务,那每个broker都有自己的Transaction Coordinator,此时需要选举一个老大出来负责当前的事务处理。选举规则如上图所示:假设transactional.id我们写了“a”,它的hashcode值为97,97%50=47,即当前要处理的事务属于47号分区:__transaction_state-Partition47,__transaction_state-Partition47-Leader所在的broker节点上的Transaction Coordinator就被选举成为处理当前事务的老大。接下来就可以处理事务了(了解)。
由于事务底层依赖幂等性,因此开启事务,必须开启幂等性(enable.idempotence = true默认开启)。
(4)数据有序
数据有序是指:生产者发来数据A,B,C,D,E消费者在读取数据时还是A,B,C,D,E,保证与生产者发送数据顺序一致。在实际生产环境中数据都是多分区存储的。假设当前有3个分区(分区0,1,2)。数据在发送过程中可能A,B存储在分区0内,C,D存储在分区1内,E存储在分区2内。消费者在消费数据时是以异步方式执行的,很可能第二个消费者先执行完读到C,D,然后第三个消费者执行完读到E,最后第一个消费者执行完读到A,B,最终读到C,D,E,A,B。在某些应用场景下(发送一篇文章)必须要求读的数据与发送时的数据顺序保持一致,这该这么解决呢?
方案一:将Kafka cluster直接设置为单分区(有条件限制)
方案二:多分区情况下,先将所有的数据拉取到消费者那里,然后统一进行一次排序操作,最后由消费者读取排好序的数据。
(5)数据乱序问题
数据乱序问题是指:生产者向Kafka集群发送消息时,kafka收到的消息顺序与生产者发送数据顺序不一致。例如当前kafka cluster有两个broker。在发送数据时会有两个请求队列,每个请求队列最多缓存5个请求。假设在broker0请求队列中request1请求发送数据A,kafka cluster收到了并返回了ack信息,request1完成。然后request2请求发送数据B,也发送成功。request3请求发送数据C,发送失败,系统自动进行重试。此时broker0请求队列中只有1个请求,因此还可以再发请求,request4请求发送数据D,在request3重试期间request4请求发送成功,然后request3才重试成功,此时broker0得到的数据是A,B,D,C,导致数据乱序(单分区乱序)。
上述数据有序问题中解决方案一:将Kafka cluster直接设置为单分区是有条件限制的,如果出现了数据乱序问题,单分区内的数据本身也乱序了,即使设置为单分区,也无法保证消费者获得的数据有序。
解决方法:
1)在kafka1.x版本之前保证数据单分区内有序,需要进行如下配置
max.in.flight.requests.per.connection=1:最多每个broker缓存1个请求(不需要考虑是否开启幂等性,在kafka1.x版本之前没有幂等性功能)
2)在kafka1.x版本之后保证数据单分区内有序,需要考虑是否开启幂等性,配置如下:
A、未开启幂等性:**max.in.flight.requests.per.connection=1**
B、开启幂等性:**max.in.flight.requests.per.connection只需要设置<=5**,就能保证单分区内有序。
主要原因是:启动幂等后,kafka会自动缓存producer发来的最近(最多5个)5个request的元数据(PID,分区,**自增序列号**),并根据自增序列号的顺序依次在kafka cluster中落盘。假设request1发送数据A成功,它的序列号为1,成功落盘。request2发送数据B成功,序列号为2,成功罗盘。request3发送数据C失败,序列号为3系统自动对request3进行重试,此时request4发来数据D成功,但是它的序列号为4,系统识别到序列号为3的请求还没落盘呢,request4只能等待。当request3重试成功,数据落盘以后,request4发送的数据才落盘。相当于有自动排序的功能。
补充:
(1).var快捷输入
(2)Ctrl + n快捷键
查看各种类的源码
版权归原作者 吆喝的翅膀 所有, 如有侵权,请联系我们删除。