一、快速了解kafka
1、MQ的作用
MQ:MessageQueue,消息队列。队列,是一种FIFO先进先出的数据结构。消息则是跨进程传递的数据。一个典型的MQ系统,会将消息由生产者发送到MQ进行排队,然后根据一定的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ,最不过其对接的使用对象是人,而kafka需要对接的使用对象是应用程序。
MQ的主要作用主要有以下三个方面:
异步:
举例:快递员发快递,直接到客户家效率会很低,引入菜鸟驿站之后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了,客户再按照自己的时间安排去菜鸟驿站取快递。
作用:异步能提高系统的响应速度、吞吐量。
解耦:
例子:《Thinking in JAVA》很经典,但都是些英文,不容易读懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。
作用:
服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。
解耦后可以实现数据分发,生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。
削峰:
例子:长江每年都会涨水,但是下游出水口的速度是稳基本定的,所以会涨水。引入三峡大坝后,可以把水储存起来,下游慢慢排水。
作用:以稳定的系统资源应对突发的流量冲击。
缺点:①系统可用性降低; ②提升系统的复杂度; ③数据一致性问题;
2、kafka产品介绍
官网地址:Apache Kafka
Apache Kafka最初由LinkedIn开发并于2011年开源,主要解决大规模数据的实时流式处理和数据管道问题。
kafka是一个分布式的发布-订阅消息系统,可以快速地处理高吞吐量的数据流,并将数据实时地分发到多个消费者中。kafka消息系统有多个broker(服务器)组成,这些broker可以在多个数据中心之间分布式部署,以提供高可用性和容错性。
kafka使用高效的数据存储和管理技术,能够轻松地处理TB级别的数据量。其优点包括高吞吐量、低延迟、可扩展性、持久性和容错性等。
kafka在企业级应用中被广泛应用,包括实时流处理、日志聚合、监控和数据分析等方面。同时,kafka还可以与其他大数据工具集成,如Hadoop、Spark、Storm等,构建一个完整的数据处理生态系统。
3、kafka的特点
kafka最初诞生于LinkedIn公司,其核心作用就是用来收集并处理庞大复杂的应用日志。一个典型的日志聚合应用场景如下:
业务场景决定了产品的特点。所以kafka最典型的产品特点有以下几点:
数据吞吐量很大:需要能够快速收集各个渠道的海量日志
集群容错性高:允许集群中少量节点崩溃
功能不需要太复杂:kafka是设计目标是高吞吐、低延迟和可扩展,主要关注消息传递而不是消息处理。所以,kafka并没有支持死信队列、顺序消息等高级功能。
允许少量数据丢失:在海量的应用日志中,少量的日志丢失是不会影响结果的。所以kafka的设计初衷是允许少量数据丢失的。当然kafka本身也在不断优化数据安全问题。
二、快速上手kafka
1、快速搭建单机服务
kafka的运行环境非常简单,只要有JVM虚拟机就可以进行。以下以安装了JDK1.8的CentOS为例。
前提:安装jdk1.8,安装参考:Linux(VMware + FinalShell)_finalshell清屏-CSDN博客
下载kafka:
下载地址:Apache Kafka
关于kafka的版本,前⾯的2.13是开发kafka的scala语⾔的版本,后⾯的3.8.0是kafka应⽤的版本。 Scala是⼀种运⾏于JVM虚拟机之上的语⾔。在运⾏时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语⾔的版本并不是向后兼容的。
下载zookeeper:
下载地址:Apache ZooKeeper
Zookeeper的版本并没有强制要 求,这里选择了3.8.4版本。
安装过程记录:
通过以下命令分别将三个工具包解压到 /usr/local 路径下
tar -zxvf jdk-8u171-linux-x64.tar.gz -C /usr/local
tar -zxvf kafka_2.13-3.8.0.tgz -C /usr/local
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz -C /usr/local
kafka的安装程序中⾃带了Zookeeper,可以在kafka的安装包的libs⽬录下查看到zookeeper的客户端jar 包。但是,通常情况下,为了让应⽤更好维护,我们会使⽤单独部署的Zookeeper,⽽不使⽤kafka⾃带 的Zookeeper。
启动Kafka之前需要先启动Zookeeper 这⾥就⽤Kafka⾃带的Zookeeper。启动脚本在bin⽬录下。
首先进入kafka的目录,然后启动zookeeper服务:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohub:程序执行的时候不占用控制台,让其后台启动
注意下脚本是不是有执⾏权限。 从nohup.out中可以看到zookeeper默认会在2181端⼝启动。通过jps指令看到⼀个QuorumPeerMain进程,确定服务启动成功。
启动Kafka
nohup bin/kafka-server-start.sh config/server.properties &
启动完成后,使⽤jps指令,看到⼀个kafka进程,确定服务启动成功。服务会默认在9092端⼝启动。
至此,服务启动完成。
2、简单收发消息
Kafka的基础⼯作机制是消息发送者可以将消息发送到kafka上指定的topic,⽽消息消费者,可以从指定的 topic上消费消息。
创建topic:
kafka的bin目录做了大量的封装。
kafka目录下:
bin/kafka-topics.sh -help
该命令用来查看kafka关于topic的相关帮助。
topic目录下创建topic以及查看都有哪些topic:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
创建基于控制台的生产者和消费者:
bin/kafka-console-producer.sh --help
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
模拟生产-消费消息的过程:
首先创建生产者:
并发送了前三行hello相关的消息,这时在finalshell中复制一个窗口创建消费者,可以看到消费者窗口并不显示三条hello的消息,再返回生产者窗口产生1-4的四条消息,再看消费者窗口,就可以看到这四条消息了。(接收即时消息)
接收历史消息:--from-beginning
指定从某一条(第2条)消息开始消费:--partition 0 --offset 2
消费者组:
对于每个消费者,可以指定⼀个消费者组。kafka中的同⼀条消息,只能被同⼀个消费者组下的某⼀个消费者 消费。⽽不属于同⼀个消费者组的其他消费者,也可以消费到这⼀条消息。在kafka-console-consumer.sh脚 本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组。
创建消费者组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup
finalshell中再复制一个消费者窗口,打开上面创建的组,可以看到生产者新产生的消息只在一个消费者组被消费:
查看消费者组的消费进度:
bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --describe --group testGroup
红框中的四个属性代表:
current-offset:当前消费进度
log-end-offset:日志中的最大消费进度
lag:当前消费者组没有消费过的消息
在消费者组关闭了情况下,再去生产几个消息,然后查看消费组消费情况:
可以看到加了上面红框的三条消息后,由于另一个复制的消费者窗口进行了消费,所以下图红框中当前消费以及最后一条日志都是13,而未消费是0;在输入上图绿框中消息前关闭了所有消费者组命令,再输入绿框的5条消息后,消费情况如下图绿框所示,当前消费为13,最后一条为18,还有5条尚未消费。
从这⾥可以看到,Kafka是以消费者组为单位来分别记录每个Partition上的消息偏移量的。⽽增加新的消费者 组,并不会影响Kafka的消息数据,只是需要新增⼀条偏移量记录就可以了。所以,Kafka的消息复读效率是很高的。
3、理解kafka的消息传递机制
Kafka的消息发送者和消息消费者通过Topic这样⼀个逻辑概念来进⾏业务沟通。但是实际上,所有的消息是存在服务端的Partition这样⼀个数据结构当中的。
同一个消费者组记录同一个offset,不同的消费者组记录不同的offset。
客户端Client: 包括消息⽣产者 和 消息消费者。
消费者组:每个消费者可以指定⼀个所属的消费者组,相同消费者组的消费者共同构成⼀个逻辑消费者组。每⼀个消息会被多个感兴趣的消费者组消费,但是在每⼀个消费者组内部,⼀个消息只会被消费一次。
服务端Broker:⼀个Kafka服务器就是⼀个Broker。
话题Topic:这是⼀个逻辑概念,⼀个Topic被认为是业务含义相同的⼀组消息。客户端都通过绑定Topic 来⽣产或者消费⾃⼰感兴趣的话题。
分区Partition:Topic只是⼀个逻辑概念,⽽Partition就是实际存储消息的组件。每个Partiton就是⼀个 queue队列结构。所有消息以FIFO先进先出的顺序保存在这些Partition分区中。
三、理解kafka的集群工作原理
对于Kafka这样⼀个追求消息吞吐量的产品来说,集群基本上是必备的(rabbitMQ对集群的要求没有很迫切)。接下来,我们就动⼿搭建⼀个Kafka集 群,并来理解⼀下Kafka集群的⼯作机制。
Kafka的集群架构⼤体是这样的:
将各个节点不同的状态信息存到zookeeper上,每个节点就会知道其他节点的服务状况。
为什么要用集群?
单机服务下,Kafka已经具备了⾮常⾼的性能。TPS能够达到百万级别。但是,在实际⼯作中使⽤时,单机搭 建的Kafka会有很⼤的局限性。
⼀⽅⾯:消息太多,需要分开保存。Kafka是⾯向海量消息设计的,⼀个Topic下的消息会⾮常多,单机服务很 难存得下来。这些消息就需要分成不同的Partition,分布到多个不同的Broker上。这样每个Broker就只需要保 存⼀部分数据。这些分区的个数就称为分区数。
另⼀⽅⾯:服务不稳定,数据容易丢失。单机服务下,如果服务崩溃,数据就丢失了。为了保证数据安全,就 需要给每个Partition配置⼀个或多个备份,保证数据不丢失。Kafka的集群模式下,每个Partition都有⼀个或多 个备份。Kafka会通过⼀个统⼀的Zookeeper集群作为选举中⼼,给每个Partition选举出⼀个主节点Leader, 其他节点就是从节点Follower。主节点负责响应客户端的具体业务请求,并保存消息。⽽从节点则负责同步主 节点的数据。当主节点发⽣故障时,Kafka会选举出⼀个从节点成为新的主节点。
最后:Kafka集群中的这些Broker信息,包括Partition的选举信息,都会保存在额外部署的Zookeeper集群当 中,这样,kafka集群就不会因为某⼀些Broker服务崩溃⽽中断。
Kafka也提供了另外⼀种不需要Zookeeper的集群机制,Kraft集群。
搭建环境:
准备三台同样的CentOS服务器,预先安装好了JDK,并关闭防⽕墙
service firewalld stop
systemctl disable firewalld
systemctl stop firewalld.service
分别配置机器名worker1,worker2,worker3(ip地址换成自己的)
vi /etc/hosts
192.168.153.134 worker1
192.168.153.133 worker2
192.168.153.132 worker3
接下来我们就动⼿部署⼀个Kafka集群,来体验⼀下Kafka是如何⾯向海量数据进⾏横向扩展的。 我们先来部署⼀个基于Zookeeper的Kafka集群。其中,选举中⼼部分,Zookeeper是⼀种多数同意的选举机 制,允许集群中少数节点出现故障。因此,在搭建集群时,通常都是采⽤3,5,7这样的奇数节点,这样可以 最⼤化集群的⾼可⽤特性。 在后续的实验过程中,我们会在三台服务器上都部署Zookeeper和Kafka。
1、部署Zookeeper集群
这⾥采⽤之前单独下载的Zookeeper来部署集群。Zookeeper是⼀种多数同意的选举机制,允许集群中少半数 节点出现故障。因此,在搭建集群时,通常采⽤奇数节点,这样可以最⼤化集群的⾼可⽤特性。
先将下载下来的Zookeeper解压到 /usr/local目录。然后进入conf目录,修改配置文件。在conf⽬录中,提供了⼀个zoo_sample.cfg⽂件,这是⼀个示例⽂件。我 们只需要将这个⽂件复制⼀份zoo.cfg(cp zoo_sample.cfg zoo.cfg),修改下其中的关键配置就可以了。其中 ⽐较关键的修改参数如下:
# 进入到zookeeper的解压目录下
# 进入conf路径
[root@192 apache-zookeeper-3.8.4-bin]# cd conf/
[root@192 conf]# cp zoo_sample.cfg zoo.cfg
[root@192 conf]# vi zoo.cfg
修改配置文件:
#Zookeeper的本地数据⽬录,默认是/tmp/zookeeper。这是Linux的临时⽬录,随时会被删掉。
# dataDir=/tmp/zookeeper #将原本的这一行注释掉,因为tmp是临时文件夹
dataDir=/usr/local/myzkdata # 将dataDir设置为自己的路径
#Zookeeper的服务端⼝
clientPort=2181
其中,clientPort 2181是对客户端开放的服务端⼝。
在文档的最后配置集群:(以下三个ip是自己三个centOS服务器的ip)
server.1=192.168.153.134:2888:3888
server.2=192.168.153.133:2888:3888
server.3=192.168.153.132:2888:3888
集群配置部分, server.x这个x就是节点在集群中的myid。后⾯的2888是集群内部进行数据传输的端口,3888是集群内部进行选举时使用的端口。
接下来将整个Zookeeper的应⽤⽬录分发到另外两台机器上。
然后需要构建对应的myid⽂件,进入配置的data目录:(myid是生成的文件,是在zoo.cfg中配置的对应的server.id)
[root@192 conf]# cd ../..
[root@192 local]# cd myzkdata/
[root@192 myzkdata]# ls
[root@192 myzkdata]# echo 1 > myid
[root@192 myzkdata]# ll
在worker1中配置好之后,进入zookeeper文件夹所在的目录,再要分别传到worker2和worker3:
scp -r apache-zookeeper-3.8.4-bin/ root@worker2:/usr/local
scp -r apache-zookeeper-3.8.4-bin/ root@worker3:/usr/local
再到worker2和worker3中创建myid文件。
worker2:
[root@192 local]# cd myzkdata/
[root@192 myzkdata]# echo 2 > myid
worker3:
[root@192 myzkdata]# ls
[root@192 myzkdata]# echo 3 > myid
三台服务器分别进到zookeeper路径下,启动zookeeper服务:
[root@192 local]# cd apache-zookeeper-3.8.4-bin/
[root@192 apache-zookeeper-3.8.4-bin]# bin/zkServer.sh --config conf start
(重复三次)
启动完成后,使⽤jps指令可以看到⼀个QuorumPeerMain进程就表示服务启动成功。
三台机器都启动完成后,可以查看下集群状态。
进入zookeeper的目录用下面命令查看
[root@worker1 apache-zookeeper-3.8.4-bin]# bin/zkServer.sh status
可以看到这台服务器是从节点follower。
2、部署kafka集群
kafka服务并不需要进⾏选举,因此也没有奇数台服务的建议。 部署Kafka的⽅式跟部署Zookeeper差不多,就是解压、配置、启服务三板斧。 ⾸先将Kafka解压到/usr/local⽬录下。然后进⼊config⽬录,修改server.properties。
进入kafka的目录,修改配置:
[root@192 kafka_2.13-3.8.0]# vi config/server.properties
这个配置⽂件⾥⾯的配置项⾮常多,下⾯列出几个要重点关注 的配置。
#broker 的全局唯⼀编号,不能重复,只能是数字。worker1设置为1,worker2设置为2,worker3设置为3
broker.id=1
#服务监听地址,分别为worker1、worker2、worker3
listeners=PLAINTEXT://worker1:9092
#数据⽂件地址。同样默认是给的/tmp⽬录。
log.dirs=/usr/local/kafka-logs
#默认的每个Topic的分区数
num.partitions=1
#zookeeper的服务地址
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
#可以选择指定zookeeper上的基础节点。
#zookeeper.connect=worker1:2181,worker2:2181,worker3:2181/kafka
修改配置完成后启动:
bin/kafka-server-start.sh -daemon config/server.properties
# daemon为后台启动 不占用当前页面
# 如果启动不生效,可以使用
[root@192 kafka_2.13-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties &
将配置修改传给worker2和worker3:
[root@192 local]# scp -r kafka_2.13-3.8.0/ root@worker2:/usr/local
[root@192 local]# scp -r kafka_2.13-3.8.0/ root@worker3:/usr/local
传完之后分别由kafka所在的目录 进入worker2和worker3 的配置进入修改:
vi config/server.properties
worker2的配置修改:broker.id=2 和 listeners=PLAINTEXT://worker2:9092
worker3的配置修改:broker.id=3 和 listeners=PLAINTEXT://worker3:9092
最后用jps查看是否启动成功:
3、理解服务端的topic、partition、broker
创建一个分布式的topic:
bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic disTopic
列出所有topic 及 查看列表情况:
# 列出所有topic
[root@worker1 kafka_2.13-3.8.0]# bin/kafka-topics.sh --bootstrap-server worker1:9092 --list
# 查看列表情况
[root@worker1 kafka_2.13-3.8.0]# bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic disTopic
ReplicationFactor:备份因子
从这⾥可以看到,:
1、--create创建集群,可以指定⼀些补充的参数。⼤部分的参数都可以在配置⽂件中指定默认值。
partitons参数表示分区数,这个Topic下的消息会分别存⼊这些不同的分区中。示例中创建的disTopic,指 定了四个分区,也就是说这个Topic下的消息会划分为四个部分。
replication-factor表示每个分区有⼏个备份。示例中创建的disTopic,指定了每个partition有两个备份。
2、--describe查看Topic信息。
partiton参数列出了四个partition,后⾯带有分区编号,⽤来标识这些分区。
Leader表示这⼀组partiton中的Leader节点是哪⼀个。这个Leader节点就是负责响应客户端请求的主节 点。从这⾥可以看到,Kafka中的每⼀个Partition都会分配Leader,也就是说每个Partition都有不同的节 点来负责响应客户端的请求。这样就可以将客户端的请求做到尽量的分散。
Replicas参数表示这个partition的多个备份是分配在哪些Broker上的。也称为AR。这⾥的0,1,2就对应配置 集群时指定的broker.id。但是,Replicas列出的只是⼀个逻辑上的分配情况,并不关⼼数据实际是不是按 照这个分配。甚⾄有些节点服务挂了之后,Replicas中也依然会列出节点的ID。
ISR参数表示partition的实际分配情况。他是AR的⼀个⼦集,只列出那些当前还存活,能够正常同步数据 的那些Broker节点。
接下来,我们还可以查看Topic下的Partition分布情况。在Broker上,与消息,联系最为紧密的,其实就是 Partition了。之前在配置Kafka集群时,指定了⼀个log.dirs属性,指向了⼀个服务器上的⽇志⽬录。进⼊这个 ⽬录,就能看到每个Broker的实际数据承载情况。
从这⾥可以看到,Broker上的⼀个Partition对应了⽇志⽬录中的⼀个⽬录。⽽这个Partition上的所有消息,就 保存在这个对应的⽬录当中。
从整个过程可以看到,Kafka当中,Topic是⼀个数据集合的逻辑单元。同⼀个Topic下的数据,实际上是存储 在Partition分区中的,Partition就是数据存储的物理单元。⽽Broker是Partition的物理载体,这些Partition分 区会尽量均匀的分配到不同的Broker机器上。⽽之前接触到的offset,就是每个消息在partition上的偏移量。
这样设计解决了什么问题?
1、Kafka设计需要⽀持海量的数据,⽽这样庞⼤的数据量,⼀个Broker是存不下的。那就拆分成多个 Partition,每个Broker只存⼀部分数据。这样极⼤的扩展了集群的吞吐量。
2、每个Partition保留了⼀部分的消息副本,如果放到⼀个Broker上,就容易出现单点故障。所以就给每个 Partition设计Follower节点,进⾏数据备份,从⽽保证数据安全。另外,多备份的Partition设计也提⾼了读取 消息时的并发度。
3、在同⼀个Topic的多个Partition中,会产⽣⼀个Partition作为Leader。这个Leader Partition会负责响应客户 端的请求,并将数据往其他Partition分发。
3、理解消费者组
四、章节总结:kafka集群
1、Topic是⼀个逻辑概念,Producer和Consumer通过Topic进⾏业务沟通。
2、Topic并不存储数据,Topic下的数据分为多组Partition,尽量平均的分散到各个Broker上。每组Partition包 含Topic下⼀部分的消息。每组Partition包含⼀个Leader Partition以及若⼲个Follower Partition进⾏备份,每 组Partition的个数称为备份因⼦ replica factor。
3、Producer将消息发送到对应的Partition上,然后Consumer通过Partition上的Offset偏移量,记录⾃⼰所属 消费者组Group在当前Partition上消费消息的进度。
4、Producer发送给⼀个Topic的消息,会由Kafka推送给所有订阅了这个Topic的消费者组进⾏处理。但是在每 个消费者组内部,只会有⼀个消费者实例处理这⼀条消息。
5、最后,Kafka的Broker通过Zookeeper组成集群。然后在这些Broker中,需要选举产⽣⼀个担任Controller ⻆⾊的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理⼯作。在我们实验的集群中,这 个Controller实际上是通过ZooKeeper产⽣的。
版权归原作者 玖柒_lin 所有, 如有侵权,请联系我们删除。