0


消息中间件:Kafka

(1)Zookeeper安装

https://downloads.apache.org/zookeeper/

(1)更新系统的包管理器

sudo yum update

(2)安装JDK

sudo yum install java-1.8.0-openjdk-devel

(3)下载ZooKeeper

cd /usr/local/

wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

(4)解压ZooKeeper

tar -xvf apache-zookeeper-3.7.1-bin.tar.gz

(5)重命名为”zookeeper”

mv apache-zookeeper-3.7.1-bin zookeeper

(6)创建ZooKeeper数据目录

mkdir /usr/local/zookeeper/data

mkdir /usr/local/zookeeper/logs

(7)创建ZooKeeper配置文件:

ZooKeeper的滴答时间(以毫秒为单位)、ZooKeeper存储数据的数据目录、ZooKeeper监听的客户端端口

vim /usr/local/zookeeper/conf/zoo.cfg

tickTime=2000

dataDir=/usr/local/zookeeper/data

dataLogDir=/usr/local/zookeeper/logs

clientPort=2181

(8)启动ZooKeeper

权限不足解决方案:su root、chmod a+xwr zkServer.sh

/usr/local/zookeeper/bin/zkServer.sh start

(9)使用如下命令检查ZooKeeper是否正在运行:

/usr/local/zookeeper/bin/zkServer.sh status

(2)Kafka安装

https://kafka.apache.org/

权限不足解决方案:

chmod a+xwr kafka-topics.sh

chmod a+xwr kafka-console-producer.sh

chmod a+xwr kafka-console-consumer.sh

chmod a+xwr kafka-consumer-groups.sh

(1)安装Kafka

cd /usr/local/

tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz

mv kafka_2.11-2.4.0 kafka

(2)配置Kafka

vim /usr/local/kafka/config/server.properties

broker.id=0

listeners=PLAINTEXT://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs

zookeeper.connect=192.168.19.131:2181

(3)启动Kafka

cd /usr/local/kafka/bin

./kafka-server-start.sh -daemon ../config/server.properties

ps -aux | grep server.properties

(4)使用Kafka

创建主题

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test

查看主题

./kafka-topics.sh --list --zookeeper 192.168.19.131:2181

发送消息

./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test

接收消息方式一:从最后一条消息的偏移量+1开始消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test

接收消息方式二:从头开始消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test

(5)单播消息:一个消费组只有一个消费者能消费

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

(6)多播消息:不同的消费者处于不同的消费组

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test

(7)查看消费组

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

group1 test 0 22 22 0 consumer-group1

Current-offset:当前消费组已经消费的偏移量

Log-end-offset:主题对应分区消息的结束偏移量(HW)

Lag:当前消费组未消费的消息数

(8)主题分区

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1

cd /usr/local/kafka/data/kafka-logs/topic1-0

cd /usr/local/kafka/data/kafka-logs/topic1-1

说明:定期将自己消费分区的offset提交给kafka内部topic、key是consumerGroupId+topic+分区、value是当前offset值

说明:kafka会定期清理topic里的消息、默认保存7天、7天后消息会被删除

说明:通过此公式可以选出consumer消费的offset要提交到哪个分区:hash(consumerGroupId)%__consumer_offsets主题分区数

__consumer_offsets-0

__consumer_offsets-49

(3)Kafka集群

(1)Kafka集群、3个broker

3个server.properties

vim server0.properties

broker.id=0

listeners=PLAINTEXT://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs-0

vim server1.properties

broker.id=1

listeners=PLAINTEXT://192.168.19.131:9093

log.dirs=/usr/local/kafka/data/kafka-logs-1

vim server2.properties

broker.id=2

listeners=PLAINTEXT://192.168.19.131:9094

log.dirs=/usr/local/kafka/data/kafka-logs-2

./kafka-server-start.sh -daemon ../config/server0.properties

./kafka-server-start.sh -daemon ../config/server1.properties

./kafka-server-start.sh -daemon ../config/server2.properties

(2)副本:1个主题、2个分区、3个副本

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2

Topic: topic2 PartitionCount: 2 ReplicationFactor: 3 Configs:

Topic: topic2 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

Topic: topic2 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

Leader:

