一、创建topic
装有kafka的系统命令行界面执行以下命令创建名为test的topic
kafka-topics --create --zookeeper localhost:2180 --replication-factor 2 --partitions 3 --topic test
说明:
--create 表示要对kafka执行创建的操作
--zookeeper 指定自己的zookeeper连接地址,这里是localhost:2180
--replication-factor 指定保存数据的副本数,这里是2,可以根据自己需求指定
--partitions 指定保存数据的分区数,这里是3,可以根据自己需求指定
--topic 指定要创建的topic(kafka里面的表)名字
二、kafka命令行消费者
在kafka节点执行kafka-console-consumer命令行工具
kafka-console-consumer --bootstrap-server test1:9192,test2:9192,test3:9192 --topic test
说明:
--bootstrap-server 指定kafka的块地址,这里是 test1:9192,test2:9192,test3:9192 需要替换自己的kafka块连接地址。(ps:只要网络可以通信,使用该命令行消费工具也可以指定其他kafka集群的topic进行消费)
--topic 指定需要进行消费的kafka topic
其他参数:
--from-beginning 添加该参数表示从topic的最开始位置进行消费,不指定则默认从topic的最新位置进行消费
三、kafka命令行生产者
kafka-console-producer --broker-list test1:9092,test2:9092,test3:9092 --topic test
说明:
--broker-list 指定kafka的块地址,这里是 test1:9192,test2:9192,test3:9192 需要替换自己的kafka块连接地址。(ps:只要网络可以通信,使用该命令行生产者工具也可以指定其他kafka集群的topic进行生产数据)
--topic 指定生产数据的kafka topic
四、删除topic
如果要删除topic和数据块,需要设置kafka的配置文件server.properites
添加delete.topic.enable=true
然后重启kafka。如果只是逻辑删除topic,并不删除数据块,则可以不配置。
kafka-topics --zookeeper localhost:2181 --topic test --delete
说明:
--delete 表示要对kafka执行删除的操作
--zookeeper 指定自己的zookeeper连接地址,这里是localhost:2180
--topic 指定要删除的topic名字
五、kafka单个topic增加配置
这里以单独把topic test的数据保存天数设置为永久为例。
kafka-configs --zookeeper localhost:2181 --alter --add-config 'retention.ms=-1' --entity-name test --entity-type topics
说明:
--zookeeper 指定自己的zookeeper连接地址,这里是localhost:2180
--alter 表示要进行配置修改命令
--add-config 要添加的键值对配置。方括号可用于对包含逗号的值进行分组:'k1 = v1, k2 = [v1, v2, v3], k3 = v3'
--entity-name 要操作实体的名称,topic名、客户端id、userID、brokerID
--entity-type 实体配置的类型可以是topics、clients、users、brokers
版权归原作者 大数据动物园 所有, 如有侵权,请联系我们删除。