一、# 基础知识
1、安装
- 部署一台ZooKeeper服务器;
- 安装jdk;
- 下载kafka安装包;
- 上传安装包到kafka服务器上:/usr/local/kafka;
- 解压缩压缩包;
- 进入到config目录,修改server.properties配置信息:
#broker.id属性在kafka集群中必须要是唯⼀
broker.id=0
#kafka部署的机器ip和提供服务的端⼝号
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存储⽂件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
- 进入到bin目录,使用命令启动kafka服务器(带配置文件)
./kafka-server-start.sh -daemon ../config/server.properties
- 检查kafka是否启动成功:
进入到zk内查看是否有kafka节点:
/brokers/ids/0
2、基本概念
名称
说明
Broker
消息中间件处理节点,一个kafka节点为一个broker,一个或者多个broker组成一个kafka集群
Topic
消息主题。kafka根据topic对消息进行分类,发布到kafka集群的每条消息都需要指定一个topic
Partition
Topic在物理上的分区,一个Topic可以分为多个Partition,每个Partition是一个有序的记录序列。
Replica
Partition的副本
Producer
消息生产者。向broker发送消息的客户端。
Consumer
消息消费者。从broker读取消息的客户端。
Consumer Group
消费组。一个消费组可以包含一个或者多个消费者,每条消息只能被消费组的某个消费者消费
3、主题创建
- 通过kafka命令向zk中创建一个主题
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 1 --partitions 1 --topic test
- 查看当前zk中所有的主题
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181 test
4、发送消息
把消息发送给broker的某个topic,打开一个kafka发送消息的客户端,然后开始用客户端向kafka服务器发送消息。
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
5、消费消息
打开一个消费消息的客户端,向kafka服务器的某个主题消费消息。
生产者将消息发送给broker,broker会将消息保存到本地的日志文中。/usr/local/kafka/data/kafka-logs/主题-分区/00000000.log;消息的保存是有序的,通过offset偏移量来描述消息的有序性;消费者消费消息时也是通过offset来描述所要消费消息的位置。
- 方式一:从当前主题中的最后一条消息的offset + 1 开始消费:
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test
- 方式二:从当前主题的第一条消息开始消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test
6、单播&&多播消息
如果多个消费者在同一个消费组,那么只有一个消费者可以订阅到topic中的消息。即,同一个消费组中只能有一个消费者收到一个topic中的消息。
不同的消费组订阅同一个topic,那么不同消费组中各只有一个消费者能收到消息。
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --consumer-property group.id=testGroup2 --topic test
7、查看消费组信息
/kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
- current-offset: 最后被消费的消息的偏移量;
- Log-end-offset: 消息总量(最后⼀条消息的偏移量);
- Lag:积压了多少条消息。
二、主题与分区
1、主题 topic
kafka通过topic对消息进行分类,不同的topic会被订阅该topic的消费者消费。
如果一个topic的消息非常多,消息保存在log日志文件中,会占用大量的磁盘空间。为了解决文件过大的问题,kafka提出了Partition分区的概念。
2、分区 Partition
一个主题可以分为多个分区,一个分区只属于一个主题。同一个主题下不同分区包含的消息不同。消息在分区上的存储可以看作是日志文件的追加写入,消息被写入的时候会分配一个特定的偏移量(offset)。offset是消息在分区位置的标识,kafka通过offset保证分区内消息的顺序性。
通过partition将一个topic中的消息分区来存储。好处:
- 分区存储,解决了统一存储文件过大的问题,方便集群扩展;
- 提升了读写的吞吐量:读和写可以同时在多个分区中进行。
创建多个分区的主题:
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 1 --partitions 2 --topic test1
3、消息日志
(1)存储文件类型
- .index:索引文件;
- .log:日志文件,保存的为消息;
- .timeindex:时间索引文件。
topic下的Partition存储的文件过大的话会以分段(segment)的形式存储,分段的优点:
- 删除无用文件更为方便,提高磁盘利用率;
- 查找数据更为便捷(文件以偏移量命名,查找速度更快)。
(2)topic:__consumer_offsets
__consumer_offset-49:
- kafka内部创建了主题 “__consumer_offsets” 包含50个分区。这个主题用来存放消费者消费某个主题的偏移量。每个消费者会自己维护消费的主题的偏移量,即每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题consumer_offsets。kafka为了提升这个主题的并发性,默认设置了50个分区。 - 提交到哪个分区,通过hash函数确定: - hash(cunsumerGroupId)% __consumer_offsets 主题的分区数;- 提交到该主题的内容:key为 consumerGroupId+topic+分区号,value为当前的offset值。
注:在0.10.0(不包括0.10.0)之前,Kafka使用ZooKeeper来存储每个Topic分区的偏移量信息。随着Kafka集群规模的扩大,ZooKeeper的性能瓶颈逐渐显现。为了解决这个问题,Kafka引入了一个新的机制,即
__consumer_offsets
主题,用于存储消费组的偏移量信息
(3)清理策略
- 策略1:根据消息的保留时间,超过了指定的保留时间,触发清理(默认168小时,即7天);
log.retention.hours=168
- 策略2:根据topic存储数据量的大小,当topic的日志文件占用空间大于指定阈值,则会删除最久的消息(需要手动开启)。
Q:kafka数据清理机制?
A:
(1)介绍Kafka存储结构
- Kafka 中 topic 的数据存储在分区上,分区如果文件过大会分段存储segment;
- 每个分段都在磁盘上以 索引(xxxx.index) 和 日志文件(xxxx.log) 的形式存储;
- 分段的好处:①能够减少单个文件内容的大小,查找数据方便;②方便kafka进行日志清理。
(2)日志的清理策略有两个:
- 根据设置的消息的保留时间:当消息保存的时间超过了指定的时间,就会触发清理(默认是168小时,7天);
- 根据 topic 存储的数据大小:当topic所占的日志文件大小大于一定的阈值,则开始删除最久的消息。(默认关闭)
三、集群
kafka的服务端由被称为Broker的服务进程构成,即一个kafka集群由多个Broker组成。如果集群中的某一台机器宕机,其他机器上的Broker仍然能够对外提供服务,保证kafka的高可用性。
1、集群搭建
- 创建多个server.properties文件
# 0 1 2
broker.id=2
// 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
// kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2
- 通过命令分别启动各个broker
./kafka-server-start.sh -daemon ../config/server.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
- 检查是否启动成功
进入到zk中查看 /brokers/ids 中是否有对应的znode(0,1,2)。
2、副本
# 副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replicationfactor 3 --partitions 2 --topic my-replicated-topic
# 查看topic情况
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic myreplicated-topic
副本是为了给主题中的分区Partition创建多个备份,多个副本在kafka的集群的多个broker中,会有一个副本作为Leader,其他为Follower。
(1)Leader:
- kafka 的写和读操作,都发生在Leader上。Leader负责把数据同步给Follower,如果Leader挂了,通过主从选举,从多个Follower中选举产生一个新的Leader。
(2)Follower:
- 接收Leader的数据同步。
(3)ISR(in-sync replica):
Kafka的ISR(In-Sync Replicas)存储的是与主题相关的所有已提交且成功写入的分区数据副本。这些副本用于确保数据的可靠性和一致性,并且在发生故障时可以恢复数据。
可以同步和已经同步的副本会被存入到 isr集合 中。如果isr中的节点性能较差,会被从isr集合中剔除。
总结:集群中有多个broker,创建主题是可以指明主题有多个分区,可以为分区创建多个副本,不同的副本存放在不同的broker里。
在Kafka中,【副本leader的选举】是通过ZooKeeper来实现的。当一个分区没有活跃的leader副本时,会触发leader副本的选举过程。这个过程通常由以下步骤组成:
- Partition Leader Ephemeral Nodes:每个分区都有一个ZooKeeper节点,该节点包含了一个副本的ID和该副本的状态信息。
- ZooKeeper Watchers:每个副本都会注册一个watcher到ZooKeeper上,以便在节点状态发生变化时接收通知。
- Partition Leader Election:当一个副本失去连接或者被手动关闭时,ZooKeeper会通知其他副本进行选举。此时,所有副本都会尝试更新它们在ZooKeeper上的节点状态,以表明自己是新的leader副本。
- Highest-Sequence-Number Wins:在更新节点状态的过程中,副本会将自己的当前偏移量(即最高序列号)发送给ZooKeeper。ZooKeeper会比较所有副本的偏移量,并选择偏移量最大的那个副本作为新的leader副本。
- New Leader Announces Itself:一旦新的leader副本被选出来,它会向所有副本发送一条消息,告知它们自己成为了新的leader副本。
- Replica Syncs with New Leader:所有副本都会从新leader副本拉取最新的数据,以确保它们都拥有相同的数据副本。
3、集群消费
- 一个partition(分区)只能被一个消费组中的一个消费者消费,目的是为了保证消费的顺序性。多个partition的多个消费者的消费顺序的顺序性无法得到保证。
- partition的数量决定了消费组中消费者的数量,同一个消费组中的消费者的数量最好不要超过partition的数量,否则超出的消费者消费不到消息。
- 如果消费者挂了,会触发rebalance机制,会让其他消费者来消费该分区。
4、Controller、rebalance、Hw
(1)Controller
启动时,每个broker会向ZooKeeper创建一个临时序号节点,获得序号最小的那个broker将会作为集群中的Controller,负责:
- Leader选举:当集群中一个副本的Leader挂掉,需要在集群中选举出一个新的Leader,ZooKeeper会选择序列号最大的节点作为新的Leader;
- broker信息同步:当集群中有broker新增或者减少,Controller会同步信息给其他broker;
- 分区信息同步:当集群中有分区新增或者减少,Controller会同步信息给其他broker。
(2)reblance机制
Partition或者消费者数量发生了变化,需要重新建立消费者与Partition的消费关系。
前提:消费组中的消费者没有指明分区来消费;
触发的条件:消费组中的消费者和分区的关系发生了变化;
分区分配的策略:reblance之前,分区有三种分配策略:
- range:根据公式计算每个消费者消费哪几个分区,分区总数/消费者数量 + 1 (根据余数情况确定,前面几个消费者需要“+1”,后面几个不需要)。
- 轮询:即依次轮着来。
- sticky(粘合策略):如果需要reblance,会在之前已经分配的基础上进行调整,不会改变之前分配的情况。如果该策略没有开,那么就需要进行整体的重新分配。
(3)HW和LEO
HW是已经完成同步的位置。
消息在写入broker,且每个broker已经完成该消息的同步后,hw才会发生变化。在此之前消费者是消费不到这条消息的。在完成同步后,HW更新后,消费者才能消费到这条消息,这样的目的是为了防止消息丢失。
LEO(log-end-offset)是某个副本最后的消息位置。
四、消息的同步异步发送
注:该过程对应发送者和kafka集群(broker)的交互过程,与消费者无直接关联。
1、同步发送消息
# 确认机制,1 、 all
acks=all
sync.send=true
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次。
确认机制:指当生产者发送消息后,Kafka集群中至少有多少个副本接收到该消息后,才向生产者返回确认信息。
【生产者的三种ack配置】
- acks=0,kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息,效率最高(同步发送的场景勿配置0)。
- acks=1(默认),多副本之间的Leader已经收到消息,并把消息写入到本地的log中,才返回ack给生产者,性能和安全性较为均衡。
- acks=-1/all,配置min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要Leader和一个Follower同步完成后,才返回给ack给生产者(此时集群中有2个broker已经完成数据的接收)。这种方式最安全,但性能最差。
2、异步发送消息
# 确认机制
acks=0
sync.send=false
异步发送,生产者发送完消息后就可以执行之后的业务流程,broker在收到消息后异步调用生产者提供的 callback() 回调方法。
3、消息发送的缓冲区
- kafka生产者默认会创建一个消息缓冲区,用来存放要发送的消息,默认为32mb; - props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- kafka本地线程会去缓冲区中一次拉取16k的数据,发送到broker; - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果拉取不到16k的数据,间隔10ms也会将已有的数据发送到broker。 - props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
五、消费者实现
1、消费者自动&&手动提交Offset
(1)提交的内容
“所属的消费组 + 主题 + 分区 + 消费的偏移量”,提交到集群的__consumer_offsets主题里面。
(2)自动提交
消费者poll消息下来后就自动提交offset。
// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注:自动提交可能会丢消息。因为消费者在消费前提交offset,可能提交完后还没有完成消费,消费者就挂了。
(3)手动提交
需要把自动提交的配置改成false。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交分为两种:
- 手动同步提交:在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交成功,执行之后的逻辑。
- 手动异步提交:在消息消费完后提交,不需要等待集群ack,直接执行之后的逻辑,可以设置一个回调方法,供集群调用。
2、长轮询poll消息
(1)默认情况下,消费者一次会拉取500条消息。
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
(2)可以设置长轮询的时间周期,例如1000ms。
- 如果⼀次poll到500条,就直接执行for循环。
- 如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s。
- 如果多次poll都没达到500条,且1秒时间到了,那么直接执行for循环。
- 如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。可以通过设置这个参数,让一次poll的消息条数少⼀点。
//⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
3、消费者健康状态检查
消费者每隔1s向kafka集群发送一次心跳,如果集群发现超过10s没有续约的消费者,会将其踢出消费组,触发消费组的reblance机制,将该分区的交给消费组里的其他消费者进行消费。
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
六、常见问题
1、防止消息丢失
(1)生产者发送消息到Broker的过程丢失:
方式一:异步发送
- 设置异步发送,发送失败的情况使用回调记录或者重发;
- 失败重试,配置重试次数。
方式二:同步发送
- 使用同步发送消息的方式。
(2)消息在Broker中存储丢失:
- 把ack设置为1或者all(-1),设置同步的分区数 >= 2,让Follower节点参与保存数据的确认。
(3)消费者从Broker接收消息丢失:
- 关闭自动提交偏移量,开启手动提交偏移量;
- 提交方式,把自动提交改成手动提交(最好使用 同步 + 异步 提交)。
2、防止重复消费
如果生产者发送消息后,由于网络抖动等问题,没有收到ack,但是实际上broker已经收到了消息。此时,生产者会进行重试,于是broker就会收到多条相同的消息,从而造成重复消费。
解决:
- 生产者关闭重试。这种方式会造成消息丢失(不推荐);
- 消费者关闭自动提交偏移量,开启手动提交偏移量;
- 由消费者解决非幂等性消费问题: - 在数据库中创建联合主键,防止相同的主键创建出多条记录。- 使用分布式锁,以业务id为锁。保证只有一条记录能够创建成功。
3、保证顺序性消费
问题原因:一个topic的数据可能存储在不同的分区中,每个分区都有一个按照顺序存储的偏移量。如果消费者关联了多个分区,则不能保证顺序性。
解决该问题,只需要保证需要顺序消费的消息出现在同一个分区。
解决方法:
- 方式一: - 发送消息时,指定分区号;- 发送消息时,按照相同的业务设置相同的key(默认情况下,分区是通过key的hashcode值来确定分区的。因此,key一样的话,分区也是一样的);
- 方式二(不推荐): - 生产者:使用同步发送,ack设置成非0的值(1或者-1(all))。- 消费者:主题只设置一个分区,消费组只设置一个消费者。
主:实际kafka顺序消费的场景不多,因为会牺牲掉性能。
4、消息积压
(1)出现的原因
消费者的消费速度赶不上生产者的生产速度,导致kafka中大量的数据没有被消费。
随着积压消息的增多,消费者的寻址性能会下降,最终导致整个kafka对外提供服务的性能很差,从而造成其他服务访问速度变慢,造成服务雪崩。
(2)解决方案
- 在消费者中,使用多线程,充分利用机器的性能进行消费消息。
- 通过业务的架构设计,提升业务层面消费的性能。
- 创建多个消费组,多个消费者,部署到其他机器上,一起消费,提高消费者的消费速度。
- 创建一个消费者,该消费者在kafka另建一个主题,配上多个分区,多个分区再配上多个消费者。该消费者将消息poll下来,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始一起消费了。(不常用)
5、应用场景
- 消息系统:用作消息中间件;
- 系统解耦
- 流量削峰
- 异步处理
- 日志聚合
6、高性能设计
- 消息分区:不受单台服务器的限制,可以不受限的处理更多的数据;
- 顺序读写:磁盘顺序读写,提升读写效率;
- 页缓存:把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问;
- 零拷贝:减少上下文切换及数据拷贝;
- 消息压缩:减少磁盘IO和网络IO;
- 分批发送:将消息打包批量发送,减少网络开销。
(1)零拷贝
非零拷贝流程:
消息保存的过程:生产者通过用户空间kafka将消息传递给内核空间的页缓存进行保存,然后页缓存再将消息传递到磁盘空间。
消息消费的过程:消费者通过用户空间的kafka到内核空间的页缓存中去查找数据,如果没有目标消息,则继续查找磁盘文件。从磁盘文件中将消息拷贝到页缓存,再由页缓存将消息拷贝到用户空间kafka。
发送给消费者需要用到socket连接和网卡,此过程首先将用户空间kafka中消息拷贝到内核空间的socket缓冲区,然后由socket缓冲区将消息拷贝到网卡,继而发给Consumer。
零拷贝流程:
消息的保存过程同“非零拷贝流程”。
消息的消费过程减少了从页缓存到kafka的拷贝,及kafka到socket缓冲区的拷贝。而是由页缓存直接将消息拷贝到网卡,继而发送给consumer(发送的过程拷贝次数从4次减少到2次)。
Q:kafka实现高性能的设计有了解吗?
A:kafka 高性能是多方面协同的结果,包括宏观架构、分布式存储、ISR数据同步,以及高效的利用磁盘、操作系统等。主要包括:
消息分区:不受单台服务器的影响,可以同时处理更多的数据;
顺序读写:磁盘顺序读写,提升读写的效率;
页缓存:把磁盘的数据缓存到内存中,把对磁盘的访问变为对内存的访问;
零拷贝:减少上下文切换及数据拷贝的开销;
消息压缩:减少磁盘IO和网络IO;
分配发送:消息批量发送,减少网络开销。
参考:
https://www.bilibili.com/video/BV1Xy4y1G7zA?p=1;
https://www.bilibili.com/video/BV1yT411H7YK;
https://www.jianshu.com/p/d3e963ff8b70;
https://juejin.cn/post/7179046224379510844;
https://juejin.cn/post/6844903565731823623;
版权归原作者 Janebook 所有, 如有侵权,请联系我们删除。