0


大数据-228 离线数仓 - Flume 自定义拦截器(续接上节) 编写代码 日志采集小结

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

上节我们完成了如下的内容:

  • 自定义拦截器(续接上节)
  • 采集启动日志和事件日志

在这里插入图片描述

日志数据采集小结

(总结前置,下面是正文)

在 Apache Flume 中,拦截器(Interceptor)是数据流管道的一个关键组件,它允许在事件(Event)进入 Flume Channel 之前对其进行修改或过滤。通过自定义拦截器,你可以实现特定的业务逻辑,如数据过滤、字段添加或修改、格式转换等。
自定义拦截器 是指用户根据需求自行编写 Java 代码来扩展 Flume 的功能,而不是使用默认的拦截器。

  • 使用 taildir source监控指定多个目录,可以给不同目录的日志加上不同Header
  • 在每个目录上可以使用正则匹配多个文件
  • 使用自定义拦截器,主要功能是从JSON串种获取时间戳,加到event的header中
  • hdfs sink使用event header中的信息写数据(控制写文件的位置)
  • hdfs文件的滚动方式(基于文件大小、基于event数量、基于时间)
  • 调节Flume JVM内存的分配

工作原理

  • 事件生成(Source): 数据从外部系统通过 Source 进入 Flume。
  • 拦截器(Interceptor): 在 Source 和 Channel 之间拦截数据,执行预处理、过滤或增强。
  • 传输(Channel): 处理后的事件被传递到 Channel,等待被 Sink 消费。
  • 消费(Sink): 最终将数据写入目标系统(如 HDFS、Kafka 等)。

开发和部署注意事项

  • 依赖管理: 开发自定义拦截器需要依赖 Flume 的核心库,如 flume-ng-core 和 flume-ng-sdk。
  • 测试: 在本地测试拦截器逻辑,确保其功能正确,性能符合预期。
  • 部署: 将 JAR 文件上传至 Flume Agent 的 lib 目录并重启 Flume 服务。
  • 性能监控: 自定义拦截器可能会影响 Flume 的性能,尤其是在拦截逻辑复杂的情况下。建议在生产环境中监控资源使用情况。

采集启动日志和事件日志

上节我们完成了Agent 的配置,接来我们继续。

自定义拦截器

编码完成后打包上传到服务器,放置在 $FLUME_HOME/lib

编写代码

packageicu.wzk;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importcom.alibaba.fastjson.JSONObject;importorg.apache.flume.Context;importorg.apache.flume.Event;importorg.apache.flume.interceptor.Interceptor;importjava.nio.charset.StandardCharsets;importjava.time.Instant;importjava.time.LocalDateTime;importjava.time.ZoneId;importjava.time.format.DateTimeFormatter;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;publicclassLogTypeInterceptorimplementsInterceptor{@Overridepublicvoidinitialize(){}@OverridepublicEventintercept(Event event){// 获取Event的bodyString eventBody =newString(event.getBody(),StandardCharsets.UTF_8);// 获取Event的HeaderMap<String,String> headerMap = event.getHeaders();// 解析body获取JSON串String[] bodyArr = eventBody.split("\\s+");try{String jsonStr = bodyArr[6];String timestampStr ="";JSONObject jsonObject = JSON.parseObject(jsonStr);if(headerMap.getOrDefault("logtype","").equals("start")){// 取启动时间戳
                jsonObject.getJSONObject("app_active").getString("time");}elseif(headerMap.getOrDefault("logtype","").equals("event")){// 取事件日志第一条记录的时间戳JSONArray jsonArray = jsonObject.getJSONArray("wzk_event");if(jsonArray.size()>0){
                    timestampStr = jsonArray.getJSONObject(0).getString("time");}}// 将时间戳转换为 yyyy-MM-ddlong timestamp =Long.parseLong(timestampStr);DateTimeFormatter formatter =DateTimeFormatter.ofPattern("yyyy-MM-dd");Instant instant =Instant.ofEpochMilli(timestamp);LocalDateTime localDateTime =LocalDateTime.ofInstant(instant,ZoneId.systemDefault());String date = formatter.format(localDateTime);// 转换后将字符串放置到Header中
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);}catch(Exception e){
            headerMap.put("logtime","Unknown");
            event.setHeaders(headerMap);}return event;}@OverridepublicList<Event>intercept(List<Event> events){List<Event> lstEvent =newArrayList<>();for(Event event: events){Event outEvent =intercept(event);if(outEvent !=null){
                lstEvent.add(outEvent);}}return lstEvent;}@Overridepublicvoidclose(){}publicstaticclassBuilderimplementsInterceptor.Builder{@OverridepublicInterceptorbuild(){returnnewLogTypeInterceptor();}@Overridepublicvoidconfigure(Context context){}}}

