目标
熟悉kafka各个组件的功能。
通过命令的方式验证kafka各个组件之间的关联关系。
相关概念
Broker
kafka节点,多个broker组成kafka集群。
Topic
即主题,kafka通过Topic对消息进行分类,发布到kafka的消息都需要指定Topic。
Producer
即消息生产者,向Broker发送消息的客户端。
Consumer
即消息消费者,从Broker消费消息的客户端。
ConsumerGroup
即消费者组,消费者隶属于消费者组,同一个分区的消息可以被多个消费者消费,但是同一个消费者组中只能有一个消费者可以消费。
Partition
即分区,每个Topic下都至少有一个分区,分区内部的消息是有序的。
环境
zookeeper节点组成的集群,安装目录:
/opt/apache-zookeeper-3.6.2-bin
kafka节点组成的集群,安装目录:
/opt/kafka_2.13-3.1.0
启动zookeeper和kafka节点
第一步:先启动zookeeper集群,即启动三个zookeeper节点,因为kafka的节点需要向zookeeper注册。
#第一步:进入bin目录。
cd /opt/apache-zookeeper-3.6.2-bin/bin
#第二步:启动zookeeper。
./zkServer.sh start
###########################相关命令如下:###########################
#重启
./zkServer.sh restart
#停止
./zkServer.sh stop
#查看日志
./zkServer.sh start-foreground
#进入客户端
./zkCli.sh
第二步:启动kafka集群,即启动三个kafka节点。
#第一步:进入bin目录下。
/opt/kafka_2.13-3.1.0/bin
#第二步:启动kafka。
./kafka-server-start.sh -daemon ../config/server.properties
第三步:查看kafka集群是否启动成功。
#第一步:进入其中一个ZooKeeper节点的bin目录下。
cd /opt/apache-zookeeper-3.6.2-bin/bin
#第二步:启动客户端。注意,这里的ip和端口要和我们的zookeeper配置一致。
./zkCli.sh -server 127.0.0.1:2181
#第三步:查看有几个kafka启动了,这里查看的是Kafka的broker.id。
ls /brokers/ids
#第四步:如果还想查看单个Kafka节点的状态,就指定好broker.id查看,我这里查看broker.id=0的那个Kafka节点的状态。
get /brokers/ids/0
附录:关闭kafka节点。
#在kafka的bin目录下执行关闭操作。
./kafka-server-stop.sh -daemon ../config/server.properties
命令集锦
查看kafka节点数量
#这个命令需要在zookeeper客户端查看。
ls /brokers/ids
查看所有消费者组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看消费者组详情
#我这里查看appleGroup这个消费者组的详情。
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group appleGroup
创建主题
#iPhoneTopic是我自定义的主题名称。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
#创建一个最简单的主题,该主题没有指定分区和副本因子,默认为1个分区,即0分区。默认1个副本因子。
./kafka-topics.sh --create --topic iPhoneTopic --bootstrap-server localhost:9092
#指定分区数量创建主题。我这里创建了主题名称为liNingShoesTopic,分区数量为3。
./kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --topic liNingShoesTopic
#指定副本因子创建主题。我这里创建主题名称为liNingShortSleeveTopic,指定副本因子数量为3个。分区数量3个。
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic liNingShortSleeveTopic
扩容分区
#把liNingShortSleeveTopic主题扩容为6个分区。
#注意:目前不支持减少分区,扩容前必须存在这个主题。
./kafka-topics.sh -alter --partitions 6 --bootstrap-server localhost:9092 --topic liNingShortSleeveTopic
查看主题详情
#liNingShortSleeveTopic是我要查看的主题。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --describe --topic liNingShortSleeveTopic --bootstrap-server localhost:9092
查看所有主题
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --list --bootstrap-server localhost:9092
删除主题
#iPhoneTopic是我要删除的主题。
#localhost表示kafka节点所在的机器ip。
#9092表示kafka节点提供服务的端口,这个端口需要与配置文件配置的端口一致。
./kafka-topics.sh --delete --topic iPhoneTopic --bootstrap-server localhost:9092
发送消息
#localhost表示我往本机的kafka节点发送消息。
#iPhoneTopic是我发送消息的主题。
./kafka-console-producer.sh --broker-list localhost:9092 --topic iPhoneTopic
监听&消费消息
#监听&消费一个主题。
#iPhoneTopic是我要消费消息所在的主题。
#注意:该命令只能实时消费,启动该消费命令后需要用生产者生产消息来验证。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic iPhoneTopic
#监听&消费多个主题。
#消费多个消息用|隔开。比如我这里把iPhoneTopic和iPadTopic放到了监听的白名单中。
#注意:该命令只能实时消费,启动该消费命令后需要用生产者生产消息来验证。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "iPhoneTopic|iPadTopic"
#该命令可以消费以前的消息。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic iPadTopic
#把消费者分配到appleGroup消费者组,并监听。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=appleGroup --topic iPhoneTopic
版权归原作者 我的身前一尺是我的世界 所有, 如有侵权,请联系我们删除。