

259. Java: Consuming Kafka Data, Parsing It into Individual Records, and Writing It to Another Kafka Topic (Plain JSON)

I. Purpose

Some data types arrive at a frequency of once per second, so the data volume is very large. Parsing the complete JSON in Hive, especially on a single-node environment, is therefore very slow and cannot meet the business requirements.

Flume interceptors are also not well suited to transforming this data, so the only practical option is to do it in Java: consume the data from Kafka topic A, parse out the fields, and write the flattened records to Kafka topic B.

II. Original Data Format

The JSON format is fairly standard: an object that contains an array.

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

III. Java Code

package com.kgc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaKafkaEvaluation {

    // Kafka producer configuration
    private static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        return props;
    }

    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // only relevant when auto commit is enabled
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Each consumer application must use its own group ID
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));

        ObjectMapper mapper = new ObjectMapper();

        // Initialize the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    JsonNode rootNode = mapper.readTree(record.value());
                    System.out.println("Raw data: " + rootNode);

                    // Top-level fields
                    String device_no = rootNode.get("deviceNo").asText();
                    String source_device_type = rootNode.get("sourceDeviceType").asText();
                    String sn = rootNode.get("sn").asText();
                    String model = rootNode.get("model").asText();
                    String create_time = rootNode.get("createTime").asText();
                    String cycle = rootNode.get("data").get("cycle").asText();

                    // One output line per element of data.evaluationList
                    JsonNode evaluationList = rootNode.get("data").get("evaluationList");
                    for (JsonNode evaluationItem : evaluationList) {
                        String lane_no = evaluationItem.get("laneNo").asText();
                        String lane_type = evaluationItem.get("laneType").asText();
                        String volume = evaluationItem.get("volume").asText();
                        String queue_len_max = evaluationItem.get("queueLenMax").asText();
                        String sample_num = evaluationItem.get("sampleNum").asText();
                        String stop_avg = evaluationItem.get("stopAvg").asText();
                        String delay_avg = evaluationItem.get("delayAvg").asText();
                        String pass_rate = evaluationItem.get("passRate").asText();
                        String travel_dist = evaluationItem.get("travelDist").asText();
                        String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();

                        String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",
                                device_no, source_device_type, sn, model, create_time, cycle, lane_no, lane_type,
                                volume, queue_len_max, sample_num, stop_avg, delay_avg, pass_rate, travel_dist, travel_time_avg);

                        // Send the flattened line to Kafka
                        ProducerRecord<String, String> producerRecord =
                                new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);
                        producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            consumer.commitAsync();
        }
    }
}

1. All servers use the IP 192.168.0.70.

2. Kafka topic consumed (data source): topic_internal_data_evaluation

3. Kafka topic produced to (target): topic_db_data_evaluation

4. Note: the field order must match the column order of the ODS-layer table!
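The loop above runs forever and never closes the consumer or the producer. As an optional refinement that is not in the original post, a shutdown hook can flush buffered producer records and commit the final offsets before exiting. A minimal sketch, assuming the consumer and producer variables defined above and keeping the loop body unchanged:

// Optional refinement (not in the original code): clean shutdown on Ctrl+C.
// consumer.wakeup() makes a blocked poll() throw WakeupException, which ends the loop.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try { mainThread.join(); } catch (InterruptedException ignored) { }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // ... parse the records and send them to the producer exactly as above ...
        consumer.commitAsync();
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected during shutdown, nothing to do
} finally {
    producer.flush();       // push out any batched records
    consumer.commitSync();  // final synchronous offset commit
    producer.close();
    consumer.close();
}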

IV. Start a Console Consumer for the Kafka Topic topic_db_data_evaluation

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092 --topic topic_db_data_evaluation --from-beginning
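If the target topic does not exist yet (and automatic topic creation is disabled on the broker), it can be created first with kafka-topics.sh. A sketch assuming Kafka 2.2 or newer, where --bootstrap-server is supported; the partition and replication-factor values are only illustrative:

[root@localhost bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.0.70:9092 --topic topic_db_data_evaluation --partitions 1 --replication-factor 1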

V. Run and Test

1. Start the project.

2. The console consumer prints the output data.
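For the sample JSON from section II, the flattened lines produced by the code look roughly like the ones below. Note that JSON null fields come out as the literal string null, because Jackson's asText() renders a NullNode that way:

39,null,null,null,2024-09-03 14:10:00,300,1,null,3,11.43,0,0.54,0.0,0.0,140.0,0.0
39,null,null,null,2024-09-03 14:10:00,300,2,null,7,23.18,0,0.47,10.57,0.0,140.0,0.0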

After that, Flume can simply collect the data and write it into HDFS; the ODS-layer table structure, however, needs to change accordingly.

VI. New ODS-Layer Table Structure

create external table if not exists hurys_dc_ods.ods_evaluation(
    device_no          string    COMMENT 'device number',
    source_device_type string    COMMENT 'device type',
    sn                 string    COMMENT 'device serial number',
    model              string    COMMENT 'device model',
    create_time        timestamp COMMENT 'creation time',
    cycle              int       COMMENT 'evaluation data cycle',
    lane_no            int       COMMENT 'lane number',
    lane_type          int       COMMENT 'lane type 0: channelized 1: approach 2: exit 3: departure 4: left-turn waiting area 5: through waiting area 6: dedicated right-turn lane 99: undefined lane',
    volume             int       COMMENT 'traffic volume crossing the stop line in the lane (vehicles)',
    queue_len_max      float     COMMENT 'maximum queue length in the lane (m)',
    sample_num         int       COMMENT 'sample size used to compute the evaluation data',
    stop_avg           float     COMMENT 'average number of stops in the lane',
    delay_avg          float     COMMENT 'average delay time in the lane (s)',
    pass_rate          float     COMMENT 'first-pass rate of the lane',
    travel_dist        float     COMMENT 'measured travel distance in the lane (m)',
    travel_time_avg    float     COMMENT 'average travel time in the lane'
)
comment 'evaluation data external table (static partition)'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

VII. Flume Collection Configuration File
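The original configuration file is not included in the text of this section. A minimal sketch of what such an agent typically looks like, assuming a Kafka source, a memory channel, and an HDFS sink; the agent name, consumer group ID, HDFS path, and roll settings are assumptions and must be adapted to the actual environment:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Kafka source: read the flattened lines from topic_db_data_evaluation
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.0.70:9092
a1.sources.r1.kafka.topics = topic_db_data_evaluation
a1.sources.r1.kafka.consumer.group.id = flume_evaluation_group
a1.sources.r1.batchSize = 1000
a1.sources.r1.channels = c1

# Memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# HDFS sink: write SequenceFiles into the day= partition directories of the ODS table
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.0.70:8020/user/hive/warehouse/hurys_dc_ods.db/ods_evaluation/day=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollInterval = 300
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.channel = c1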

VIII. Run the Flume Task and Check the HDFS Files and ODS Table Data

-- Refresh the table partitions
msck repair table ods_evaluation;

-- Show the table partitions
show partitions hurys_dc_ods.ods_evaluation;

-- Query the table data
select * from hurys_dc_ods.ods_evaluation
where day = '2024-09-03';

Done! With this approach there is no need to parse JSON data in Hive anymore!

Tags: java, kafka, programming languages

This article is reposted from: https://blog.csdn.net/tiantang2renjian/article/details/141860148
Copyright belongs to the original author, 天地风雷水火山泽. In case of infringement, please contact us for removal.
