0


【Kafka】概述与集群部署

文章目录

Kafka概述

定义

kafka是一种分布式的,基于发布/订阅的消息队列 (MessageQueue)。它可以处理消费者在网站中的所有动作流数据。

Kafka是一个开源的分布式事件流平台(Event StreamingPlatform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。(既想处理消息队列,又想处理数据)

应用场景

缓冲/削峰

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

解耦

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

异步通信

允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

应用模式

点对点模式

消费者主动拉去数据,消息收到后清除消息

发布/订阅模式

• 可以有多个topic主题(浏览、点赞、收藏、评论等)
• 消费者消费数据之后,不删除数据
• 每个消费者相互独立,都可以消费到数据

基础架构

(1)Producer:消息生产者,就是向Kafka broker发消息的客户端。
(2)Consumer:消息消费者,向Kafka broker取消息的客户端。
(3)Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
(4)Broker:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
(5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个topic。
(6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
(7)Replica:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。
(8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
(9)Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

Kafka集群部署

集群规划

hadoop102hadoop103hadoop104zkzkzkkafkakafkakafka

下载解压

官网地址;https://kafka.apache.org/downloads

下载

cd /opt/module

wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz

解压

tar -zxvf kafka_2.12-3.4.0.tgz -C /opt/module/

改名

mv kafka_2.12-3.4.0/ kafka

修改配置文件

路径:config/server.properties

  • broker.id=0
  • log.dirs=/opt/module/kafka/datas
  • zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
  1. #broker的全局唯一编号,不能重复,只能是数字。
  2. broker.id=0
  3. #处理网络请求的线程数量
  4. num.network.threads=3
  5. #用来处理磁盘IO的线程数量
  6. num.io.threads=8
  7. #发送套接字的缓冲区大小
  8. socket.send.buffer.bytes=102400
  9. #接收套接字的缓冲区大小
  10. socket.receive.buffer.bytes=102400
  11. #请求套接字的缓冲区大小
  12. socket.request.max.bytes=104857600
  13. #kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
  14. log.dirs=/opt/module/kafka/datas
  15. #topic在当前broker上的分区个数
  16. num.partitions=1
  17. #用来恢复和清理data下数据的线程数量
  18. num.recovery.threads.per.data.dir=1
  19. # 每个topic创建时的副本数,默认时1个副本
  20. offsets.topic.replication.factor=1
  21. #segment文件保留的最长时间,超时将被删除
  22. log.retention.hours=168
  23. #每个segment文件的大小,默认最大1G
  24. log.segment.bytes=1073741824
  25. # 检查过期数据的时间,默认5分钟检查一次是否数据过期
  26. log.retention.check.interval.ms=300000
  27. #配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
  28. zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

分发安装包

这一步的作用是让另外几台服务器也都有 kafka/ 这套文件

  • 命令:xsync kafka/
  • 下面是xsync.sh脚本的内容
  1. #!/bin/bash
  2. #1. 判断参数个数
  3. if [ $# -lt 1 ]
  4. then
  5. echo Not Enough Arguement!
  6. exit;
  7. fi
  8. #2. 遍历集群所有机器
  9. for host in hadoop102 hadoop103 hadoop104
  10. do
  11. echo ==================== $host ====================
  12. #3. 遍历所有目录,挨个发送
  13. for file in $@
  14. do
  15. #4. 判断文件是否存在
  16. if [ -e $file ]
  17. then
  18. #5. 获取父目录
  19. pdir=$(cd -P $(dirname $file); pwd)
  20. #6. 获取当前文件的名称
  21. fname=$(basename $file)
  22. ssh $host "mkdir -p $pdir"
  23. rsync -av $pdir/$fname $host:$pdir
  24. else
  25. echo $file does not exists!
  26. fi
  27. done
  28. done

hadoop103、hadoop104修改配置文件

  1. # broker.id不得重复,整个集群中唯一
  2. # hadoop103对应的
  3. broker.id=1
  4. # hadoop104对应的
  5. broker.id=2

配置环境变量

路径:vim /etc/profile.d/my_env.sh

注意:集群内的机子都需要配一遍

  1. # KAFKA_HOME
  2. export KAFKA_HOME=/opt/module/kafka
  3. export PATH=$PATH:$KAFKA_HOME/bin

刷新一下环境变量

  1. source /etc/profile

启动集群

先启动Zookeeper集群

  1. kafka/zk.sh start
  2. vim kafka/zk.sh
  3. #!/bin/bash
  4. case $1 in
  5. "start"){
  6. for i in hadoop102 hadoop103 hadoop104
  7. do
  8. echo ------------- zookeeper $i 启动 ------------
  9. ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
  10. done
  11. }
  12. ;;
  13. "stop"){
  14. for i in hadoop102 hadoop103 hadoop104
  15. do
  16. echo ------------- zookeeper $i 停止 ------------
  17. ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
  18. done
  19. }
  20. ;;
  21. "status"){
  22. for i in hadoop102 hadoop103 hadoop104
  23. do
  24. echo ------------- zookeeper $i 状态 ------------
  25. ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
  26. done
  27. }
  28. ;;
  29. esac

然后启动Kafka

  1. bin/kafka-server-start.sh -daemon config/server.properties
  2. bin/kafka-server-start.sh -daemon config/server.properties
  3. bin/kafka-server-start.sh -daemon config/server.properties

关闭集群

  1. bin/kafka-server-stop.sh

集群启停脚本

脚本编写

vim kf.sh

  1. #! /bin/bash
  2. case $1 in
  3. "start"){
  4. for i in hadoop102 hadoop103 hadoop104
  5. do
  6. echo " --------启动 $i Kafka-------"
  7. ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
  8. done
  9. };;
  10. "stop"){
  11. for i in hadoop102 hadoop103 hadoop104
  12. do
  13. echo " --------停止 $i Kafka-------"
  14. ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
  15. done
  16. };;
  17. esac