打包项目

mvn clean package

打包结果如下,我们需要将“”上传到服务器中:
在这里插入图片描述

启动测试

flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console

启动的结果如下图所示,如果你缺什么文件夹之类的,自己创建出来:
在这里插入图片描述

测试结果

写入log文件:

vim /opt/wzk/logs/start/test.log

写入的内容如下所示:

2020-08-02 18:19:32.959 [main] INFO icu.wzk.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1596342840284},"attr":{"area":"大庆","uid":"2F10092A2","app_v":"1.1.15","event_type":"common","device_id":"1FB872-9A1002","os_type":"2.8","channel":"TB","language":"chinese","brand":"iphone-8"}}

写入的结果如下图所示:
在这里插入图片描述
写入log文件:

vim /opt/wzk/logs/event/test.log

写入的内容如下所示:

2020-08-02 18:20:11.877 [main] INFO icu.wzk.ecommerce.AppEvent - {"wzk_event":[{"name":"goods_detail_loading","json":{"entry":"1","goodsid":"0","loading_time":"93","action":"3","staytime":"56","showtype":"2"},"time":1596343881690},{"name":"loading","json":{"loading_time":"15","action":"3","loading_type":"3","type":"1"},"time":1596356988428},{"name":"notification","json":{"action":"1","type":"2"},"time":1596374167278},{"name":"favorites","json":{"course_id":1,"id":0,"userid":0},"time":1596350933962}],"attr":{"area":"长治","uid":"2F10092A4","app_v":"1.1.14","event_type":"common","device_id":"1FB872-9A1004","os_type":"0.5.0","channel":"QL","language":"chinese","brand":"xiaomi-0"}}

写入的结果如下图所示:
在这里插入图片描述

查看结果

控制台已经输出了结果:
在这里插入图片描述

我们查看HDFS,也输出了对应的内容出来:
在这里插入图片描述

生产环境

生产环节中,推荐使用:

nohup flume-ng agent --conf-file /opt/wzk/flume-conf/flume-log2hdfs3.conf -name a1 -Dflume.roog.logger=INFO,console > dev/null 2>&1&
  • nohup 该命令允许用户退出账户、关闭终端之后还继续运行相应的进程
  • /dev/null 代表Linux的空设备文件,所有往这个文件里面写入的内容都会丢失,也称黑洞
  • 标准输入0,从键盘获得输入 /proc/self/fd/0
  • 标准输出1,输出到屏幕(控制台)/proc/self/fd/1
  • 错误输出2,输出到屏幕(控制台)/proc/self/fd/2
  • /dev/null 标准输出1重定向到 /dev/null 中,此时标准输出不存在,没有任何地方能够找到输出的内容
  • 2>&1 错误输出将会和标准输出输出到同一个地方
  • /dev/null 2>&1 不会输出任何信息到控制台,也不会有任何信息输出到文件中
标签: 大数据 flume java

本文转载自: https://blog.csdn.net/w776341482/article/details/143871841
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-228 离线数仓 - Flume 自定义拦截器(续接上节) 编写代码 日志采集小结”的评论:

还没有评论