文章目录
1.消息收发的基本概念
消息在Kafka消息队列中发送和接收过程如下图所示:
消息生产者Producer产生消息数据,发送到Kafka消息队列中,一台Kafka节点只有一个Broker,消息会存储在Kafka的Topic(主题中),不同类型的消息数据会存储在不同的Topic中,可以利用Topic实现消息的分类,消息消费者Consumer会订阅消息存储的Topic,从Topic中读取/接收消息数据,不同的消费者可以订阅不同的Topic。
消息收发的大致流程:生产者产生消息数据—>写入到Kafka的Topic中—>消费者从Topic中读取消息数据。
Kafka消息队列中自带了Producer生产者和Consumer消费者两种命令客户端工具,我们可以通过这两种工具模拟消息的收发。
Producer和Consumer两种命令行工具都可以从本地文件中读取内容,也可以直接在命令行工具中输入内容,然后将这些内容以消息的形式发送到Kafka消息队列中,在默认情况下,文件中的每一行或者命令行中的每一行数据都会被当做一条独立的消息数据,在发送和接收消息数据时都需要指定Kafka的地址和消息存储的Topic。
2.使用Kafka模拟消息的发送和接收
2.1.创建消息数据存储的Topic主题
1.创建topic
[root@kafka bin]# ./kafka-topics.sh --create --zookeeper 192.168.81.210:2181 --replication-factor 1 --partitions 1 --topic test_topic1
Created topic test_topic1.
`--replication-factor`:指定副本数量
`--partitions`:指定分区数量
2.查看创建的Topic
[root@kafka bin]# ./kafka-topics.sh --list --zookeeper 192.168.81.210:2181
test_topic1
2.2.发送消息数据
使用
kafka-console-producer.sh
命令可以模拟发送者Producer发送消息数据,可以从本地文件中读取内容也可以从命令行中直接输入内容,这些内容会以消息的形式发送到Kafka集群。
执行该命令时需要指定Kafka消息队列的地址,还需要指定要将消息存储在哪一个Topic中,这个Topic需要事先存在。
命令成功执行后会弹出">"符号,此时我们就可以发送消息内容了,一行表示一条消息。
[root@kafka bin]# ./kafka-console-producer.sh --broker-list 192.168.81.210:9092 --topic test_topic1
>hello
>hi jiangxl
>jiangxl very good
2.3.消费消息数据
同理,使用
kafka-console-consumer.sh
命令可以模拟消费者Consumer接收消息数据,将接收到的消息数据打印在终端,默认情况下消费的是Topic中最新的消息数据。
消费消息数据的方式有两种:
- 从Topic中最后一条消息的偏移量(offset)+1处开始消费。 - 所谓的偏移量其实就是一个位置的概念,偏移量+1指的是从Topic中最后一条消息然后再加一条消息处开始消费- 例如当Topic中有3条消息数据,默认情况下会从第四条消息开始消费,第四条消息还没有产生,因此消费者会处于等待的状态,等待发送者产生新的消息数据。- 可以理解为默认情况下,消费者从最新的消息数据处开始消费。
- 从Topic最初开始消费。 - 直接从第一条消息数据处开始消费。
1)从Topic主题中最后一条消息偏移量+1处开始消费
2.生产者产生新的消息数据
[root@kafka bin]# ./kafka-console-producer.sh --broker-list 192.168.81.210:9092 --topic test_topic1
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>hello
>hi jiangxl
>jiangxl very good
>
>new message1 #产生新的消息数据
>new message2
>new message3
1.先开启消费者等待消息消费
[root@kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092 --topic test_topic1
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
new message1
new message2
new message3
2)从Topic中第一条消息数据处开始消费
加上
--from-beginning
表示从顶部开始消费消息数据。
[root@kafka bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.81.210:9092 --from-beginning --topic test_topic1
hello
hi jiangxl
jiangxl very good
new message1
new message2
new message3
版权归原作者 Jiangxl~ 所有, 如有侵权,请联系我们删除。