🌻🌻 目录
学习本文技术需要已经有如下的基础要求:
- 熟悉
Javase
基础- 熟悉
Linux
常用命令- 熟悉
ldea
开发工具
一、Kafka 概述
1.1 定义
Kafka是
由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
1.2 消息队列
- 目前企业中比较常见的消息队列产品主要有
Kafka
、ActiveMQ
、RabbitMQ
、RocketMQ
等。- 在大数据场景主要采用Kafka作为消息队列。
在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ 作为消息队列。
1.2.1 消息队列内部实现原理
消息队列(非kafka)内部实现原理
1.2.2 传统消息队列的应用场景
传统的消息队列的主要应用场景包括:
缓存/消峰
(消去峰值)、
解耦和异步通信
。
- 消息队列的应用场景——
缓存/消峰
- 消息队列的应用场景——
解耦
- 消息队列的应用场景——
异步通信
1.2.3 消息队列的两种模式
1.3 Kafka 基础架构
- (1)
Producer
:消息生产者,就是向Kafka broker发消息的客户端。- (2)
Consumer
:消息消费者,向Kafka broker取消息的客户端。- (3)
Consumer Group(CG)
:消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。- (4)
Broker
:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。- (5)
Topic
:可以理解为一个队列,生产者和消费者面向的都是一个topic。- (6)
Partition
:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。- (7)
Replica
:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。- (8)
Leader
:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。- (9)
Follower
:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
抽象理解 (生产者消费者):
- 生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。
- 再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了。
- 这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里。
上面的例子里面:
1.篮子就是kafka,鸡蛋其实就是数据流,系统之间的交互都是通过数据流来传输的(就是tcp、https什么的),也称为报文,或者消息。
2. 消息队列满了,其实就是篮子满了,鸡蛋放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
producer
:生产者,就是它来生产“鸡蛋”的。consumer
:消费者,生出的“鸡蛋”它来消费。broker
:就是篮子了,鸡蛋生产出来后放在篮子里。topic
:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,有的只吃草鸡蛋,有的吃洋鸡蛋,篮子中分为一个个小盒子,草鸡蛋放一个盒子里,洋鸡蛋放另一个盒子里。这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
大家一定要学会抽象的去思考,上面只是属于业务的角度,如果从技术角度,
topic
标签实际就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
二、 Kafka 快速入门
2.1 安装前的准备
- 先搭建一台虚拟机,再克隆三台出来
- 步骤在这里(三台上面仅需安装jdk,hadoop(可不装),zookeeper(必须装))
2.2 安装部署
安装前的准备:
资源获取:
通过百度网盘分享的文件获取:
kafka软件大全 提取码:yyds
- 1.VMware的安装:VMware-workstation-full-15.5.0
- 2.镜像的安装:CentOS-7.5-x86_64-DVD-1804.iso
- 3.JDK的安装: jdk-8u212-linux-x64.tar.gz
- 4.Hadoop的安装: hadoop-3.1.3.tar.gz
2.2.1 集群规划
linux-102linux-103linux-104zkzkzkkafkakafkakafka
2.2.2 单节点或集群部署
- 0)官方下载 获取
- 1)本地资源下载获取(上面网盘获取)
- 2)上传压缩包到服务器 /usr/local下面并且进行解压
tar -zxvf kafka_2.12-3.0.0.tgz
3)修改解压后的文件名称到当前目录下
mv kafka_2.12-3.0.0 kafka
4)进入到
/usr/local/kafka/config
目录,修改配置文件
server.properties
如下③ 处地方,并保存。
vi server.properties
①
②
③
创建文件夹
datas
5)分发安装包(针对上述集群配置,注:分发后配置里面的这个需要依次改动
broker.id=0
,broker.id不得重复,整个集群中唯一。)
scp -r /usr/local/kafka root@linux-103:/usr/local/
6)配置环境变量
(1)在
/etc/profile
文件中增加kafka环境变量配置
增加如下内容:
#kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
(2)刷新一下环境变量。
(3)分发环境(
只针对集群
)变量文件到其他节点,并source。
7)启动集群
(1)先启动Zookeeper集群,然后启动Kafka。
cd /usr/local/zookeeper/bin/
./zkServer.sh start
./zkServer.sh status
cd /usr/local/kafka/bin/
./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
jps
(2)依次在linux-102、linux-103、linux-104节点上启动Kafka。(
针对集群
)(如同上述启动)
注意:配置文件的路径要能够到server.properties。
8)关闭(集群)
./kafka-server-stop.sh
2.2.3 集群启停脚本
1)在
/home/bin
目录下创建文件
kf.sh
脚本文件(没有bin文件夹则创建文件夹)
脚本如下:
#! /bin/bash
case $1 in
"start"){
#集群
#for i in linux-102 linux-103 linux-104
#单节点
for i in linux-102
do
echo " --------启动 $i Kafka-------"
ssh $i "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"
done
};;
"stop"){
#集群
#for i in linux-102 linux-103 linux-104
#单节点
for i in linux-102
do
echo " --------停止 $i Kafka-------"
ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh "
done
};;
esac
2)添加执行权限
chmod +x kf.sh
3)启动集群命令
kf.sh start
4)停止集群命令
./kf.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
2.3 Kafka命令行操作
2.3.1 主题命令行操作
1)查看操作主题命令参数
./kafka-topics.sh
参数描述- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。- - create创建主题。- - delete删除主题。- - alter修改主题。- - list查看所有主题。- - describe查看主题详细描述。- - partitions <Integer: # of partitions>设置分区数。- - replication-factor<Integer: replication factor>设置分区副本。- - config <String: name=value>更新系统默认的配置。
2)查看当前服务器中的所有 topic(其中
9092
是kafka的默认端口)
./kafka-topics.sh --bootstrap-server linux-102:9092 --list
#一般是生产环境
./kafka-topics.sh --bootstrap-server linux-102:9092,linux-103:9092 --list
3)创建 first topic
./kafka-topics.sh --bootstrap-server linux-102:9092 --topic first --create --partitions 1 --replication-factor 1
./kafka-topics.sh --bootstrap-server linux-102:9092 --list
选项说明:
--topic 定义topic名
--partitions 定义分区数
--replication-factor 定义副本数
4)查看first主题的详情
./kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic first
5)修改分区数(注意:分区数只能增加,不能减少)
6)再次查看first主题的详情
./kafka-topics.sh --bootstrap-server linux-102:9092 --alter --topic first --partitions 2
./kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic first
7)删除 topic,删除后再次查看还有没有 topic
./kafka-topics.sh --bootstrap-server linux-102:9092 --delete --topic first
./kafka-topics.sh --bootstrap-server linux-102:9092 --list
2.3.2 生产者命令行操作
操作:开两个窗口,一个作为生产者,一个作为消费者,当在生产者窗口命令输入的时候,消费者窗口就会自动输出(附截图最后)。
1)查看操作生产者命令参数
kafka-console-producer.sh
参数描述- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。
2)发送消息
2.3.3 消费者命令行操作
1)查看操作消费者命令参数
./kafka-console-consumer.sh
参数描述- -bootstrap-server <String: server toconnect to>连接的Kafka Broker主机名称和端口号。- -topic <String: topic>操作的topic名称。- -from-beginning从头开始消费。- -group <String: consumer group id>指定消费者组名称。
2)消费消息
(1)消费first主题中的数据。
./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --from-beginning --topic first
2.3.4 生产者生产消费者消费
版权归原作者 Daniel521-Spark 所有, 如有侵权,请联系我们删除。