0


kafka常用命令

kafka常用命令

1、启动kafka服务
  1. bin/kafka-server-start.sh config/server.properties &
2、停止kafka服务
  1. ./kafka-server-stop.sh
3、创建一个叫demo-topic的主题(topic),有两个分区,每个分区3个副本,同时指定该主题的消息保留时长(72小时)
  1. ./kafka-topics.sh --zookeeper(host:port)--create--topic demo-topic --replication-factor 3--partitions2--configretention.ms=259200000
4、列出指定主题(topic)的详细信息
  1. ./kafka-topics.sh --zookeeper(host:port)--describe--topic demo-topic
5、查看所有的主题
  1. ./kafka-topics.sh --list --zookeeper(host:port) kafka-host(host:port)
6、查看所有主题的详细信息
  1. ./kafka-topics.sh --zookeeper(host:port)--describe
7、删除一个主题
  1. ./kafka-topics.sh --zookeeper(host:port)--topic demo-topic --delete
8.向kafka指定topi写入数据
  1. ./kafka-console-producer.sh --broker-list kafka-host(host:port)--topic demo-topic
9、命令行消费某个topic消息
  1. #加了--from-beginning 从头消费所有消息
  2. ./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port)--topic demo-topic --from-beginning
  3. #不加--from-beginning 从最新的一条消息开始消费
  4. ./kafka-console-consumer.sh --bootstrap-server kafka-host(host:port)--topic demo-topic
10、查看某个topic对应的消息数量
  1. time为-1时表示最大值,time为-2时表示最小值 --partitions num 指定分区
  2. ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host(host:port)--topic demo-topic --time-1
11、kafka重置分组已经消费的偏移量offest
  1. ./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port)--execute --reset-offsets --topic=demo-topic --group=testPlatform --to-earliest
12、topic增加分区
  1. ./kafka-topics.sh --alter --zookeeper(host:port)--topic demo-topic --partitions12
13、指定topic创建消费者分组
  1. ./kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port) --topic demo-topic --consumer-property group.id=testPlatform
14、查看消费组组所属topic的消费情况
  1. ./kafka-consumer-groups.sh --bootstrap-server=kafka-host(host:port)--group=demo-group --describ
15、显示所有消费者
  1. ./kafka-consumer-groups.sh --bootstrap-serverkafka-host(host:port)--list
16、获取正在消费的topic的group的offset
  1. ./kafka-consumer-groups.sh --describe--group demo-group --bootstrap-serverkafka-host(host:port
17、重设 consumer group的offset
  1. #确定topic作用域:
  2. --all-topics consumer group下所有topic的所有分区调整位移
  3. --topic t1 --topic t2 为指定的若干个topic的所有分区调整位移
  4. --topic t1:0,1,2 为指定的topic分区调整位移
  5. #确定位移重设策略
  6. --to-current 把位移调整到分区当前位移.
  7. --to-datetime <String: datetime> 把位移调整到大于给定时间的最早位移处.datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'2020-07-01T12:00:00.000
  8. --to-earliest 把位移调整到分区当前最小位移
  9. --to-latest 把位移调整到分区当前最新位移
  10. --to-offset <Long: offset> 把位移调整到指定位移处
  11. --shift-by N 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
  12. --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
  13. --from-file <file>:从CSV文件中读取调整策略
  14. 例:(--dry-run 不运行只查看结果,类似k8s里面;--execute执行)
  15. #按时间点重置(--to-datetime)
  16. ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-datetime 2022-01-18T12:00:00.000
  17. offset重置 (--to-offset )#重置消费组下指定topic
  18. ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --topic=test --group=testPlatform --execute --reset-offsets --to-offset 359905139#重置消费组下面所以topic
  19. ./kafka-consumer-groups.sh --bootstrap-server=localhost:9092 --all-topics --group testPlatform --reset-offsets --to-latest --execute
18、指定offset与partition导出消息
  1. ./kafka-console-consumer.sh --bootstrap-server=kafka-host(host:port)--topic--topic=demo-topic --offset825000--partition0>> messages.log
19.修改topic的参数
  1. kafka-configs.sh --zookeeper(host:port) --entity-type topics --entity-name demo-topic --alter --add-config max.message.bytes=1048576
20.测试生产者性能脚本
  1. ./kafka-producer-perf-test.sh --topic demo-topic --num-records 10000000--throughput-1 --record-size 1024 --producer-props bootstrap.servers=kafka-host(host:port)acks=-1 linger.ms=2000compression.type=lz4
21.测试消费者性能脚本
  1. ./kafka-consumer-perf-test.sh --broker-list kafka-host(host:port)--messages10000000--topic demo-topic
标签: kafka java 大数据

本文转载自: https://blog.csdn.net/chenlei0520/article/details/128934717
版权归原作者    所有, 如有侵权,请联系我们删除。

“kafka常用命令”的评论:

还没有评论