一:消息队列
1: 什么是消息队列
消息(Message) 是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符 串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue) 是一种应用间的通信方式,消息发送后可以立即返回, 由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取, 消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方 的存在。
2: 为什么需要消息队列
(1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)冗余
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。 许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要 你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使 用完毕。
(3)扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外
增加处理过程即可。
(4)灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果 为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够 使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所 以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保 证数据会按照特定的顺序来处理。 ****(**Kafka 保证一个 Partition **内的消息的有序性)
(7)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情 况。
(8)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户 把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要 的时候再去处理它们。
二:****Kafka 基础与入门
1:kafka** **基本概念
Kafka 是一种高吞吐量的分布式发布/订阅消息系统,这是官方对kafka 的定义,这样 说起来,可能不太好理解,这里简单举个例子:现在是个大数据时代,各种商业、社交、搜 索、浏览都会产生大量的数据。那么如何快速收集这些数据,如何实时的分析这些数据,是 一个必须要解决的问题,同时,这也形成了一个业务需求模型,即生产者生产 (produce)各种数据,消费者 (consume) 消费(分析、处理)这些数据。那么面对这些需求,如何高
效、稳定的完成数据的生产和消费呢?这就需要在生产者与消费者之间,建立一个通信的桥 梁,这个桥梁就是消息系统。从微观层面来说,这种业务需求也可理解为不同的系统之间如 何传递消息。
kafka 是Apache 组织下的一个开源系统,它的最大的特性就是可以实时的处理大量数 据以满足各种需求场景:比如基于 hadoop 平台的数据分析、低时延的实时系统、 storm/spark 流式处理引擎等。kafka 现在它已被多家大型公司作为多种类型的数据管道 和消息系统使用。
2:kafka 角色术语
kafka 的一些核心概念和角色
(1)Broker:Kafka 集群包含一个或多个服务器,每个服务器被称为broker****。
(2)Topic: 每条发布到Kafka 集群的消息都有一个分类,这个类别被称为Topic (主题)。
(3)Producer: 指消息的生产者,负责发布消息到kafka broker。
(4)Consumer: 指消息的消费者,从kafka broker 拉取数据,并消费这些已发布的消息。
(5)Partition:Partition 是物理上的概念,每个*Topic 包含一个或多个Partition,*
每个partition 都是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。
(6)Consumer Group: 消费者组,可以给每个Consumer 指定消费组,若不指定消费者组, 则属于默认的 group。
(7)Message: 消息,通信的基本单位,每个producer 可以向一个topic 发布一些消息。
三:****zookeeper 概念介绍
ZooKeeper 是一种分布式协调技术,所谓分布式协调技术主要是用来解决分布式环境 当中多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止造成资源竞争(脑 裂)的后果。脑裂是指在主备切换时,由于切换不彻底或其他原因,导致客户端和 ****Slave ****误以为出现两个activemaster, 最终使得整个集群处于混乱状态
ZooKeeper 是一种为分布式应用所设计的高可用、高性能的开源协调服务,它提供了 一项基本服务:分布式锁服务,同时,也提供了数据的维护和管理机制,如:统一命名服务、 状态同步服务、集群管理、分布式消息队列、分布式应用配置项的管理等等。
2:zookeeper 的工作原理
下面通过三种情形来讲解:
(1) master启动
在分布式系统中引入*Zookeeper 以后,就可以配置多个主节点,这里以配置两个主节 点为例,假定它们是主节点A 和主节点B,当两个主节点都启动后,它们都会向ZooKeeper* 中注册节点信息。我们假设主节点A** 锁注册的节点信息是 master00001 ,主节点B *注 册的节点信息是 master00002, 注册完以后会进行选举,选举有多种算法,这里以编号最 小作为选举算法,那么编号最小的节点将在选举中获胜并获得锁成为主节点,也就是主节 点A 将会获得锁成为主节点,然后主节点B 将被阻塞成为一个备用节点。这样,通过这 种方式Zookeeper就完成了对两个***Master ****进程的调度。完成了主、备节点的分配和协作。
(2)master故障
如果 主节点A 发生了故障,这时候它在ZooKeeper 所注册的节点信息会被自动删除,而 ZooKeeper 会自动感知节点的变化,发现主节点A 故障后,会再次发出选举,这时 候 主节点B 将在选举中获胜,替代 主节点A 成为新的主节点,这样就完成了主、被 节点的重新选举。
(3)master** **恢复
如果主节点恢复了,它会再次向ZooKeeper 注册自身的节点信息,只不过这时候它注 册的节点信息将会变成 master00003** , 而不是原来的信息。ZooKeeper 会感知节点的变 化再次发动选举,这时候主节点B **在选举中会再次获胜继续担任 主节点,主节点A 会 担任备用节点。
****zookeeper ****就是通过这样的协调、调度机制如此反复的对集群进行管理和状态同步。
五:群集部署kafka
主机
****kafka1:****192.168.10.101
kafka2:192.168.10.102
kafka3:192.168.10.103
1:zookeeper** **的部署
(1)安装zookeeper (三个节点的配置相同)
[root@kafka1 ~]#yum-y install java
[root@kafka1~]#tar zxvf apache-zookeeper-3.6.0-bin.tar.gz
[root@kafka1~]#mv apache-zookeeper-3.6.0-bin /etc/zookeeper
(2)创建数据保存目录(三个节点的配置相同)
[root@kafka1~]#cd /etc/zookeeper/
[root@kafka1 zookeeper]#mkdir zookeeper-data
(3)修改配置文件(三个节点的配置相同)
[root@kafka1 zookeeper]#cd /etc/zookeeper/conf
[root@kafka1 ~]#mv zo0_sample.cfg zo0.cfg
[root@kafka1 ~]#vim zo0.cfg
dataDir=/etc/zookeeper/zookeeper-data clientPort=2181
server.1=192.168.10.114:2888:3888
server.2=192.168.10.115:2888:3888
server.3=192.168.10.116:2888:3888
zookeeper 只用的端口 2181:对 cline端提供服务
3888: 选举leader 使用
2888: 集群内机器通讯使用 ( Leader 监听此端口)
(4)创建节点id 文件(按server 编号设置这个id, 三个机器不同)
节点1:
[root@kafka1
conf]#echo
'1'>/etc/zookeeper/zookeeper-data/myid
节点2:
[root@kafka2
conf]#echo
'2'>/etc/zookeeper/zookeeper-data/myid
节点3:
[root@kafka3
conf]#echo
'3'>/etc/zookeeper/zookeeper-data/myid
(5)三个节点启动zookeeper 进程
[root@kafka1 conf]#cd /etc/zookeeper/
[root@kafka1 zookeeper]#./bin/zkServer.sh start
[root@kafka1 zookeeper]#./bin/zkServer.sh status
****2:kafka ****的部署
(1)kafka 的安装(三个节点的配置相同)
[root@kafka1 ~]#tar zxvf kafka_2.13-2.4.1.tgz
[root@kafka1 ~]#mv kafka_2.13-2.4.1 /etc/kafka
(2)修改配置文件
[root@kafka1 ~]#cd /etc/kafka/
[root@kafka2 kafka]#vim config/server.properties
broker.id=1 ##21行修改,注意其他两个的id 分别是2和3
listeners=PLAINTEXT://192.168.10.114:9092 #31行修改,其他节点改成各自的IP 地址
log.dirs=/etc/kafka/kafka-logs # # 6 0 行 修 改
num.partitions=1 ##65行分片数量,不能超过节点数
****zookeeper.connect=192.168.10.114:2181,192.168.10.115:2181,****192.168.10.11
注释:9092是kafka 的监听端口
(3)创建日志目录(三个节点的配置相同)
[root@kafka1 kafka]#mkdir /etc/kafka/kafka-logs
(3)在所有kafka 节点上执行开启命令,生成kafka 群集(三个节点的配置相同)
****[root@kafka1 kafka]#./bin/k********afka-server-start.sh config/server.properties ****&
如果启动不了,可以将/etc/kafka/kafka-logs 中的数据清除再试试
3: 测试
创建 topic (任意一个节点)
bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
列出topic (任意一个节点)
bin/kafka-topics.sh --list --zookeeper
kafka1:2181
bin/kafka-topics.sh --list --zookeeper
kafka2:2181
bin/kafka-topics.sh --list --zookeeper
kafka3:2181
生产消息
bin/kafka-console-pro****ducer.sh --broker-list kafka1:9092 -topic test
消费消息
bin/kafka-console-consumer.sh --bootstr****ap-server kafka1:9092 --topic test
删除topic
bin/kafka-topics.sh --delete --zookeeper kafka1:2181 --topic test
版权归原作者 临桥 所有, 如有侵权,请联系我们删除。