一、Kafka定义
Kafka传统定义:
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
Kafka最新定义
Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。
但是目前来看,将它作为消息队列的使用还是更多一些。未来可能会向着它官网最新定义发展,我们也会持续关注其变化。
说明一下:Kafka的分布式是说针对正常的线上环境一般都是分布式的,当然单机也是可以的。
二、消息队列
分类
目前,企业中比较常见的消息队列产品主要有: Kafka、RabbitMQ 、 RocketMQ、ActiveMQ 等。 在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用RabbitMQ、RocketMQ、 ActiveMQ。
传统消息队列的应用场景
传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信。
也可以说是消息队列的三大作用。具体的在RabbitMQ的笔记中有介绍过。
消息队列的两种模式
前面在RabbitMQ中也提到了,那就是点对点模式和发布订阅模式。虽然RabbitMQ内部又细分为很多种模式,但是大的方向,就是这两种的。
- 点对点式:- 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者监听该队列,从队列中获取消息内容,消息读取后被移出队列。- 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者。【意思是说可以有很多接收者监听该消息队列,但是某一个消息最终只能被一个接收者接收】
- 发布订阅式:发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个 主题,那么就会在消息到达时同时收到消息。
也即点对点就是消息最终只能有一个消费者,发布订阅模式可以有多个消费者消费。
这里需要说明的是:
Kafka中发布订阅模式和RabbitMQ的发布订阅模式实现上稍微有些区别:
RabbitMQ中发送消息给扇出交换机,与该扇出交换机绑定的所有队列都能接受到该消息,不管发送的路由键是什么。消费者绑定队列来消费消息。但是,如果多个消费者绑定同一个队列,一旦某一个消费者消费了该队列的消息,那么该队列就会将该消息删除掉,其它消费者就无法获取了。当然,绑定其他队列的消费者不受影响。因为消费后删除的只是自己所在队列中的消息,与该交换机绑定的其他队列不受影响。
Kafka中队列的消息,一旦消费后不会立即删除。它会主动控制什么时候删除,这个后面再说。
实际开发中,我们都是更多的使用发布订阅模式。这样处理效率会更高些。
以上只是对消息队列的一个基本认识,各个消息中间件之间有所区别。各位也不必执着,谁谁谁不一样之类的。我们会在后面的讲解中,具体介绍各自的用法和细节。
三、Kafka基础架构图
四、安装Kafka
因为Kafka的生产者和消费者都是用Java写的,而且依赖于zookeeper,所以我们要提前安装好这两个。
4.1 为每台服务器下载Kafka并解压
官网:Apache Kafka
这里演示下载kafka_2.12-3.0.0.tgz,将其放入linux系统,进行解压
解压命令为:
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
解压后,进入该目录然后重命名一下,好看一点
mv kafka_2.12-3.0.0 kafka
4.2 查看目录结构
然后我们查看解压后的目录结构
4.3 为每台服务器修改配置文件server.properties
下面,我们要想快速启动Kafka集群,还需要修改配置server.properties
需要修改的信息
#Kafka的身份唯一标识,配置的集群一定要保证每一台都不一样,这和zookeeper的myid一个道理
broker.id=0
#存储Kafka中数据的位置,默认是临时目录下:/tmp/kafka-logs,我们放在一个自己指定的地方
log.dirs=/opt/module/kafka/datas
#listener=PLAINTEXT://:9092默认被注释掉了,我们需要取消掉 listeners 的注释,然后修改值为如下,其中 192.168.17.100 是当前服务器的IP地址。
#注意:Kafka集群中每台服务器上的 server.properties 配置文件都需要修改 listeners 配置项,都修改为自己对应服务器的IP地址。
listeners=PLAINTEXT://192.168.17.100:9092
#注册中心zookeeper默认连接的是本地的2181端口,localhost:2181,我们需要连接到之前创建的zookeeper集群中
#多个服务器之间用,号隔开,但最后我们需要写/kafka,
#如果不采用目录树下面放一个指定的目录的话,那么Kafka中的信息就会打散到zookeeper中
#如果后续要注销某个Kafka的话,就需要手动去zookeeper中一个一个找到删掉,不利于管理
zookeeper.connect=192.168.17.100:2181,192.168.17.101:2181,192.168.17.102:2181/kafka
注意每台服务器都需要修改,它们不一样的地方一个是broker.id分别为0、1、2。
然后监听的ip地址是各自的服务器ip。
4.4 为每台服务器配置Kafka环境变量
vim /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_202
export PATH=$PATH:$JAVA_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
让配置生效:
source /etc/profile
4.5 启动zookeeper集群
ok,到这里基本的安装配置就算完成了,下面准备启动。
启动Kafka集群之前,必须先启动zookeeper集群。
之前在zookeeper学习笔记中,有讲解创建一个启动脚本,直接一次启动即可。我们查看一下是否都启动起来了。
或者使用jps命令,只不过每台都要使用jps命令查看(当然你如果配置了其他命令也不用)
4.6 启动Kafka集群
在每台服务器中都启动一下
kafka-server-start.sh -daemon ../config/server.properties
如果通过jps命令发现没有启动成功,需要检查一下是否配置文件设置的有问题。什么都没有问题,还要确认一下创建的data目录是否是干净的,里面不能有任何东西。
是否启动成功我们通过jps命令可以检查
我们也可以为它创建shell脚本,前面zookeeper中已经讲解过。
#!/bin/bash
case $1 in
"start"){
for i in 192.168.17.100 192.168.17.101 192.168.17.102
do
echo "--------启动 $i kafka"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
}
;;
"stop"){
for i in 192.168.17.100 192.168.17.101 192.168.17.102
do
echo "--------停止 $i kafka"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
}
;;
esac
在家目录下的bin目录中创建一个kf.sh文件,然后复制上面的shell脚本,保存
vim kf.sh
接着,将它变成可执行命令
chmod 777 kf.sh
测试一下,ok
查看是否启动成功,通过jps命令即可,注意每台都要看一下
都检查ok以后,我们的Kafka集群就算启动成功了。
个人建议后续通过这种脚本的方式启动集群,更为方便。这样就不用每台都要输这么多命令了,比较麻烦。
4.7 关闭Kafka集群的注意事项
一定要关闭Kafka之后再关闭zookeeper集群,否则会导致Kafka出问题!!!
关闭也不用多说了,前面脚本中也配置了的,只需要执行kf.sh stop即可关闭。
五、Topic命令
针对消费者、生产者、Kafka服务器集群都有不同的命令。
针对主题的命令为:kafka-topics.sh
具体有哪些命令,这里面都提示了的。核心命令如下:
参数描述--bootstrap -server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号--topic <String:topic >操作的 topic 名称--create创建主题--delete删除主题--alter修改主题--list查看所有主题--describe查看主题详细描述--partitions <Integer:# of partitions>设置分区数--replication-factor <Integer:replication factor>设置分区副本--config <String:name=value >更新系统默认的配置
5.1 查看服务器中的所有 topic
[root@centos100 kafka]# bin/kafka-topics.sh --bootstrap-server http://centos100:9092 --list
注意--bootstrap-server 后面接Kafka集群任意一台都可以,集群中所有服务器中的数据都能看到。但是如果该台挂了,我们就连接不上了。所以生产环境下,可以多连接几台。
上图这里说明还没有topic。
如果出现连接不上很可能是之前server.properties没有取消掉对listeners这段的注释。可以回过去看一看,每台服务器都要改成各自的IP地址,然后记得关闭Kafka集群再重启。
5.2 创建topic主题
[root@centos100 kafka]# bin/kafka-topics.sh --bootstrap-server centos100:9092 --topic test1-topic --create --partitions 1 --replication-factor 3
解释一下这段命令:
我们通过任意一台服务器的9092端口,连接到Kafka集群。
准备操作的是topic主题,名称为:test1-topic
要对这个这个主题做什么?是创建出来
创建的这个主题存到哪里,需要设置分区数和具体的分区副本。
创建成功,我们查看一下有没有。
ok!
5.3 查看某主题的详情信息
[root@centos100 kafka]# bin/kafka-topics.sh --bootstrap-server centos100:9092 --topic test1-topic --describe
解释一下信息说明:
主题叫做:test1-topic
主题id为:egmGOmNSQF6zaa0JOAU9sg
分区数:1
分区副本:3
底层分区安装1G为单位进行划分。
第二行:当前主题test1-topic,因为指定的是放在1个分区中的,所以分区Partition实际为:0(因为是0,1,2...这样排的),我们设置的是存放三份分区副本,实际Replicas也是的:0,1,2。但是它们会推举一个leader:0,后续生成者消费者就针对leader进行相关操作。具体怎么推举的这个后面说。
5.4 修改topic主题分区数量
[root@centos100 kafka]# bin/kafka-topics.sh --bootstrap-server centos100:9092 --topic test1-topic --alter --partitions 3
查看一下详情
注意修改的分区数一定不能小于原来的大小。 分区只能增加,不能减少。否则报错!
5.5 不能通过命令行的方式修改分区副本
六、生产者消费者命令
经过刚才的演示,大致已经知道了具体的方法了,生产者则是通过kafka-console-producer.sh这个命令来完成消息发送
具体有哪些指令也可以执行它出来看看。大致都很相似。
下面我们演示一下发送消息
6.1 生产者发送消息
[root@centos100 bin]# kafka-console-producer.sh --bootstrap-server centos100:9092 --topic test1-topic
现在已经发送了
我们再开一台服务器来作为消费者接收试试
6.2 消费者接收消息
发现没有接受到数据。我们再回去发送一条试试?
我们发现,历史发送的数据收不到,连接后发送的数据是可以收到的。
那要收到历史发送的消息怎么办呢?
其实只需要消费者在后面再拼接命令,就可以看到所有数据
[root@centos101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server centos101:9092 --topic test1-topic --from-beginning
实际开发中,要不要加上 --from-beginning呢?这个要根据情况而定。
ok,基本命令就演示到这里。
七、生产者原理
实际开发工作中,我们一般都是通过代码来完成消息的生产和消费一系列动作的。但是在进行代码演示之前,我们先需要搞懂生产者发送的基本原理,这样有助于对Kafka的整体认识。
生产者发送消息的流程
准备好消息后,Producer首先调用send方法进行发送
首先会经过拦截器,可以对数据进行一些加工处理,当然也可以不配置,或者不用Kafka的拦截器配置。
随后会经过序列化,Kafka并没有采用Java提供的序列化器,而是自己实现的序列化器。因为Java提供的序列化器,会在原有数据的基础上,增加很多的用于安全校验的数据,在大数据的场景下,每次传输的数据量很大,如果在此基础上还要加入大量用于安全校验的数据,严重的影响了效率,所以kafka等中间件,自己实现了序列化器,仅仅进行简单的校验,增加了效率。
随后经过分区器(分区器实际上是将数据发送到了缓冲队列中,缓冲队列是一个双端队列,其内部包含内存池,避免频繁的申请和释放内存)
因为Kafka可以对topic进行分区,所以发送时就需要确定向哪个分区发送信息,就由分区器定义的规则来发送。一个topic的分区对应一个队列,这些队列都是在内存中创建的,总大小默认32M,每一批次默认大小16K。
听不明白这里说明一下:
具体发送到哪个分区,是通过send()方法的形参指定好分区或者使用默认的分区策略或者指定使用自定义的分区策略,它就会自动将此次send()方法发送的内容发送到具体topic某一分区上。
但是在发送之前,它会由分区器先将数据发送到内存中的缓冲队列中,缓冲队列里面可能有很多个队列,这个根据topic主题设置的分区而定,一个队列就对应着当前topic上具体的一个分区。
什么时候发送?
当消息通过分区器以后,到达了指定的缓冲队列中。
一旦满足如下两个条件任何一个,即可调用sender线程帮助我们将缓冲队列中的数据,发送到kafka集群中。
(一)batch.size:即批次大小。
只有数据累积到batch.size之后,sender才会发送数据。默认16K
(二)linger.ms:等待时间。
如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送信息。单位ms,默认值是0ms(即默认到了就发送,不等待到达batch.size阈值)
八、生产者发送方式之同步、异步发送
8.1 基本概念
异步发送:指的是生产者将消息发送到缓冲队列中就可以再次调用方法发送其余的数据。
同步发送:要等消息发送到队列并且再发送到Kafka集群并收到服务器返回的ack通知之后,才能再次调用方法发送其余的数据。否则一直处于阻塞状态。
8.2 生产者异步发送
send()方法默认即是异步发送。
下面开始代码演示
创建一个springboot工程。
导入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
package com.hssy.kafkademo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 1 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"centos100:9092,centos101:9092");
// 2 指定对应的key value的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 3 创建Kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 4 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic","你好,我是高启强!" + i));
}
// 5 关闭连接
kafkaProducer.close();
}
}
发送之前,我们先通过服务器命令开启一个消费者,这样好观察我们发送的消息。
执行刚才的生产者代码,发送消息,我们可以看到能够成功收到消息,说明发送成功!
这个send方法还有很多重载方法,我们可以指定一个回调函数,这样,发送完毕会返回一个结果。方便我们观察和判断。
// 4 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic", "你好,我是高启强!" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println(
"主题:" + recordMetadata.topic() +
"分区:" + recordMetadata.partition()
);
}
}
});
}
思考:为什么我们发送的消息分区都是0号分区?
前面讲解生产者原理的时候就说过了,如果我们调用send方法指定了分区的话,就会发送到指定分区,或者使用默认的分区策略,Kafka会按照某种规则帮我们指定了分区。
关于指定分区和分区策略,在下面的章节我会详细说明!
8.3 生产者同步发送
代码中唯一的区别就是send()方法后面再.get();然后抛出异常即可。这里就不演示了。
九、生产者消息发送分区
9.1 指定分区
调用send()方法发送消息的时候,需要传递一个record参数。
如果record对象中指定了分区,则使用指定的分区。
9.2 默认分区策略
- 如果没有指定分区,但有键,则根据键的散列选择分区
- 如果没有分区或键,则选择当批处理满时更改的粘性分区
关于粘性分区的详细信息请参见KIP-480
我们可以通过下图加深认识
9.3 自定义分区策略
当然了,如果不想使用默认的分区策略,我们还可以使用自定义的分区策略。
当使用自定义的分区策略后,默认的分区策略包括指定分区(其实严格意义上也属于默认分区策略)就不生效了。
9.4 代码演示
9.4.1 指定分区
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic", 1,"","你好,我是高启强!" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println(
"主题:" + recordMetadata.topic() +
"分区:" + recordMetadata.partition()
);
}
}
});
}
全部发送到分区1了。
我们把分区改成2号,也是ok的。
三号分区呢,肯定不行,因为我们给当前主题只设置了3个分区,即0,1,2号分区。不存在的分区肯定不行,它就会一直试图发送。但永远也到达不了。
9.4.2 没有指定分区,但指定key
假如指定了key为a,则按照a的hashcode值保存到分区。
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic", "a","你好,我是高启强!" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println(
"主题:" + recordMetadata.topic() +
"分区:" + recordMetadata.partition()
);
}
}
});
}
举例:
希望把数据库中订单表的所有数据发送到Kafka的某一个分区,怎么实现?
很简单,不设置分区,设置key就好了,但是key最好设置为订单表的表名字,这样见名之意。
9.4.3 自定义分区策略
package com.hssy.kafkademo.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 自定义分区
*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String msgValue = value.toString();
int partition;
if (msgValue.contains("hssy")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
十、提高生产者吞吐量
默认情况下,linger.ms 等待时间,默认是 0ms
也就是说只要一有数据,sender线程就往Kafka集群里面存。
batch.size:批次大小,默认16k。
但是在linger.ms为0ms的情况下是不起作用的。
那么怎么提高吞吐量呢?即如何增大每次发送的数据量。
那么最直接的就是修改linger.ms的时间
linger.ms设置的高一些,这样每次发送的数据量就大些。但是不能设置的太高了,否则延迟就很高,一般修改为5-100ms。
其他方面:
比如压缩数据进行存储,
再比如修改缓冲区大小,默认32m,比如64m。
代码演示
package com.hssy.kafkademo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomerProducerParameters {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"centos100:9092,centos102:9092");
// 序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小 默认33554432,即32M。这里*2让它变成64M
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2);
// 批次大小 默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms 等待时间,默认 0ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
// 压缩 默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1.创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic","刘亦菲"+i));
}
// 3.关闭连接
kafkaProducer.close();
}
}
十一、生产者发送数据可靠性(是否漏发)
11.1 分析
前面我们提到了,生产者发送数据分为同步发送和异步发送。异步发送只发到缓冲队列就接着发送下一条消息了。而同步发送则是需要发送到Kafka服务端以后收到服务端的ack应答通知才能发下一条消息。
那么很明显,异步发送是不可靠的发送。
ps:虽说是不可靠的发送,但是通常情况下不可靠的概率也非常非常的低。而且生产者可以通过回调函数来查看broker最终ack结果,应答发送成功还是失败,然后做出相应的补救处理。
同步发送虽说需要阻塞到收到服务端的应答才能发送下一条。但也依然有可能存在数据发送丢失的情况。这与ACK应答机制密切相关。
即无论是同步发送还是异步发送,都与Kafka集群的应答ack方式有关。
- 0:生产者发送过来的数据,不需要等数据落盘应答。 这样速度是最快的,但是一般生产环境没人使用。
- 1:生产者发送过来的数据,Leader收到数据后应答。
- -1(all):生产者发送过来的数据,Leader 和 isr 队列里面的所有节点收齐数据后应答。(-1和all等价)
所以,要想更好的保证数据不丢失,可以让ack模式设置为-1。
思考
Ack模式设置为-1,Leader收到数据,所有Follower都开始同步数据, 但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那这个问题怎么解决呢?
迟迟不能同步leader中的数据,也就意味着不能应答生产者。那么这条数据就永远卡在这里?
其实不用担心,超过一定时间不与Leader同步就会被踢出的。我们可以看这张说明图:
所以要想数据完全可靠不丢失,最好做到这几点:
- ACK级别设置为-1
- 分区副本大于等于2
- ISR里面应答的最小副本数量大于等于2
11.2 可靠性总结
acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。
11.3 ack=-1带来的问题
但是,ack=-1的情况下,还可能带来数据的重复问题。
就是所有节点收集完毕准备最后应答的一瞬间,leader挂了,那么ack没有成功,集群就会挑选一个新的follower作为leader。新的leader就会去接受数据。由于上一次没有应答成功,sender线程就会再发送一次该数据,由于上一次已经同步了一份,所以现在就有两份数据了。
接下来,我们就解决消息重复的问题。
十二、生产者发送数据的重复问题
前面说了,ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2。保障了数据的可靠性(不丢失消息),但也可能出现重复的问题。
那么怎么解决呢?
ACK级别设置为0,这样肯定只会发送一次,不管有没有接收到,都只会发送一次。即使是应答瞬间挂了,再次发送过来也不会重复,因为上一次压根就没落盘。但是它带来的问题就是:它可能导致数据丢失的问题。
对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。
Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
12.1 幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
其实,生产者发送的每一条消息,都有一个主键。具有<PID, Partition, SeqNumber>主键相同的消息提交时,Broker只会持久化一条。
- PID是生产者的唯一编号,但Kafka每次重启都会为生产者分配一个新的;
- Partition 表示分区号;
- Sequence Number是针对消息的一个递增序列。
所以幂等性只能保证的是在单分区单会话内不重复。
通过下面这个图我们再说明一下:
当发送world时,可能由于网络延迟,broker0迟迟没有应答,结果生产者又发送了一遍。因为幂等性,所以这次发生的数据被broker认定为重复数据,因此不会被保存。
而幂等性默认就是开启幂等的。我们也可以通过参数 enable.idempotence 来控制开启关闭。默认为 true,false表示关闭。
但是仅仅有幂等性还是不能保证服务器响应前突然宕机后保存重复消息的问题。因为重启服务器会产生新的PID,幂等性只能保证的是在单分区单会话内不重复。
所以,就要引入事务了。
12.2 生产者事务
上面幂等性只能保证的是在单分区单会话内不重复。
一旦Kafka服务器挂掉后再重启,就会产生重复数据。
那么我们还需要开启事务,这样即使服务器挂掉了,但是重启后还能继续处理未完成的事务。因为事务id这些都是存储在硬盘中的。
12.3 代码演示
package com.hssy.kafkademo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerTransaction {
public static void main(String[] args) {
// 0 配置
Properties properties = new Properties();
// 连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"centos100:9092,centos101:9092");
// 指定对应的key value的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 指定事务id,一定要保证唯一,这里随便写一下。
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
// 1 创建Kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try {
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("test1-topic","这是事务+幂等性测试!" + i));
}
// 提交事务
kafkaProducer.commitTransaction();
}catch (Exception e){
// 终止事务
kafkaProducer.abortTransaction();
}finally {
// 3 关闭连接
kafkaProducer.close();
}
}
}
因为幂等性默认是开启的,ack=-1默认也是开启的所以这里就没再添加相关代码了。
12.4 总结
也就是说,默认情况下我们的Kafka都是能够保证单会话单分区数据不重复的。
如果要想保证服务器重启也保证数据不重复,就可以开启事务。
十三、数据有序
生产者发送的数据,消费者在消费时要求是有序的。
什么意思呢?
我们生产者一条一条的发送的数据(多次send()方法),可能其中某几次发送的数据落到了不同的分区(原因可能是与使用的分区策略有关,或者写的代码有关,代码中你修改了要发送的分区)。
但是不管如何,相对于具体的某个分区内部,它的顺序是固定的。
- 也就是说单分区内,它是有序的。
- 但是分区与分区之间是无序的。
因此,我们存的一些重要信息,与顺序关系很大的。最好是设置统一的一个分区。不要让他放到不同的分区中去。
假如希望多个分区,分区与分区之间也有序,可以吗?
可以做到,但是效率要低很多。这里不演示了。
当然,如果对数据的顺序没有要求,那怎么都可以。
十四、数据乱序
生产者发送数据还未收到ack应答能否再次发送余下的数据呢?
其实是可以!但是最多只能5拨数据。
举例说明
队列中准备顺序发送1,2,3,4,5。
前两次发送1,2都成功应答了。按顺序接受到1,2。
但是发送3的时候没有应答,4就先发过去了。
这个时候分区接收到了4,现在顺序就是1,2,4。
由于没有应答,发送者又会重新发送3,这个时候接收到,顺序就变成了1,2,4,3。
导致了乱序。
其实真实情况下,只要保证开启幂等性并且设置max.in.flight.requests.per.connection小于等于5,****就不会乱序。
因为在Kafka1.x以后,按照以上要求设置后,Kafka服务端会缓冲生产者发过来的最近5个消息的元素据,所以无论如何,都可以保证最近的5个数据都是有序的。当然如果设置的是3个就缓冲3个。但是最大只能设置为5个。
十五、消费方式
pull(拉)模式
consumer采用从broker中主动拉取数据。 Kafka采用这种方式。
十六、消费的总体流程
生产者生产的大量消息,根据设置,会存放在不同的分区上。
而每个分区又会产生分区副本去同步数据,目的是防止数据的丢失。但是我们消费数据都是在leader分区上进行的。
我们的消费者可以指定消费哪一个分区的消息。当然一个消费者也可以消费多个分区的消息。
消费者与消费者之间是独立的,不冲突的。也就是说消费者之间可以独立消费。消费者1可以消费a分区,消费者2也可以消费a分区。消费不冲突。
这样带来了一个问题就是消息被重复消费。
但是如果设置了消费者组,那么每个分区的数据只能由消费者组中一个消费者来消费。保证消息不重复消费。
但是还有一个问题,假如我们消费者组某一消费者挂掉了,我们希望可以让其他消费者组中的消费者来消费,这样怎么办呢?
其实,我们分区被谁消费了,也就是说消费到哪里了,会有一个offset记录。
这个offset存放在Kafka的topic主题中。老版本是存在zookeeper中的。
下面我们详细谈谈消费者组的原理,来解答这个问题。
十七、消费者组的工作原理
17.1 基本原理
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,即一个分区只能由组内某一消费者消费。具体如何分配后面细说。当然了,如果消费者组就一个消费者,那它消费所有的分区。这不矛盾。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
17.2 代码演示
package com.hssy.kafkademo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
boolean flag = false;
// 0 配置
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centos100:9092,centos100:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
// 设置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");
// 1 创建消费者 "",hello
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题
List<String> topics = new ArrayList<>();
topics.add("test1-topic");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
if (flag){
break;
}else {
// 1s 拉取一次数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 处理拉取的数据
// 这里循环打印消费到的数据
for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
System.out.println("消费数据:" + consumerRecord);
}
}
}
}
}
复制上述代码,让它运行三份,因为代码一样,所以它们属于同一个消费者组。
然后调用一个之前写好的生产者,发送500条消息,要求不设置分区和key,让它根据默认分区策略的(粘性分区器)去发送。这样大概率会将这500条消息落到不同的分区。
测试的结果发现:设置了消费者组,组内消费者就消费各自分区的消息。不会重复消费。
当然我们也可以为每个消费者指定分区。核心代码如下:
// 1 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题对应的分区
List<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("test1-topic",0));
kafkaConsumer.assign(topicPartitions);
那么,不指定分区消费,Kafka是如何分配分区给到消费者组内的消费者呢?
17.3 分区分配以及再平衡原理
Kafka有四种主流的消费者分区分配策略。默认的是Range+CooperativeSticky。
17.3.1 分配过程分析
1)每个消费者都发送JoinGroup请求给到Kafka集群的coordinator模块。
2)coordinator会选出一个消费者作为leader。(注意这不是broker选举而是消费者选举)
3)coordinator马上把要消费的topic情况发送给leader消费者。
4)leader消费者会负责制定消费方案。(比如何种分区分配策略等)
5)制定完成后,将消费方案发送给coordinator。
6)coordinator负责将消费方案发送给组内每个消费者。
7)组内每个消费者都会和coordinator保持心跳(默认3s),一旦超时(session.timeout.ms=45s),该消费者就会被移除,并触发再平衡,也即重新从选举leader消费者开始。或者消费者处理时间过长(max.poll.interval.ms=5分钟),也会触发再平衡。另外,如果某一时刻组内新增了消费者,也会触发再平衡。
下面我们就针对这四种分区分配策略展开讲解。
17.3.2 分区分配策略之Range
Range 是对每个 topic 而言的。
首先对同一个 topic 里面的分区按照序号进行排序,并对消费者进行排序。 排序的规则由Kafka内部决定。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
通过 partitions数 / consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者各将会多消费 1 个分区。
例如:
7 / 3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多 消费 1 个分区。
8 / 3 = 2 余 2 ,除不尽,那么C0和C1分别多 消费一个。
注意:
如果只是针对 1 个 topic 而言,C0消费者多消费1 个分区影响不是很大。但是如果有 N 多个 topic,那么针对每 个 topic,消费者 C0 都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!
所以,如果topic个数多的情况下,那么Range策略就不太好了。
特殊情况1
如果某次发送数据之前,组内其中一台消费者突然挂掉。
那么本应该发送到该消费者的某分区的消息,就无法发送成功。
如果在规定的时间内没有恢复连接,那么Kafka就会触发再平衡。
在再平衡过程中,Kafka会重新分配所有分区给处于活跃状态的消费者,也就是说再平衡可能会修改其他消费者原本消费的分区。目的是为了保证消费者组内的消费者在发生变化时可以重新分配以实现负载均衡和高可用性。
特殊情况2
如果某次发送数据之前,组内又突然新增了一个消费者,那么也会触发再平衡。又会重新进行消费者leader的选举。
17.3.3 分区分配策略之RoundRobin
RoundRobin 针对集群中所有Topic而言。
RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来(不仅仅是某一个topic),然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。
代码设置方法:
(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
(2)重启 3 个消费者运行消费即可。
企业中这种方式用的还是比较多的。它能够防止数据倾斜!
17.3.4 分区分配策略之Sticky
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
具体的
这种分区方式也是按照 分区数 / 消费者数来分配的,但是又有些不同。
比如7个分区,3个消费者,那就分成了3,2,2。但是它不是从分区0,1,2,3,4,5,6,7依次填入这三个消费者的。而是随机的。
也就是说只要保证是3,2,2就行。具体它们是怎么分配的不重要,随机就行。
17.4 offset的保存位置
上一章节提到,如果消费者挂掉,触发再平衡的时候,它怎么知道上一个消费者消费到哪里了?难道不会重复消费或者漏消费吗?
其实消费位点offset就是帮我们解决这个问题的。
生产者生成的数据发送到Kafka集群对应主题的不同分区中,消费者组去消费对应的分区。每个消费者消费的过程中会有一个offset,记录当前消费到哪里了。这个offset维护在系统的名为_consumer_offsets的topic中(0.9版本后)。
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+ 分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号就保留最新数据。
17.5 提交offset的方式
Kafka提交offset的方式分为两种:
- 自动提交
- 手动提交
17.5.1 自动提交offset
为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。不需要我们配置任何东西。
自动提交offset的相关参数:
- enable.auto.commit:是否开启自动提交offset功能,默认是true
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s
如果我们希望默认的自动提交时间间隔为1s,则可以修改该值即可
//自动提交 ,其实不用我们写,这里只是演示一下配置的参数是哪个
//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自动提交的间隔时间,默认就是5s 如果要改自己设置
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
注意:
很多参数到底默认值是多少,这些可能版本不同会有变化。
你也不知道后面的新版本是多少,所以最好的方式是看官网。
17.5.2 手动提交offset
虽然自动提交offset十分简单便利,但由于其是基于时间段提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 我消费一条数据提交一次offset。更加精准。
手动提交offset的方法有两种:
- commitSync(同步提交)
- commitAsync(异步提交)
两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;
不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
代码演示
package com.hssy.kafkademo.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
boolean flag = false;
// 0 配置
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"centos100:9092,centos100:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");
//自动提交 ,其实不用我们写
//如果手动提交,这里要改为false
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
//自动提交的间隔时间,默认就是5s 如果要改自己设置
//如果手动提交,这块就不用改了,改了也没意义,它不会生效
//properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
// 1 创建消费者 "",hello
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题
List<String> topics = new ArrayList<>();
topics.add("test1-topic");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
if (flag){
break;
}else {
// 1s 拉取一次数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 处理拉去的数据
// 这里循环打印消费到的数据
for (ConsumerRecord<String,String> consumerRecord : consumerRecords) {
System.out.println("消费数据:" + consumerRecord);
}
// 手动提交offset
//kafkaConsumer.commitAsync();//同步提交
kafkaConsumer.commitAsync();//异步提交
}
}
}
}
17.6 提交方式导致重复消费问题
自动提交由于是每隔一段时间自动提交,即使间隔时间设置的再短,也有可能导致在某次间隔时间内一旦出现服务器宕机,那么没有记录到该段时间的消费offset。服务重启后,又会从上一次记录的offset开始重新消费。导致重复消费。
同理,手动提交如果是异步手动提交,也依然有可能出现重复消费。因为异步提交是处理消息,只要一发offset,就去消费下一条消息。倘若这个offset没有发送成功,也会出现重复消费。只是这种情况要比自动提交好得多。
同步提交则没有这个问题,因为同步提交是需要等到提交offset完成以后才可以消费下一条消息。
一般情况下,我们选择异步提交就够了。最好的方式还是使用同步提交。
十八、从指定位置开始消费
自动消费的消费方式有三种
auto.offset.reset = earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量 时(例如该数据已被删除),该怎么办?
(1)earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
(2)latest(默认值):自动将偏移量重置为最新偏移量。
(3)none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。
如果是latest默认值,那么当前消费者会从最新的偏移量开始消费,也就是消费者加入进来以后,再发送过来的数据开始消费。消费一批往前走一批offset。
如果是earliest,那么当前消费者会从最早的偏移量开始消费。然后也是一样,消费一批往前走一批offset。
不管是哪种,都不会出现重复消费的问题。因为这个设置的消费方式不会随便改。即使消费者挂了,其他消费者也会消费它的分区时,也是根据它上次的offset的值接着完成消费动作。
还有一种情况就是,我既不想从开始消费,也不想从最新的开始消费。就想从中间某位置开始消费,也是可以的。这里不演示了,感兴趣的自行了解。
十九、从指定时间开始消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。 例如要求按照时间消费前一天的数据,怎么处理?
其实需要我们找到偏移量,然后从指定偏移量消费即可。这个不同版本设置的方法可能不太一样,具体的可以自行查阅。
二十、Broker分析
20.1 Zookeeper中存储Kafka哪些信息?
进入任意Zookeeper服务器,通过zkCli.sh命令进入zookeeper客户端。
通过**
ls /
**命令查看哪些节点。
查看Kafka的节点信息**
ls /kafka
**
这里面有很多信息,比如我们看一下brokers,**
ls /kafka/brokers
**
**
ls /kafka/brokers/ids
**
即broker.id的值0,1,2,三台服务器。
以上这种命令行的方式查看zookeeper的节点信息,个人觉的还是比较麻烦的,推荐使用Zookeeper的客户端软件工具,方便查阅,而且展示的更加直观!
prettyZoo的使用
安装的步骤很简单不演示了。
使用时先连接zookeeper服务器
然后双击创建的连接即可。如果连接不上,将主机名换成ip地址试试。
那回到标题,zookeeper中存储了Kafka哪些信息?我们记住一些重要的就行。
/kafka/brokers/ids
即正常工作的Kafka服务器编号,某一台下线了这里的记录去除掉。
/kafka/brokers/topics
存放了所有的主题,每个主题可以查看分区信息
/kafka/controller
辅助选举Leader节点用的
未来每一个broker节点都会有一个controller模块(并非zookeeper中的/kafka/controller),而所有的broker都会去抢注册这里的 /kafka/controller,谁先抢到谁就是老大,这个老大broker中的controller模块中规定的谁是leader就是谁。所以说/kafka/controller是服务选举leader节点的。
20.2 Kafka某主题各分区leader的选举机制
1)每台broker节点启动之后都会去Zookeeper注册中心注册。
三台都启动后,Zookeeper中就会增加三个brokerids节点。[0,1,2]
2)然后每个broker都争抢着去zookeeper中的/kafka/controller注册,谁先抢到注册谁说了算
3)这个抢到注册的broker通过zookeeper中的/kafka/controller就开始监听brokers节点的变化
4)然后这个broker开始真正的选举(当然是在新建或者修改主题的时候开始)
每个broker中都有一个Controller模块。这个模块Controller记录了它们各自的选举结果。
但是最终的选举结果由争抢到注册的broker决定。
5)然后这个broker把它存在controller模块中的节点信息上传到zookeeper中。
6)其他broker的contorller模块再从zookeeper同步相关信息,目的是一旦监控到它挂了,就根据里面的信息重新开始选举。
7)最后就是生产者开始发送信息了。发送信息后,leader主动和follower同步信息。
选举的规则细节
介绍概念
AR:分区中某主题下所有副本的统称。
ISR:所有通讯正常的副本节点,包括了leader和follower,只要是通讯正常的,都在isr中。
规则
在isr中存活为前提,按照AR中排在前面的优先。
但是最终的选举结果由争抢到注册的broker决定。
例如
假设现准备新增一个主题test-topic,设置为2个分区。
那么每个broker都会创建两个分区来放这个主题。
broker0:
Topic:test-topic Partition:0 ar[0,1,2], isr [1,0,2] Topic:test-topic Partition:1 ar[0,1,2], isr [1,0,2]
broker1(假定它抢到注册权的):
Topic:test-topic Partition:0 ar[1,0,2], isr [1,0,2] Topic:test-topic Partition:1 ar[1,0,2], isr [1,0,2]
broker2:
Topic:test-topic Partition:0 ar[2,0,1], isr [1,0,2] Topic:test-topic Partition:1 ar[2,0,1], isr [1,0,2]
由于最终的选举结果由争抢到注册的broker决定。即由broker1决定。
Topic:test-topic Partition:0 ar[1,0,2], isr [1,0,2]
Topic:test-topic Partition:1 ar[1,0,2], isr [1,0,2]
那么leader 就会按照AR中的顺序轮询。
根据它的节点信息,分区0和分区1中当前主题的leader肯定都是broker1当选。
那么其他两个broker的分区0和分区1自动成为当前主题的分区副本。
此后由于各个broker的controller会从zookeeper同步相关信息,一旦broker1挂掉了。
其他各个broker的controller监听到isr的存活情况发现broker1挂掉了,就会开始新的选举。
此时各个broker同步到的信息是
Topic:test-topic Partition:0 ar[1,0,2], isr [0,2]
Topic:test-topic Partition:1 ar[1,0,2], isr [0,2]
于是会推举新的broker0作为作为分区0和分区1在该主题下的leader。
假如当前broker1和broker0是同时挂了的,那么轮询到0时发现broker0也挂了,不再isr中,所以又往后轮询2号,2号在isr中就选择2号作为新的leader。
以上步骤大致的流程图如下:
20.3 文件存储机制
物理上真是的情况是:Kafka集群的每个broker上,都是按照分区进行管理的。而Topic是逻辑上的。
我们每次新建或修改的topic,是在这些broker各自的分区中的。
比如新建一个test-topic,为它设置3个分区。那么实际上只要在线的每个broker,各自都会准备三块分区空间给这个topic。当然这三块空间是该topic独享的。
也就是说
一个主题可以有多个分区。但是一个分区只能被一个主题独有。
并且有多少broker在线,新建修改主题的时候,就会有多少个broker各自为该主题创建出指定数量的分区。
每个分区,对应着一个.log文件。
文件是以segment的形式存储的。
20.4 文件清除策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete(默认) 和 compact 两种。
二十一、Kafka如何做到高效读写数据的?
1)Kafka 本身是分布式集群,可以采用分区技术,并行度高。
生产者可以同时并行向不同的分区发送,消费者也可以并行消费不同的分区。
2)读数据采用稀疏索引,可以快速定位要消费的数据 。
它存储的文件是.log文件,底层是按照segment格式进行存储的,默认是1G1G的segment文件。里面包含了.index的索引文件。通过索引快速定位。而这个索引采用的是稀疏索引,也就是说每往log文件中存储4kb大小的文件,就记录一次索引。然后根据索引的范围快速定位到要查找的文件位置。
3)顺序写磁盘。
采用追加的方式,叫消息存储到之前数据的后面。这样省去了磁头寻址的时间。因为磁盘的顺序写入是很快的,举个例子:某某固态硬盘顺序写入600M/s,但是它随机写入却只有12M/s。
4)页缓存+零拷贝技术
比如我们生产者往Kafka存消息后。消费者要消费消息,Kafka就会去linux系统的页缓存中找(页缓存是将存到磁盘的数据放到了内存的缓存区域的,有就直接那缓存,没有就去磁盘找然后加入到缓存)。然后将缓存中的数据直接通过网卡发送给消费者。省去了还要通过Kafka应用层缓存的步骤(也就是说省去了这次拷贝)。
二十二、与SpringBoot整合
22.1 pom.xml
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
22.2 application.properties
server.port=9000
# 连接Kafka集群
spring.kafka.bootstrap-servers=centos100:9092,centos101:9092
# key value的序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
22.3 编写测试controller发送消息
@RestController
public class TestController {
@Autowired
KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/test")
public Result data(String msg){
//
kafkaTemplate.send("test1-topic",msg);
return Result.success("你的任务成功了");
}
}
22.4 消费消息
需要在application.properties中增加配置。当然,如果你选择在其他项目中消费消息,那么需要重新配置一遍。
这里就在同一个项目中配置消费者,只需要增加配置即可。
# key value的反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费者组id
spring.kafka.consumer.group-id=hssy
编写一个配置类,专门处理Kafka集群中的相关任务
package com.hssy.kafkaspringboot.task;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaTaskHandle {
@KafkaListener(topics = "test1-topic")
public void testTaskHandle(String msg){
// 处理之前test接口发送到Kafka集群的任务
System.out.println("收到消息:"+msg);
}
}
二十三、Kafka监控工具:Kafka-Eagle
Kafka-Eagle 框架可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
Kafka-Eagle 的安装依赖于 MySQL,MySQL 主要用来存储可视化展示的数据。如果集 群中之前安装过 MySQL 可以跨过该步。
23.1 安装mysql
上传安装包和驱动jar包
卸载自带的mysql然后安装
[root@centos100 mysql]# rpm -qa | grep -i -E mysql\|mariadb | xargs -n1 sudo rpm -e --nodeps
[root@centos100 mysql]#
[root@centos100 mysql]# rpm -ivh 01_mysql-community-common-5.7.16-1.el7.x86_64.rpm
[root@centos100 mysql]# rpm -ivh 02_mysql-community-libs-5.7.16-1.el7.x86_64.rpm
[root@centos100 mysql]# rpm -ivh 03_mysql-community-libs-compat-5.7.16-1.el7.x86_64.rpm
[root@centos100 mysql]# rpm -ivh 04_mysql-community-client-5.7.16-1.el7.x86_64.rpm
[root@centos100 mysql]# rpm -ivh 05_mysql-community-server-5.7.16-1.el7.x86_64.rpm
启动mysql
systemctl start mysqld
查看mysql密码
cat /var/log/mysqld.log | grep password
进入mysql
23.2 Kafka 环境准备
1)关闭 Kafka 集群
kf.sh stop
2)修改/opt/module/kafka/bin/kafka-server-start.sh 命令中的参数,增加内存配置
vim bin/kafka-server-start.sh
将原来的
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
修改为:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
注意每台Kafka都要配置。
启动Kafka集群
kf.sh start
23.3 安装Kafka-Eagle
只需要在一台上安装就可以了,当然你甚至可以安装到非Kafka的服务器上。http://www.kafka-eagle.org/
解压
[root@centos100 software]# tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
然后进入当前解压目录下继续解压,并将其放到/opt/module目录下
[root@centos100 kafka-eagle-bin-2.0.8]# tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
修改系统配置文件
原本监控了两个Kafka集群,我们删掉一个。然后把我们的集群ip和端口填写上去。
######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
efak.zk.cluster.alias=cluster1
cluster1.zk.list=centos100:2181,centos101:2181,centos102:2181/kafka
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181
offset的存储位置,将zookeeper注释掉
######################################
# kafka offset storage
######################################
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk
修改Kafka的数据库连接,就是刚刚安装好的mysql。
######################################
# kafka sqlite jdbc driver address
######################################
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://centos100:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=000000
完毕后保存退出。
然后配置环境变量
vim /etc/profile.d/my_env.sh
#kafkaEFAK
export KE_HOME=/opt/module/efak
export PATH=$PATH:$KE_HOME/bin
source /etc/profile
对了,前面忘记修改kafka解压后的名称了,再去修改一下
[root@centos100 module]# mv efak-web-2.0.8/ efak
ok,搞定
下面启动Kafka集群,再启动efak
启动efak,进入该文件bin目录,执行 ke.sh start
成功,访问http://192.168.17.100:8048
23.4 启动异常解决办法
看日志是最好的解决办法!
问题:访问地址报错404
遇到问题不要慌!
停止监控ke.sh stop
我们打开efak的日志看看,应该有记录当前问题。
[2023-02-22 12:53:58] ContextLoader.main - ERROR - Context initialization failed org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'dataSource' defined in class path resource [spring-mybatis.xml]: Could not resolve placeholder 'efak.password' in value "${efak.password}"; nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'efak.password' in value "${efak.password}" Caused by: java.lang.IllegalArgumentException: Could not resolve placeholder 'efak.password' in value "${efak.password}"
说我们的数据源的密码错误?
那我们就去看看配置文件
结果发现efak.passwor=000000,password没写全,尴尬,修改后重新启动
ok
写在后面的话
目前就总结到这里,后续有时间的话,我再继续往后补充。
真心希望能够帮助到正在学习Kafka的各位。由于个人能力有限,如果存在笔误或者理解上的偏差,也请不吝指正,我们共同进度!
版权归原作者 何苏三月 所有, 如有侵权,请联系我们删除。