0


基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

文章目录

04:数据源

  • 目标了解数据源的格式及实现模拟数据的生成
  • 路径- step1:数据格式- step2:数据生成
  • 实施- 数据格式image-20210905200540304消息时间发件人昵称发件人账号发件人性别发件人IP发件人系统发件人手机型号发件人网络制式发件人GPS收件人昵称收件人IP收件人账号收件人系统收件人手机型号收件人网络制式收件人GPS收件人性别消息类型双方距离消息msg_timesender_nickynamesender_accountsender_sexsender_ipsender_ossender_phone_typesender_networksender_gpsreceiver_nickynamereceiver_ipreceiver_accountreceiver_osreceiver_phone_typereceiver_networkreceiver_gpsreceiver_sexmsg_typedistancemessage2020/05/08 15:11:33古博易14747877194男48.147.134.255Android 8.0小米 Redmi K304G94.704577,36.247553莱优97.61.25.5217832829395IOS 10.0Apple iPhone 104G84.034145,41.423804女TEXT77.82KM天涯海角惆怅渡,牛郎织女隔天河。佛祖座前长顿首,只求共度一百年。- 数据生成- 创建原始文件目录mkdir /export/data/momo_init- 上传模拟数据程序cd /export/data/momo_initrzimage-20210905142015948- 创建模拟数据目录mkdir /export/data/momo_data- 运行程序生成数据- 语法java -jar /export/data/momo_init/MoMo_DataGen.jar 原始数据路径 模拟数据路径 随机产生数据间隔ms时间- 测试:每500ms生成一条数据java-jar /export/data/momo_init/MoMo_DataGen.jar \/export/data/momo_init/MoMo_Data.xlsx \/export/data/momo_data/ \500- 结果:生成模拟数据文件MOMO_DATA.dat,并且每条数据中字段分隔符为\001image-20210929100901349
  • 小结- 了解数据源的格式及实现模拟数据的生成

05:技术架构及技术选型

  • 目标掌握实时案例的技术架构及技术选型
  • 路径- step1:需求分析- step2:技术选型- step3:技术架构
  • 实施- 需求分析- 离线存储计算 - 提供离线T + 1的统计分析- 提供离线数据的即时查询- 实时存储计算 - 提供实时统计分析- 技术选型- 离线 - 数据采集:Flume- 离线存储:Hbase- 离线分析:Hive:复杂计算- 即时查询:Phoenix:高效查询- 实时 - 数据采集:Flume- 实时存储:Kafka- 实时计算:Flink- 实时应用:MySQL + FineBI 或者 Redis + JavaWeb可视化- 技术架构image-20210905162218286- 为什么不直接将Flume的数据给Hbase,而统一的给了Kafka,再由Kafka到Hbase? - 避免高并发写导致机器负载过高、实现架构解耦、实现异步高效- 保证数据一致性
  • 小结- 掌握实时案例的技术架构及技术选型

