消息队列
“消息队列”是在消息的传输过程中保存消息的容器。
使用消息队列的好处:
1)解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2) 可恢复性: 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
3)缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
4)灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
消息队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
发布订阅模式又分为pull模式【消费者主动拉取消息】和push模式【mq推送消息给消费者】,pull模式则可以根据consumer的消费能力以适当的速率消费消息。
kafka介绍
Kafka是一个高吞吐量、分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。
kafka架构
Producers(生产者):消息生产者,就是向kafka broker发消息的客户端;
Consumers(消费者):消息消费者,向kafka broker取消息的客户端;
Consumer Group: 多个消费者组成一个消费者组。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者必须属于某个消费者组,即消费者组是逻辑上的订阅者。
Broker(代理):一台kafka服务器就是一个broker, 负责消息存储和转发。一个集群由多个broker组成。Broker是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka的leader选举可以由ZooKeeper完成。
ZooKeeper: 保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现,partition leader选举,负载均衡等功能,同时记录着每个消费者消费消息的偏移量,方便下次读取下一条数据。
因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。
topic: 消息类别,Kafka按照topic来分类消息,一个broker可以容纳多个topic。
partition: topic的分区。一个非常大的topic可以分布到多个broker(即服务器》上,一个topic可以包含多个partition, topic 消息保存在各个partition上
Replica: 副本,为保证集群中的某个节点发生故障时,该节点上的partition '数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
leader: 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。follower: 每个分区多个副本中的“从”,实时从leader中同步数据,保持和 leader 数据的同步。leader发生故障时,某个follower会成为新的follower。
offset: 消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号
kafka使用场景
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
流式处理:比如spark streaming和 Flink
kafka数据存储
Kafka 安装
略【使用docker安装很方便,推荐】
[root@us4ci6jaxom1jjz2 opt]# docker exec -it kafka bash #进入kafka[root@us4ci6jaxom1jjz2 ~]# docker exec -it kafka bash
bash-5.1# cd /opt/kafka_2.13-2.8.1/bin/
kafka配置文件
server.propeties
Kafka配置文件详解
(1) producer.properties:生产端的配置文件
#指定kafka节点列表,用于获取metadata,不必全部指定#需要kafka的服务器地址,来获取每一个topic的分片数等元数据信息。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092
#生产者生产的消息被发送到哪个block,需要一个分组策略。#指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区#partitioner.class=kafka.producer.DefaultPartitioner#生产者生产的消息可以通过一定的压缩策略(或者说压缩算法)来压缩。消息被压缩后发送到broker集群,#而broker集群是不会进行解压缩的,broker集群只会把消息发送到消费者集群,然后由消费者来解压缩。#是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。#压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。#文本数据会以1比10或者更高的压缩比进行压缩。
compression.codec=none
#指定序列化处理类,消息在网络上传输就需要序列化,它有String、数组等许多种实现。
serializer.class=kafka.serializer.DefaultEncoder
#如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。#如果上面启用了压缩,那么这里就需要设置#compressed.topics=#这是消息的确认机制,默认值是0。在面试中常被问到。#producer有个ack参数,有三个值,分别代表:#(1)不在乎是否写入成功;#(2)写入leader成功;#(3)写入leader和所有副本都成功;#要求非常可靠的话可以牺牲性能设置成最后一种。#为了保证消息不丢失,至少要设置为1,也就#是说至少保证leader将消息保存成功。#设置发送数据是否需要服务端的反馈,有三个值0,1,-1,分别代表3种状态:#0: producer不会等待broker发送ack。生产者只要把消息发送给broker之后,就认为发送成功了,这是第1种情况;#1: 当leader接收到消息之后发送ack。生产者把消息发送到broker之后,并且消息被写入到本地文件,才认为发送成功,这是第二种情况;#-1: 当所有的follower都同步消息成功后发送ack。不仅是主的分区将消息保存成功了,#而且其所有的分区的副本数也都同步好了,才会被认为发动成功,这是第3种情况。
request.required.acks=0#broker必须在该时间范围之内给出反馈,否则失败。#在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,#broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因#未能成功(比如follower未能同步成功)
request.timeout.ms=10000#生产者将消息发送到broker,有两种方式,一种是同步,表示生产者发送一条,broker就接收一条;#还有一种是异步,表示生产者积累到一批的消息,装到一个池子里面缓存起来,再发送给broker,#这个池子不会无限缓存消息,在下面,它分别有一个时间限制(时间阈值)和一个数量限制(数量阈值)的参数供我们来设置。#一般我们会选择异步。#同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,#也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
#在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,#默认为5000ms#此值和batch.num.messages协同工作.
queue.buffering.max.ms =5000#异步情况下,缓存中允许存放消息数量的大小。#在async模式下,producer端允许buffer的最大消息量#无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积#此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000条消息。
queue.buffering.max.messages=20000#如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500#在生产端的缓冲池中,消息发送出去之后,在没有收到确认之前,该缓冲池中的消息是不能被删除的,#但是生产者一直在生产消息,这个时候缓冲池可能会被撑爆,所以这就需要有一个处理的策略。#有两种处理方式,一种是让生产者先别生产那么快,阻塞一下,等会再生产;另一种是将缓冲池中的消息清空。#当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后阻塞一定时间后,#队列仍然没有enqueue(producer仍然没有发送出任何消息)#此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间#-1: 不限制阻塞超时时间,让produce一直阻塞,这个时候消息就不会被抛弃#0: 立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
#当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数#因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)#有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3#producer刷新topic metada的时间间隔,producer需要知道partition leader#的位置,以及当前topic的情况#因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,#将会立即刷新#(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置#额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000(2)consumer.properties:消费端的配置文件
#消费者集群通过连接Zookeeper来找到broker。#zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000#这是一个时间阈值。#指定多久消费者更新offset到zookeeper中。#注意offset更新时基于time而不是每次获得的消息。#一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000#指定消费
group.id=xxxxx
#这是一个数量阈值,经测试是500条。#当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息#注意offset信息并不是每消费一次消息就向zk提交#一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000# 当前consumer的标识,可以设定,也可以有系统生成,#主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50# 当有新的consumer加入到group时,将会reblance,此后将会#有partitions的消费端迁移到新 的consumer上,如果一个#consumer获得了某个partition的消费权限,那么它将会向zk#注册 "Partition Owner registry"节点信息,但是有可能#此时旧的consumer尚没有释放此节点, 此值用于控制,#注册节点的重试次数.
rebalance.max.retries=5#每拉取一批消息的最大字节数#获取消息的最大尺寸,broker不会像consumer输出大于#此值的消息chunk 每次feth将得到多条消息,此值为总大小,#提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600#当消息的尺寸不足时,server阻塞的时间,如果超时,#消息将立即发送给consumer#数据一批一批到达,如果每一批是10条消息,如果某一批还#不到10条,但是超时了,也会立即发送给consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360# 如果zookeeper没有offset值或offset值超出范围。#那么就给个初始的offset。有smallest、largest、#anything可选,分别表示给当前最小的offset、#当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder
(3)server.properties:服务端的配置文件
#broker的全局唯一编号,不能重复
broker.id=0#用来监听链接的端口,producer或consumer将在此端口建立连接port=9092#处理网络请求的线程数量,也就是接收消息的线程数。#接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3#消息从内存中写入磁盘是时候使用的线程数量。#用来处理磁盘IO的线程数量
num.io.threads=8#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400#请求套接字的缓冲区大小
socket.request.max.bytes=104857600#kafka运行日志存放的路径
log.dirs=/export/servers/logs/kafka
#topic在当前broker上的分片个数
num.partitions=2#我们知道segment文件默认会被保留7天的时间,超时的话就#会被清理,那么清理这件事情就需要有一些线程来做。这里就是#用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1#segment文件保留的最长时间,默认保留7天(168小时),#超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168#滚动生成新的segment文件的最大时间
log.roll.hours=168#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824#上面的参数设置了每一个segment文件的大小是1G,那么#就需要有一个东西去定期检查segment文件有没有达到1G,#多长时间去检查一次,就需要设置一个周期性检查文件大小#的时间(单位是毫秒)。
log.retention.check.interval.ms=300000#日志清理是否打开
log.cleaner.enable=true
#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000#上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存#写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个#时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是#数量阈值,下一个参数设置的则是时间阈值。#partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
log.flush.interval.messages=10000#消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,#单位是毫秒。
log.flush.interval.ms=3000#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:#Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01
advertised.host.name=192.168.239.128
kafka命令行操作
#创建主题
bash-5.1# kafka-topics.sh --create --zookeeper 117.88.63.81:2181/kafka --replication-factor 1 --partitions 2 --topic mytest
Created topic mytest.
#【选项说明:】
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
#查看当前服务的所有主题
bash-5.1# kafka-topics.sh --list --zookeeper 117.88.63.81:2181/kafka
__consumer_offsets
mytest
可以看到日志中生成了两个分区文件
#删除分区
bash-5.1# kafka-topics.sh --delete --zookeeper 117.88.63.81:2181/kafka --topic mytest
Topic mytest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
bash-5.1# #查看主题信息
bash-5.1# kafka-topics.sh --describe --zookeeper 117.88.63.81:2181/kafka --topic mytest
Topic: mytest TopicId: gbkbv4-6SduH5WkUC-XTSw PartitionCount: 2 ReplicationFactor: 1 Configs:
Topic: mytest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mytest Partition: 1 Leader: 0 Replicas: 0 Isr: 0#因为只有一个节点,所以副本数为0
bash-5.1# #开启生产者端并发送消息
bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mytest>hello
>world
#开启消费者端并发送消息
bash-5.1# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
hello
world
消息存储过程
不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
Kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
topic是逻辑上的概念,而 partition是物理上的概念,每个partition对应于一个log 文件。
比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的,如果指定了多个,:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录)中就有这样5个目录: page_visits-0, page_ visits-1, page_visits-2,page_visits-3,page_visits-4,其命名规则为<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。Producer生产的数据会被不断追加到该log 文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
Kafka创建Topic时如何将分区放置到不同的Broker中?
1)副本因子不能大于 Broker 的个数;
2)第一个分区(编号为0)的第一个副本放置位置是随机从 brokerList 选择的;
3)其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
4)剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的;
Partition的数据文件
Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此、可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:
- offset
- MessageSize
- data
其中offset为long型,MessageSize为int32,表示data有多大, data为message的具体内容。它的格式和中介绍的MessageSet格式是一致。
Partition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起。它的实现类为FileMessageSet,类图如下:
它的主要方法如下:
- append: 把给定的ByteBufferMessageSet中的Message写入到这个数据文件中。
- searchFor: 从指定的startingPosition开始搜索找到第一个Message,其offset是大 于或者等于指定的offset,并返回其在文件中的位置Position。它的实现方式是从startingPosition开始读取12个字节,分别是当前MessageSet的offset和size。如果当前offset小于指定的offset,那么将position向后移 动L ogOverHead+MessageSize (其中L ogOverHead为offset+messagesize,为12个字 节)。
- read: 准确名字应该是slice, 它截取其中- 部分返回一个新的FileMessageSet。 它不保证截取的位置数据的完整性。
- sizelnBytes: 表示这个FileMessageSet占有了多少字节的空间。
- truncateTo: 把这个文件截断,这个方法不保证截断位置的Message的完整性。
- readInto: 从指定的相对位置开始把文件的内容读取到对应的ByteBuffer中。
由于生产者生产的消息会不断追加到 log文件末尾,为防止 log文件过大导致数据定位效率低下,Kafka采取了分段和索引机制,将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和”.log”文件。这些文件位于所属的分区文件中。
数据文件的分段
Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。
为数据文件建索引
数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率, Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。
- 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为O,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20= 5。存储相对offset可以减小索引文件占用的空间。
- position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。
index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
在Kafka中,索引文件的实现类为OffsetIndex,它的类图如下:
主要的方法有:
- append方法,添加一对offset和position到index文件中,这里的offset将会被转成相对的offset
- lookup,用二分查找的方式去查找小于或等于给定offset的最大的那个offset
小结:
我们以几张图来总结一下Message是如何在Kafka中存储的,以及如何查找指定offset的Message的。
Msssage是按照topic来组织的,每个topic可以分成多个的partition,每个有5个partition的名为page_visits的topic的目录结构
partition是分段的,每个分段是一个LogSegment,包括了一个数据文件和一个索引文件,index和 log文件以当前segment的第一条消息的offset命名,下图是某个partition目录下的文件:
可以看到,这个partition有4个LogSegment。
Message的查找过程:
比如:要查找绝对offset为7的Message:
1.首先是用二分查找确定它是在哪个LogSegment中,自然是在第一个Segment中。
- 打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。自然offset为 6的那个索引是我们要找的,通过索引文件我们知道offset为6的Message在数据文件中的位置为9807。 3.打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。 这套机制是建立在offset是有序的。索引文件被映射到内存中,所以查找的速度还是很快的。 一句话,Kafka的Message存储采用了分区(partition),分段(LogSegment)和稀疏索引这几个手段来达到了高效性。
分区策略
分区的原因
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以Partition为单位读写了。
生产者与分区
我们需要将producer 发送的数据封装成一个 ProducerRecord对象。
默认分区策略:
- 如果在发消息的时候指定了分区,则消息投递到指定的分区;
- 如果没有指定分区,但是消息的key不为空,则基于key的哈希值topic的 partition数进行取余得到 partition值;来选择一个分区;
- 如果既没有指定分区,且消息的key也是空,则用轮询round-robin的方式选择一个分区:第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的 partition总数取余得到 partition值;
消费者与分区
每个消费者一定隶属于一个组,消费者以组的名义订阅主题,主题有多个分区。凡是订阅了某个Topic的所有分组都会收到该Topic中的全部数据,消费者组中有多个消费者实例,组内的所有消费者相互协调消费topic中的所有分区,同一个消费者可以去消费多个分区中的数据,但是每个分区只能由同一个消费者组内的一个消费者来消费【为了保证消息的顺序和不重复消费消息】,当消费者的数量大于了分区的数量时,多余的消费者将会处于空闲。也就是说如果只有一个分区的话,在同一个消费者组中启动多少个消费者都没有用。
所以说,同组中的消费者不能大于Topic中的分区数量。假如partition为4,订阅改Topic的同组中的消费者最多只能为4个。
在 Kafka 内部存在两种默认的分区分配策略:Range 和 RoundRobin。当以下事件发生时,Kafka 将会进行一次分区分配:
- 同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
- 消费者离开当期所属的 consumer group组。比如宕机
- 分区数量发生变化时(即 topic 的分区数量发生变化时)
- 消费者主动取消订阅
假设我们有个名为 T1 的主题,其包含了10个分区,然后我们有两个消费者(C1,C2)来消费这10个分区里面的数据,而且 C1 的 num.streams = 1,C2 的 num.streams = 2
Range策略
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现
range:该策略会把主题的若干个连续的分区分配给消费者(默认的策略),对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor
1、range分配策略针对的是主题
2、首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序
3、然后,用分区总数除以消费者总数。如果能够除尽,则平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区
在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
在我们的例子里面,我们有10个分区,3个消费者线程,10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6, 7 分区
C2-1 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
C2-1 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区
可以看出,C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。
RoundRobin:该策略把主题的所有分区逐个分配给消费者,具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor
使用RoundRobin策略有两个前提条件必须满足:
同一个Consumer Group里面的所有消费者的num.streams必须相等;
每个消费者订阅的主题必须相同。
所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode进行排序,最后按照round-robin风格将分区分别分配给不同的消费者线程。
在我们的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区。
多个主题的分区分配和单个主题类似。
消费者组分区在均衡
- 同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
- 消费者离开当期所属的 consumer group组。比如宕机
- 分区数量发生变化时(即 topic 的分区数量发生变化时)
- 消费者主动取消订阅
Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用
Kafka消费者
消费消息方式
consumer采用pull(拉)模式从broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull模式则可以根据consumer的消费能力以适当的速率消费消息。
pull模式不足之处是,如果 kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为 timeout。
offset的维护
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka早期版本使用ZooKeeper为每个消费者存储offset。在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
Kafka 0.9版本之前,consumer 默认将offset保存在Zookeeper中,由于ZooKeeper写入性能较差,从0.9版本开始,consumer 默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets。
数据可靠性保证
1)Producer 往 Broker 发送消息:为保证producer 发送的数据,能可靠的发送到指定的topic,在 Producer 里面提供了消息确认机制。通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的)。
这个参数支持以下三种值:
acks = 0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
acks = 1:服务端会等待 ack , leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
acks = -1(all)(这个和 request.required.acks = -1 含义一样):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,确保leder挂掉后,能在follower中选举出新的leader,这样数据不会丢失。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。
2)Leader 选举
设想以下情景: leader 收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的in-sync replia set(ISR),意为和 leader保持同步的follower集合。当ISR中的follower完成数据的同步之后, leader就会给follower发送ack。如果follower长时间未向leader 同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader "发生故障之后,就会从ISR中选举新的leader。
ISR、OSR、AR 是什么?
ISR:In-Sync Replicas 副本同步队列:kafka中与leader副本保持一定同步程度的副本(包括leader)组成ISR
OSR:Out-of-Sync Replicas 与leader滞后太多的副本组成OSR
AR:Assigned Replicas 所有副本统称AR
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以用户可根据对可靠性和延迟的要求进行权衡,配置合适的消息应答机制。
3)故障处理
每个副本都与leader节点保持着一定的同步程度,那假如主节点挂了选取谁才能使Consumer 都能读到一样的数据呢?
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO,consumer 最多只能消费到 HW 所在的位置上一条信息。
(1) follower故障。
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition 的HW,即 follower追上 leader之后,就可以重新加入ISR了。
(2)leader 故障。
leader 发生故障之后,会从ISR中选出一个新的leader之后,为保证多个副本之间的
数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
通过HW使得无论是从主节点中读取数据还是ISR中某个副本读,都能保证可见的数据是一样的,从而保证消费者消费数据的一致性。
Exactly Once 语义
将服务器的ACK级别设置为-1,可以保证Producer到 Server之间不会丢失数据,即AtLeast Once语义。相对的,将服务器ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
At Least Once可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once语义。在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
At Least Once = 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中 enable.idompotence 设置为 true即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一 Partition 的消息会附带Sequence Number。而Broker端会对<PID,Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨
分区跨会话的Exactly Once。力
Kafka高效读写数据
1)顺序写磁盘
Kafka 的producer生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
2)零复制技术
减少用户空间(用户可以操作的内存缓存区域)与CPU内核空间(CPU可以操作的内存缓存区域及寄存器)的拷贝过程,减少用户上下文(用户状态环境)与CPU内核上下文(CPU内核状态环境)间的切换,提高系统效率”
3)文件分区分段存储,每个分段后的数据文件对应着一个索引文件,可通过索引文件快速检索到数据, 同时index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
4)批量发送数据
5)数据压缩
Zookeeper在 Kafka中的作用
1)Broker的注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册中心对整个集群的Broker进行管理,此时就使用了Zookeeper。在Zookeeper上会有一个专门用来记录Broker服务器列表的节点:/brokers/ids
每个Broker在启动时,都会在Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。这样,我们就可以很方便的监控到Broker节点的变化,及时调整负载均衡等。
2)Topic的注册
在kafka中,用户可以自定义多个topic,每个topic又被划分为多个分区,每个分区存储在一个独立的broker上。这些分区信息及与Broker的对应关系都是由Zookeeper进行维护。
在zookeeper中,建立专门的节点来记录这些信息,其节点路径为/brokers/topics/{topic_name}。并且topic创建的节点类型也是临时节点
3)生产者负载均衡
同一个Topic消息会被分区并将其分布在多个Broker上。由于每个Broker启动时,都会在Zookeeper上进行注册,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡。
4)消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。还对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。
5)分区与消费者的关系
消费者组 Consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该消息分区上消费者的Consumer ID。
6)记录消息消费的进度Offset
Kafka早期版本使用ZooKeeper为每个消费者存储offset。在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。
Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
节点内容就是Offset的值。
Kafka 0.9版本之前,consumer 默认将offset保存在Zookeeper中,由于ZooKeeper写入性能较差,从0.9版本开始,consumer 默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets。
7) 消费者注册
- 注册新的消费者分组 当新的消费者组注册到zookeeper中时,zookeeper会创建专用的节点来保存相关信息,其节点路径为 /consumers/{group_id},其节点下有三个子节点,分别为[ids, owners, offsets]。
ids节点:记录该消费组中当前正在消费的消费者;
owners节点:记录该消费组消费的topic信息;
offsets节点:记录每个topic的每个分区的offset;
- 注册新的消费者 当新的消费者注册到zookeeper中时,会在/consumers/{group_id}/ids节点下创建临时子节点,并记录相关信息
总结:保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现,partition leader选举,负载均衡等功能,同时记录着每个分区消息的偏移量,方便下次读取下一条数据。
Kafka事务
Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在 Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的TransactionID获得原来的PID。
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和‘Transaction Coordinator交互获得Transaction ID对应的任务状态。TransactionCoordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer事务
上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit 的信息被精确消费。这是由于Cosumer可以通过 offset访问任意信息,而且不同的SegmentFile生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
版权归原作者 三月不灭 所有, 如有侵权,请联系我们删除。