kafka命令行操作主要分为三大类: 主题命令行操作、生产者命令行操作、消费者命令行操作。
注意: 命令行操作前提,启动kafka集群。
1. 主题命令行操作
1.1 查看主题命令行参数
a) 查询命令
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh
b)参数列表
参数
描述
--bootstrap-server<String:server toconnect to>
连接的Kafka Broker主机名称和端口号
--topicString:topic
操作的topic名称
--create
创建主题
--delete
删除主题
--alter
修改主题
--list
查看所有主题
--describle
查看主题详细描述
--partitions<Integer:#of partitions>
设置分区数
--replication-factor<Integer:replication factor>
设置分区副本
--configString:name=value
更新系统默认的配置
1.2 查看当前主机的所有主题(topic)
# 防止kafka宕机可以使用两台
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --list
# 测试一般使用一台即可
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
报错问题:kafka没有启动
产生原因:
之前设置的zk的clientPort值不同导致kafka无法注册到zk上,启动后无法注册导致宕机。
解决:
zk集群的所有主机的配置文件中的配置项clientPort都要设置为一样的,zk的默认端口是2181,都设置为clientPort=2181(必须是相同的)
还没有主题
1.3 创建主题
a)创建一个first主题并指定分区为1,分区副本3份。
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --create --partitions 1 --replication-factor 3
b)查看所有的主题
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
c)查看指定主题的详细信息
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --topic first --describe
1.4 修改分区数
注意: 分区数只能增加,不能减少
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
查看当前分区详情
发现分区数改变
1.5 删除主题
[root@hadoop102 ~]# /opt/module/kafka/bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
2. 生产者与消费者命令操作
2.1 查看命令参数
1) 查看生产者命令参数
[root@hadoop102 kafka]# bin/kafka-console-producer.sh
参数
描述
--bootstrap-server <String: server toconnect to>
连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>
操作的 topic 名称
** 2)查看消费者命令参数**
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh
参数
描述
--bootstrap-server <String: server toconnect to>
连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>
操作的 topic 名称
--from-beginning
从头开始消费
--group <String: consumer group id>
指定消费者组名称
2.2 发送消息与消费消息
使用hadoop102生产消息,hadoop103消费消息。
1)生产消息
一次只能发送一条,多发送会报错,等消费者消费完可以发送之后的消息。
[root@hadoop102 kafka]# bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
开启后需要等待,等待消费者开启
2)消费消息
消费 first 主题中的数据
注意:新创建消费者无法消费之前的历史记录,只能消费生产者新产生的记录
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
3) 测试
生产者发送一条消息
这时消费者消费一条
4)查询消费者消费记录
把主题中所有的数据都读取出来(包括历史数据)
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
版权归原作者 +uuid+ 所有, 如有侵权,请联系我们删除。