写和读的操作都在Leader上、Leader负责把数据同步到follower、当leader挂了、经过主从选举、从多个follower中选举产生一个新Leader

Follower:

接收leader的同步的数据

Isr:

可以同步的broker节点和已同步的broker节点、存放在isr集合中、如果isr节点中的性能较差、会被踢出isr集合

总结:broker、主题、分区、副本

[root@web-server data]# ls

kafka-logs kafka-logs-0 kafka-logs-1 kafka-logs-2

[root@web-server kafka-logs-1]# ls

cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1

[root@web-server kafka-logs-2]# ls

cleaner-offset-checkpoint log-start-offset-checkpoint meta.properties recovery-point-offset-checkpoint replication-offset-checkpoint topic2-0 topic2-1

(3)Kafka集群消息的发送

./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2

(4)Kafka集群消息的发送

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2

难点:一个Partition只能被一个组中的一个Consumer消费、一个Consumer可以消费多个Partition。

注意:Kafka只在Partition分区的范围内保证消息消费的局部顺序性、不能在同一个topic主题中的多个Partition中保证总的消费顺序性。

(4)Kafka-eagle监控

(1)安装JDK

yum install java-1.8.0-openjdk-devel

/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

java -version

(2)解压

tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

tar -zxvf efak-web-3.0.1-bin.tar.gz

mv efak-web-3.0.1 efak-web

mv efak-web ../

cd /usr/local/efak-web

(3)配置环境变量

vim /etc/profile

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

export KE_HOME=/usr/local/efak-web

export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin

source /etc/profile

(4)kafka-eagle内部配置问

vim /usr/local/efak-web/conf/system-config.properties

efak.zk.cluster.alias=cluster1

cluster1.zk.list=192.168.19.131:2181

efak.driver=com.mysql.cj.jdbc.Driver

efak.url=jdbc:mysql://192.168.3.53:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull

efak.username=root

efak.password=root

(5)启动

./ke.sh start

http://192.168.19.131:8048

admin/123456

windows版本启动问题解决方案:

例如:如果原来希望输入的命令为:

C:\Program Files\Java\jdk-11.0.12\bin\java.exe -jar xxx.jar

1

现在应改为:

"C:\Program Files\Java\jdk-11.0.12\bin\java.exe" -jar xxx.jar

(5)案例:zookeeper+kafka

一、安装docker

1、Docker 要求 CentOS 系统的内核版本高于 3.10 ,查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。

通过 uname -r 命令查看你当前的内核版本

$ uname -r

2、使用 root 权限登录 Centos。确保 yum 包更新到最新。

$ sudo yum update

3、卸载旧版本(如果安装过旧版本的话)

$ sudo yum remove docker docker-common docker-selinux docker-engine

4、安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的

$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2

5、设置yum源

$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

6、可以查看所有仓库中所有docker版本,并选择特定版本安装

$ yum list docker-ce --showduplicates | sort -r

7、安装docker

#$ sudo yum install docker-ce #没有版本默认安装最新版本、由于repo中默认只开启stable仓库,故这里安装的是最新稳定版17.12.0

$ sudo yum install <FQPN> # 例如:sudo yum install docker-ce-17.12.0.ce

8、启动并加入开机启动

$ sudo systemctl start docker

$ sudo systemctl enable docker

9、验证安装是否成功(有client和service两部分表示docker安装启动都成功了)

$ docker version

##############################################################################################

拉取zookeeker

docker pull wurstmeister/zookeeper

拉取kafka版本为2.12-2.2.0,不填写版本好则安装最新,但是个别系统会报错

docker pull wurstmeister/kafka:2.12-2.2.0

启动zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

启动kafka

docker run --name kafka01 \

-p 9092:9092 \

-e KAFKA_BROKER_ID=0 \

-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \

-d wurstmeister/kafka

进入Kafka容器类kafka01是容器名称,也可以填写成容器ID

docker exec -it kafka01 /bin/bash

创建my_log topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log

查询创建的主题

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181

标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/u010605984/article/details/135725440
版权归原作者 JAVA灯塔 所有, 如有侵权,请联系我们删除。

“消息中间件:Kafka”的评论:

还没有评论