消息队列概念
- 什么是消息队列 - 消息(Message)是指在应用传送的数据- 消息队列(Message Queue)是一种应用间的通信方式解决方法,确保消息的可靠传递。
- 为什么需要消息队列 - 解耦 - 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束- 冗余 - 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的“插入—获取—删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕- 扩展性 - 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可- 灵活性与峰值处理能力 - 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件项住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃- 可恢复性 - 系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理- 顺序保证 - 在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个Partition 内的消息的有序性)- 异步通信 - 很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个 消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处 理它们
消息队列的特征
- 存储 - 将消息存储在某种类型的缓冲区中,直到目标进程读取这些消息或将其从消息队列中显式移除为止
- 异步 - 消息队列通过缓冲消息可以在应用程序中公开一定程度的异步性,允许源进程发送消息并在队列中累计消息,而目标进程则可以挑选消息进行处理
kafka基础概念
- 什么是kafka - Kafka是一种高吞吐量的分布式发布订阅消息系统- kafaka是Apache组织下的一个开源系统- 开源实时的处理大量数据以满足各种需求场景
- kafka角色术语
- Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker(经纪人)
- Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)
- Producer:指消息的生产者。负责发布消息到kafka broker
- Consumer:指消息的消费者,从kafka broker拉取数据,并消费这些已经发布的消息。
- Patition:Patition物理上的概念,每个Topic包含一个或多个Patition或多个Patition。每个Patition中的每条消息都会被分配一个有序的id(offset)
- Consumer Group:消费者组,可以给每个Consumer指定消费组,若不指定消费者组,则舒徐默认的group
- Message:通信的基本单位,每个producer可以项一个topic发布一些消息
zookeeper基础概念
- Zookeeper是一种分布式协调技术。所谓分布式协调技术主要是用来解决分布式环境大概在多个进程之间的同步控制,让他们有序的去访问某种共享资源,防止资源竞争(脑裂)的后果。
zookeeper工作原理
- master启动 - 各几点项ZooKeeper中注册节点信息,以编号最小算法选举出一个主节点,另外的节点技术备用节点,有zookeeper完成对两个Msater进行的调度,和乐主,备节点的分配和协议。
- master故障 - 如果主节点A发生了故障,这时候在Zookeeper所注册的节点信息会被自动删除,并非再次发送选举
- master修复 - 如果主机欸但修复了,他会再次向zookeeper注册自身的节点信息,但注册的节点编号会别笑,因此编号称为master,而是另一台继续担任master
单节点部署kafka
#安装zookeeper[root@bogon ~]# yum -y install java[root@bogon ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz[root@bogon ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper[root@bogon ~]# cd /etc/zookeeper/conf[root@bogon conf]# mv zoo_sample.cfg zoo.cfg[root@bogon conf]# vim zoo.cfg #更改一下内容dataDir=/etc/zookeeper/zookeeper-data
[root@bogon conf]# cd /etc/zookeeper/[root@bogon kafka]# mkdir zookeeper-data[root@bogon zookeeper]# ./bin/zkServer.sh start[root@bogon zookeeper]# ./bin/zkServer.sh status#安装kafka[root@bogon ~]# tar zxvf kafka_2.13-2.4.1.tgz [root@bogon ~]# mv kafka_2.13-2.4.1 /etc/kafka[root@bogon ~]# cd /etc/kafka/#更改一下内容[root@kafka1 kafka]# vim config/server.properties log.dirs=/etc/kafka/kafka-logs #60行[root@bogon kafka]# mkdir /etc/kafka/kafka-logs[root@bogon kafka]# bin/kafka-server-start.sh config/server.properties &#检查两个端口的开启状态[root@bogon kafka]# netstat -anpt | grep 2181[root@bogon kafka]# netstat -anpt | grep 9092#测试
群集部署kafka
#修改主机hosts文件(所有主机都配置)[root@kafka1 ~]# vim /etc/hosts192.168.10.101 kafka1
192.168.10.102 kafka2
192.168.10.103 kafka3
#zookeeper的部署#安装zookeeper(三个节点的配置相同)[root@kafka1 ~]# systemctl stop firewalld[root@kafka1 ~]# setenforce 0[root@kafka1 ~]# yum -y install java[root@kafka1 ~]# tar zxvf apache-zookeeper-3.6.0-bin.tar.gz[root@kafka1 ~]# mv apache-zookeeper-3.6.0-bin /etc/zookeeper#创建数据保存目录(三个节点的配置相同)[root@kafka1 ~]# cd /etc/zookeeper/[root@kafka1 zookeeper]# mkdir zookeeper-data#修改配置文件(三个节点的配置相同)[root@kafka1 zookeeper]# cd /etc/zookeeper/conf[root@kafka1 ~]# mv zoo_sample.cfg zoo.cfg[root@kafka1 ~]# vim zoo.cfg dataDir=/etc/zookeeper/zookeeper-data
clientPort=2181server.1=192.168.10.101:2888:3888 #2181:对cline端提供服务 3888:选举leader使用 #2888:集群内机器通讯使用(Leader监听此端口)server.2=192.168.10.102:2888:3888
server.3=192.168.10.103:2888:3888
#创建节点id文件(按server编号设置这个id,三个机器不同)#节点1:[root@kafka1 conf]# echo '1' > /etc/zookeeper/zookeeper-data/myid#节点2:[root@kafka2 conf]# echo '2' > /etc/zookeeper/zookeeper-data/myid#节点3:[root@kafka3 conf]# echo '3' > /etc/zookeeper/zookeeper-data/myid#三个节点启动zookeeper进程[root@kafka1 conf]# cd /etc/zookeeper/[root@kafka1 zookeeper]# ./bin/zkServer.sh start[root@kafka1 zookeeper]# ./bin/zkServer.sh status#kafka的安装(三个节点的配置相同)[root@kafka1 ~]# tar zxvf kafka_2.13-2.4.1.tgz[root@kafka1 ~]# mv kafka_2.13-2.4.1 /etc/kafka#修改配置文件[root@kafka1 ~]# cd /etc/kafka/[root@kafka2 kafka]# vim config/server.properties broker.id=1#其他两个的id分别是2和3listeners=PLAINTEXT://192.168.10.101:9092 #其他节点改成各自的IP地址log.dirs=/etc/kafka/kafka-logs
num.partitions=1#分片数量,不能超过节点数zookeeper.connect=192.168.10.101:2181,192.168.10.102:2181,192.168.10.103:218
#创建日志目录(三个节点的配置相同)[root@kafka1 kafka]# mkdir /etc/kafka/kafka-logs#在所有kafka节点上执行开启命令,生成kafka群集(三个节点的配置相同)[root@kafka1 kafka]# ./bin/kafka-server-start.sh config/server.properties & #测试#创建topic[root@bogon kafka]# ./kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test#列出topic[root@bogon kafka]# ./kafka-topics.sh --list --zookeeper kafka1:2181#查看topic[root@bogon kafka]# ./kafka-topics.sh --describe --zookeeper kafka1:2181 --topic test#生产消息[root@bogon kafka]# ./kafka-console-producer.sh --broker-list kafka1:9092 -topic test#消费消息(打开另一个终端,一边生产消息,一边查看消费消息)[root@bogon kafka]# ./kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test#删除topic
bin/kafka-topics.sh --delete--zookeeper kafka1:2181 --topictest
本文转载自: https://blog.csdn.net/weixin_67764171/article/details/140440515
版权归原作者 亚特兰蒂斯453 所有, 如有侵权,请联系我们删除。
版权归原作者 亚特兰蒂斯453 所有, 如有侵权,请联系我们删除。