一、Kafka作为Source 【数据进入到kafka中,抽取出来】
1、在我的flume的conf文件夹下,有个myconf文件夹:
2、 创建一个flume脚本文件: kafka-memory-logger.conf
flume学习网站:Flume 1.9用户手册中文版 — 可能是目前翻译最完整的版本了 (liyifeng.org)
# 来到这个目录下
cd /opt/installs/flume/conf/myconf
# 创建一个conf文件
vi kafka-memory-logger.conf
在kafka-memory-logger.conf文件中写入:
a1.sources = r1
a1.channels = c1
a1.sinks=k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = bigdata
a1.sources.r1.kafka.consumer.group.id = text7
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128
3、测试
启动一个消息生产者,向topic中发送消息,启动flume,接收消息
- 启动一个消息生产者,向topic中发送消息:
kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic bigdata
- 启动flume,接收消息
flume-ng agent -n a1 -c ../ -f kafka-memory-logger.conf -Dflume.root.logger=INFO,console
二、kafka作为Sink 【数据从别的地方抽取到kafka里面】
1、创建一个flume脚本文件:flume-kafka-sink.conf
在flume-kafka-sink.conf文件中写入:
a1.sources = r1
a1.channels = c1
a1.sinks=k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bigdata
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
2、测试
启动:
flume-ng agent -n a1 -c ../ -f flume-kafka-sink.conf -Dflume.root.logger=INFO,console
使用telnet命令,向端口发送消息:
yum -y install telnet
telnet bigdata01 44444
在窗口不断的发送文本数据,数据被抽取到了kafka中,如何获取kafka数据呢?使用消费者:
kafka-console-consumer.sh --topic bigdata --bootstrap-server bigdata01:9092
版权归原作者 songqq27 所有, 如有侵权,请联系我们删除。