06:Flume的回顾及安装

  • 目标回顾Flume基本使用及实现Flume的安装测试
  • 路径- step1:Flume回顾- step2:Flume的安装- step3:Flume的测试
  • 实施- Flume的回顾- 功能:实时对文件或者网络端口进行数据流监听采集- 场景:文件实时采集- 开发 - step1:先开发一个配置文件:properties【K=V】- step2:运行这个文件即可- 组成 - Agent:一个Agent就是一个Flume程序- Source:负责监听数据源,将数据源的动态数据变成每一条Event数据,将Event数据流放入Channel- Channel:负责临时存储Source发送过来的数据,供Sink来取数据- Sink:负责从Channel拉取数据写入目标地- Event:代表一条数据对象 - head:Map集合[KV]- body:byte[]- Flume的安装- 上传安装包cd /export/software/rzimage-20210905162948401- 解压安装tar-zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/cd /export/servermv apache-flume-1.9.0-bin flume-1.9.0-bin- 修改配置#集成HDFS,拷贝HDFS配置文件cd /export/server/flume-1.9.0-bincp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/#修改Flume环境变量cd /export/server/flume-1.9.0-bin/conf/mv flume-env.sh.template flume-env.shvim flume-env.sh ``````#修改22行exportJAVA_HOME=/export/server/jdk1.8.0_65#修改34行exportHADOOP_HOME=/export/server/hadoop-3.3.0- 删除Flume自带的guava包,替换成Hadoop的cd /export/server/flume-1.9.0-bin rm -rf lib/guava-11.0.2.jarcp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/- 创建目录cd /export/server/flume-1.9.0-bin#程序配置文件存储目录mkdir usercase#Taildir元数据存储目录mkdir position- Flume的测试- 需求:采集聊天数据,写入HDFS- 分析- Source:taildir:动态监听多个文件实现实时数据采集- Channel:mem:将数据缓存在内存- Sink:hdfs- 开发vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties``````# define a1a1.sources = s1 a1.channels = c1a1.sinks = k1#define s1a1.sources.s1.type = TAILDIR#指定一个元数据记录文件a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json#将所有需要监控的数据源变成一个组a1.sources.s1.filegroups = f1#指定了f1是谁:监控目录下所有文件a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*#指定f1采集到的数据的header中包含一个KV对a1.sources.s1.headers.f1.type = momoa1.sources.s1.fileHeader = true#define c1a1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000#define k1a1.sinks.k1.type = hdfsa1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%da1.sinks.k1.hdfs.fileType = DataStream#指定按照时间生成文件,一般关闭a1.sinks.k1.hdfs.rollInterval = 0#指定文件大小生成文件,一般120 ~ 125M对应的字节数a1.sinks.k1.hdfs.rollSize = 102400#指定event个数生成文件,一般关闭a1.sinks.k1.hdfs.rollCount = 0a1.sinks.k1.hdfs.filePrefix = momoa1.sinks.k1.hdfs.fileSuffix = .loga1.sinks.k1.hdfs.useLocalTimeStamp = true#bounda1.sources.s1.channels = c1a1.sinks.k1.channel = c1- 启动HDFSstart-dfs.sh- 运行Flumecd /export/server/flume-1.9.0-binbin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console- 运行模拟数据java -jar /export/data/momo_init/MoMo_DataGen.jar \/export/data/momo_init/MoMo_Data.xlsx \/export/data/momo_data/ \100- 查看结果image-20210905171157230
  • 小结- 回顾Flume基本使用及实现Flume的安装测试

07:Flume采集程序开发

  • 目标实现案例Flume采集程序的开发
  • 路径- step1:需求分析- step2:程序开发- step3:测试实现
  • 实施- 需求分析- 需求:采集聊天数据,实时写入Kafka- Source:taildir- Channel:mem- Sink:Kafka sinka1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k1.kafka.topic = mytopica1.sinks.k1.kafka.flumeBatchSize = 20a1.sinks.k1.kafka.producer.linger.ms = 1a1.sinks.k1.kafka.producer.compression.type = snappy- 程序开发vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties``````# define a1a1.sources = s1 a1.channels = c1a1.sinks = k1#define s1a1.sources.s1.type = TAILDIR#指定一个元数据记录文件a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json#将所有需要监控的数据源变成一个组a1.sources.s1.filegroups = f1#指定了f1是谁:监控目录下所有文件a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*#指定f1采集到的数据的header中包含一个KV对a1.sources.s1.headers.f1.type = momoa1.sources.s1.fileHeader = true#define c1a1.channels.c1.type = memorya1.channels.c1.capacity = 10000a1.channels.c1.transactionCapacity = 1000#define k1a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic = MOMO_MSGa1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092a1.sinks.k1.kafka.flumeBatchSize = 10a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k1.kafka.producer.linger.ms = 100#bounda1.sources.s1.channels = c1a1.sinks.k1.channel = c1- 测试实现- 启动Kafkastart-zk-all.shstart-kafka.sh - 创建Topickafka-topics.sh --create--topic MOMO_MSG --partitions3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092> 注意:Kafka2.11版本用–zookeeper 替代> kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node01:9092- 列举kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092- 启动消费者kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092- 启动Flume程序cd /export/server/flume-1.9.0-binbin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console- 启动模拟数据java-jar /export/data/momo_init/MoMo_DataGen.jar \/export/data/momo_init/MoMo_Data.xlsx \/export/data/momo_data/ \50- 观察结果在这里插入图片描述
  • 小结- 实现案例Flume采集程序的开发
标签: flume kafka hbase

本文转载自: https://blog.csdn.net/xianyu120/article/details/133811713
版权归原作者 大模型Maynor 所有, 如有侵权,请联系我们删除。

“基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源”的评论:

还没有评论