添加执行权限

  1. chmod +x kf.sh

启动集群脚本命令

  1. kf.sh start

停止集群脚本命令

  1. kf.sh stop

注意:****停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

Docker启动Kafka集群

docker-compose.yml编写

  1. version: '3'
  2. services:
  3. li-zookeeper:
  4. image: wurstmeister/zookeeper:latest
  5. ports:
  6. - "7181:2181"
  7. networks:
  8. - li-kafka-net
  9. li-kafka-1:
  10. image: wurstmeister/kafka
  11. environment:
  12. KAFKA_LISTENERS: PLAINTEXT://:9092
  13. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7091
  14. KAFKA_BROKER_ID: 1
  15. KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
  16. ports:
  17. - "7091:9092"
  18. networks:
  19. - li-kafka-net
  20. li-kafka-2:
  21. image: wurstmeister/kafka
  22. environment:
  23. KAFKA_LISTENERS: PLAINTEXT://:9092
  24. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7092
  25. KAFKA_BROKER_ID: 2
  26. KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
  27. ports:
  28. - "7092:9092"
  29. networks:
  30. - li-kafka-net
  31. li-kafka-3:
  32. image: wurstmeister/kafka
  33. environment:
  34. KAFKA_LISTENERS: PLAINTEXT://:9092
  35. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://139.196.169.148:7093
  36. KAFKA_BROKER_ID: 3
  37. KAFKA_ZOOKEEPER_CONNECT: li-zookeeper:2181
  38. ports:
  39. - "7093:9092"
  40. networks:
  41. - li-kafka-net
  42. li-kafka-map:
  43. image: dushixiang/kafka-map:latest
  44. environment:
  45. KAFKA_MAP_KAFKA_SERVERS: li-kafka-1:9092,li-kafka-2:9092,li-kafka-3:9092
  46. KAFKA_MAP_USERNAME: admin
  47. KAFKA_MAP_PASSWORD: admin
  48. ports:
  49. - "8080:8080"
  50. networks:
  51. - li-kafka-net
  52. networks:
  53. li-kafka-net:
  54. driver: bridge

启动compose

  1. docker-compose up -d

命令行验证

  1. docker exec -it kafka-server bash
  2. cd /opt/bitnami/kafka
  3. 列出所有topics
  4. kafka-topics.sh --bootstrap-server 139.196.169.148:7091 --list
  5. 创建一个topic
  6. kafka-topics.sh --bootstrap-server 139.196.169.148:7092 --create --partitions 1 --replication-factor 3 --topic first
  7. 生产者
  8. kafka-console-producer.sh --bootstrap-server 139.196.169.148:7091 --topic first
  9. 消费者
  10. kafka-console-consumer.sh --bootstrap-server 139.196.169.148:7092 --from-beginning --topic first

Python验证

  • 安装包pip install kafka-python
  • 生产者from kafka import KafkaProducerfrom kafka.errors import KafkaError# Kafka 集群地址bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']# Kafka 主题名称topic = 'first'# 创建 Kafka 生产者producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息到 Kafka 主题def send_message(message): try: producer.send(topic, message.encode('utf-8')) producer.flush() print('Message sent successfully:', message) except KafkaError as e: print('Failed to send message:', e)# 测试发送消息send_message('Hello, Kafka!')
  • 消费者from kafka import KafkaConsumer# Kafka 集群地址bootstrap_servers = ['139.196.169.148:7091', '139.196.169.148:7092', '139.196.169.148:7093']# Kafka 主题名称topic = 'first'# 创建 Kafka 消费者consumer = KafkaConsumer( topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', # 从最早的消息开始消费 enable_auto_commit=True, # 自动提交消费位移 group_id='my-group') # 消费者组名称# 从 Kafka 主题消费消息def consume_message(): for message in consumer: print('Message received:', message.value.decode('utf-8'))# 测试消费消息consume_message()
标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/al6nlee/article/details/130512935
版权归原作者 Al6n Lee 所有, 如有侵权,请联系我们删除。

“【Kafka】概述与集群部署”的评论:

还没有评论