

Importing and exporting Kafka data with Flume, and extracting MySQL data with DataX

Requirements:

1. Custom Flume interceptor

Extract the data in trans_info.json to Kafka, filtering out and discarding records where tr_flag = 0 so that only normal-status records are kept.

Add the dependencies to pom.xml:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.48</version>
    </dependency>

Write the custom interceptor in Java:

    package com.bigdata.yuekao04;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class DemoInterceptor implements Interceptor {

        @Override
        public void initialize() {
        }

        @Override
        public Event intercept(Event event) {
            try {
                // The event body is assumed to hold one JSON record
                String data = new String(event.getBody());
                // Parse the JSON string into a JsonNode with Jackson
                ObjectMapper objectMapper = new ObjectMapper();
                JsonNode jsonNode = objectMapper.readTree(data);
                // Read the tr_flag field
                int trFlag = jsonNode.get("tr_flag").asInt();
                // Keep the event when tr_flag is not 0
                if (trFlag != 0) {
                    return event;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // tr_flag == 0 (or the record is malformed): return null to drop the event
            return null;
        }

        @Override
        public List<Event> intercept(List<Event> list) {
            // Apply the single-event logic to the batch and keep only the surviving events
            List<Event> results = new ArrayList<>();
            for (Event event : list) {
                Event intercepted = intercept(event);
                if (intercepted != null) {
                    results.add(intercepted);
                }
            }
            return results;
        }

        @Override
        public void close() {
        }

        public static class BuilderEvent implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new DemoInterceptor();
            }

            @Override
            public void configure(Context context) {
            }
        }
    }
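
To sanity-check the filter before deploying it, the interceptor can be exercised locally. The sketch below is not part of the assignment; it assumes the class above is on the classpath and uses Flume's EventBuilder to fake two events, one with tr_flag = 0 and one with tr_flag = 1:

    package com.bigdata.yuekao04;

    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    import java.nio.charset.StandardCharsets;

    public class DemoInterceptorTest {
        public static void main(String[] args) {
            DemoInterceptor interceptor = new DemoInterceptor();
            // tr_flag = 0: intercept should return null, i.e. the event is dropped
            Event dropped = EventBuilder.withBody("{\"tr_flag\":0}", StandardCharsets.UTF_8);
            // tr_flag = 1: intercept should return the event unchanged
            Event kept = EventBuilder.withBody("{\"tr_flag\":1}", StandardCharsets.UTF_8);
            System.out.println("tr_flag=0 -> " + interceptor.intercept(dropped)); // prints null
            System.out.println("tr_flag=1 -> " + interceptor.intercept(kept));    // prints the event
        }
    }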

Package the Java code into a jar and place it under /flume/lib.

2. Create a Kafka topic named yuekao and use Flume to deliver the extracted data to that topic
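
Note that the requirement names the topic yuekao while the sink configuration below writes to yuekao04, so create whichever name you actually use. A typical creation command, assuming Kafka's bin directory is on the PATH and that 3 partitions with replication factor 1 are acceptable (both counts are assumptions):

    kafka-topics.sh --create \
      --bootstrap-server bigdata01:9092 \
      --topic yuekao04 \
      --partitions 3 \
      --replication-factor 1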

Write the conf file (yuekao04.conf) to deliver the data into the newly created Kafka topic:

    # Define the Flume agent components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Configure the source
    a1.sources.r1.type = TAILDIR
    # Space-separated list of file groups; each group is a set of files to tail
    a1.sources.r1.filegroups = f1
    # Absolute path of the file group
    a1.sources.r1.filegroups.f1 = /home/trans_info1.json
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.bigdata.yuekao04.DemoInterceptor$BuilderEvent

    # Configure the channel
    a1.channels.c1.type = file

    # Configure the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = yuekao04
    a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
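
With the interceptor jar in /flume/lib and yuekao04.conf saved, the agent can be started along these lines (the conf directory path is an assumption; adjust it to your installation):

    flume-ng agent \
      --name a1 \
      --conf /opt/flume/conf \
      --conf-file yuekao04.conf \
      -Dflume.root.logger=INFO,console
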
3. Load the data from Kafka into HDFS, under the directory /yuekao/ods/zhuanzhang

Write the conf file, then run it to load the Kafka data into HDFS:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    # Kafka source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
    a1.sources.r1.kafka.topics = yuekao04
    a1.sources.r1.kafka.consumer.group.id = yuekao
    a1.sources.r1.batchSize = 100
    a1.sources.r1.batchDurationMillis = 2000

    # Memory channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # HDFS sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /yuekao/ods/zhuanzhang/%y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    a1.sinks.k1.hdfs.fileType = DataStream
    # Use the local time to resolve the %y-%m-%d escape in the path
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
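
Assuming the file above is saved as kafka2hdfs.conf (the post does not name it, so the name here is hypothetical), it is started the same way, and the landing directory can then be checked:

    flume-ng agent --name a1 --conf /opt/flume/conf --conf-file kafka2hdfs.conf -Dflume.root.logger=INFO,console
    # Verify that files are being rolled into the target directory
    hdfs dfs -ls /yuekao/ods/zhuanzhang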

Results:

4. Use DataX to extract the table from the MySQL database and load it into the specified HDFS directory: /yuekao/ods/user_info

First create the table in MySQL, then insert the data from user_info.sql:

    CREATE TABLE `user_info` (
        `name`      VARCHAR(255),
        phone_num   VARCHAR(255),
        email       VARCHAR(255),
        addr_info   VARCHAR(255),
        gender      VARCHAR(255),
        idno        VARCHAR(255),
        create_time VARCHAR(255),
        user_id     INT
    );
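
One way to load the provided dump, assuming the table lives in the yuekao database referenced by the jdbcUrl below and the mysql client can reach bigdata01 (credentials taken from the DataX job):

    mysql -h bigdata01 -uroot -p123456 yuekao < user_info.sql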

Write the JSON file (demo.json), then run it to load the database data into HDFS:

    {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "root",
                            "password": "123456",
                            "column": [
                                "name",
                                "phone_num",
                                "email",
                                "addr_info",
                                "gender",
                                "idno",
                                "create_time",
                                "user_id"
                            ],
                            "splitPk": "user_id",
                            "connection": [
                                {
                                    "table": [
                                        "user_info"
                                    ],
                                    "jdbcUrl": [
                                        "jdbc:mysql://bigdata01:3306/yuekao"
                                    ]
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "defaultFS": "hdfs://bigdata01:9820",
                            "fileType": "text",
                            "path": "/yuekao/ods/user_info",
                            "fileName": "user_info.txt",
                            "column": [
                                { "name": "name", "type": "string" },
                                { "name": "phone_num", "type": "string" },
                                { "name": "email", "type": "string" },
                                { "name": "addr_info", "type": "string" },
                                { "name": "gender", "type": "string" },
                                { "name": "idno", "type": "string" },
                                { "name": "create_time", "type": "string" },
                                { "name": "user_id", "type": "int" }
                            ],
                            "writeMode": "append",
                            "fieldDelimiter": ","
                        }
                    }
                }
            ]
        }
    }

Run the JSON file:

    datax.py demo.json
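
Once the job reports success, the output can be checked on HDFS (a quick verification, assuming the HDFS client is configured for hdfs://bigdata01:9820 as in the writer above):

    hdfs dfs -ls /yuekao/ods/user_info
    hdfs dfs -cat /yuekao/ods/user_info/* | head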

Results:

The sample data could not be attached to this post; message me if you need it.
