文章目录
前言
在现今的大数据时代,数据流的处理与分析成为了许多企业和组织的核心需求。Apache Kafka作为一个分布式流处理平台,以其高性能、高吞吐量和可靠性在大数据领域崭露头角。它允许在分布式系统中处理和传输实时的数据流,为数据处理提供了全新的解决方案。
对于Kafka的初学者和开发者来说,熟练掌握其基础操作至关重要。为此,本文将带领读者走进Kafka的世界,深入探讨主题的创建、管理,以及消息的发送与接收等核心操作。希望通过本文的学习,读者能够对Kafka有更深入的理解,并能在实际开发中运用自如,充分发掘Kafka的潜力,为大数据处理助力。
一、创建主题
在Kafka中,需要先创建主题(topics)才能进行数据的发送和接收。
创建主题位置: kafka/bin
./kafka-topics.sh --bootstrap-server hadoop01:9092 --create --topic test1 --partitions 3 --replication-factor 2
解释:这里我们创建了一个 名为test1的主题,并为其设置了 3 个分区 2 个副本
–replication-factor 2 #复制两份
–partitions 3 #创建3个分区
–topic #主题名为test1
二、查看主题
./kafka-topics.sh --list --bootstrap-server hadoop01:9092
三、删除主题
kafka-topics.sh --bootstrap-server hadoop01:9092 --delete --topic test1 # 删除test1这个topic
四、 压力测试
./kafka-producer-perf-test.sh --topic test1 --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 batch.size=4096 linger.ms=0
测试结果
五、发送和接收消息
Kafka提供了命令行工具来发送和接收消息。
消息收发的大致流程:生产者产生消息数据一>写入到Kafkal的Topic中一>消费者从Topic中读取消息数据。
发送消息
生产者(Producer)创建消息数据。
生产者将消息数据发送到Kafka的消息队列中。Kafka的节点包含一个或多个Broker,消息会存储在这些Broker中的Topic(主题)里面。不同类型的消息数据可以存储在不同的Topic中,这样可以利用Topic实现消息的分类。
在发送消息时,生产者还可以选择一些参数进行配置,例如批处理大小(batch.size)和延迟(linger.ms)等,来优化性能和吞吐量。
创建消费文件 :vi commodity_click-kafka-men-hdfs.conf
#sources
a1.sources = s1
a1.sources.s1.type =org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize =100
a1.sources.s1.batchDurationMillis =2000
a1.sources.s1.kafka.topics = commodity
a1.sources.s1.kafka.bootstrap.servers =192.168.6.45:9092,192.168.6.46:9092,192.168.6.47:9092
a1.sources.s1.kafka.consumer.group.id = first_test
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity =10000
a1.channels.c1.transactionCapacity =10000
a1.channels.c1.byteCapacityBufferPercentage =20
a1.channels.c1.byteCapacity =800000
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = commodity-
a1.sinks.k1.hdfs.round =false
a1.sinks.k1.hdfs.roundValue =0
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval =0
a1.sinks.k1.hdfs.rollSize =0
a1.sinks.k1.hdfs.rollCount =10000
a1.sinks.k1.hdfs.useLocalTimeStamp =false
a1.sinks.k1.hdfs.fileType =DataStream
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
创建生产者:vi commodity_click-exec-men-kafka.conf
##别名
a1.sources = r1
a1.channels = c1
a1.sinks =k1
#source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F/usr/local/data/111.txt
#channels
a1.channels.c1.type = memory
a1.channels.c1.capacity =10000
a1.channels.c1.transactionCapacity =10000
a1.channels.c1.byteCapacityBufferPercentage =20
a1.channels.c1.byteCapacity =800000
#sink
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = commodity
a1.sinks.k1.kafka.bootstrap.servers =192.168.6.45:9092,192.168.6.46:9092,192.168.6.47:9092
a1.sinks.k1.kafka.flumeBatchSize =20
a1.sinks.k1.kafka.producer.acks =1
a1.sinks.k1.kafka.producer.linger.ms =50
#连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注意原理:先消费在生产
生产
flume-ng agent -n a1 -c ../conf/-f commodity_click-exec-men-kafka.conf -Dflume.root.logger=INFO,console
消费
flume-ng agent -n a1 -c ../conf/-f commodity_click-kafka-men-hdfs.conf -Dflume.root.logger=INFO,console
结果展示
接收消息
消费者(Consumer)订阅存储在Kafka的Topic中的消息。
消费者从订阅的Topic中读取/接收消息数据。不同的消费者可以订阅不同的Topic。
可以使用以下命令来发送一条消息到“test1”主题:
Kafka发送和接收消息的过程涉及生产者(Producer)和消费者(Consumer)。
以下是Kafka发送和接收消息的一般流程:
$ echo "hello, kafka" | ./bin/kafka-console-producer.sh --topic test1 --bootstrap-server localhost:9092
可以使用以下命令来从“test”主题中读取消息:
$ bin/kafka-console-consumer.sh --topic test1 --from-beginning --bootstrap-server localhost:9092
其中,–from-beginning参数表示从最早的消息开始读取。如果不加该参数,则只会读取从当前时间开始的新消息。
总结
本文介绍了Kafka中的主题创建、查看、删除以及消息发送和接收等基本操作。通过命令行工具进行主题管理和消息收发,可以方便地进行Kafka的使用和管理。在进行消息收发时,生产者将消息数据发送到Kafka的消息队列中,消费者从订阅的Topic中读取/接收消息数据。通过参数配置,可以优化Kafka的性能和吞吐量。Kafka的可靠性、高性能和分布式特性使其成为了大数据领域中的重要组件之一。以上就是本篇文章的全部内容,希望能够帮助到大家!
版权归原作者 提醒一下哟 所有, 如有侵权,请联系我们删除。