

Writing to a Hudi MOR table from Flink

Step 1: Create a Flink table that reads data from Kafka:

  DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG;
  CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG (
    ID STRING COMMENT 'code'
    ,NAME STRING COMMENT 'name'
    -- primary-key columns must be declared in the schema above
    ,PRIMARY KEY (ID, NAME) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'GLFUNCT_DEBEZIUM_TRANSFER',
    --'scan.startup.mode' = 'earliest-offset',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1725811200000',
    'properties.group.id' = 'KAFKA_GLFUNCT_CHANGELOG_HUDI7',
    'properties.bootstrap.servers' = '10.66.28.69:9092,10.66.28.70:9092,10.66.28.61:9092',
    'value.format' = 'debezium-json',
    'scan.topic-partition-discovery.interval' = '10000',
    'value.debezium-json.ignore-parse-errors' = 'true'
  );

Step 2: Create the MOR-type Hudi table

  DROP TABLE IF EXISTS HUDI_ZHANG;
  CREATE TABLE IF NOT EXISTS HUDI_ZHANG (
    ID STRING COMMENT 'code'
    ,NAME STRING COMMENT 'name'
    ,PRIMARY KEY (ID, NAME) NOT ENFORCED
  ) WITH (
    'connector' = 'hudi',
    'path' = 'hdfs://nameservice1/user/hive/warehouse/hudi_ods_sap.db/HUDI_ZHANG',
    'table.type' = 'MERGE_ON_READ',
    'hive_sync.skip_ro_suffix' = 'true',
    'hoodie.datasource.write.recordkey.field' = 'ID,NAME',
    'write.operation' = 'upsert',
    --'write.precombine.field' = 'ETL_DT',
    'write.tasks' = '4',
    'index.bootstrap.enabled' = 'true',
    'write.insert.drop.duplicates' = 'true',
    'compaction.tasks' = '4',
    'compaction.async.enabled' = 'true',
    'compaction.trigger.strategy' = 'time_elapsed',
    'compaction.delta_seconds' = '1200',
    'changelog.enabled' = 'true',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1',
    'hive_sync.enable' = 'true',
    'hive_sync.mode' = 'hms',
    'hive_sync.metastore.uris' = 'thrift://pld3cwztmg01:9083',
    --'hive_sync.jdbc_url' = 'jdbc:hive2://pld3cwztmg01:10000',
    'hive_sync.table' = 'ZHANG',
    'hive_sync.db' = 'hudi_ods_sap',
    'hive_sync.username' = 'hive',
    'hive_sync.password' = 'hive'
  );
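The compaction settings above use the time_elapsed trigger strategy: the MOR table accumulates updates in log files, and an async compaction is scheduled once 'compaction.delta_seconds' (1200 s, i.e. 20 minutes) have passed since the last compaction. A minimal sketch of that trigger condition (illustrative only, not Hudi's actual code):

```python
# 'compaction.delta_seconds' from the table definition above.
DELTA_SECONDS = 1200

def should_compact(last_compaction_ts: float, now: float) -> bool:
    """time_elapsed strategy: compact once enough wall-clock time has
    passed since the previous compaction instant."""
    return now - last_compaction_ts >= DELTA_SECONDS

print(should_compact(0, 1199))  # False: 20 minutes not yet elapsed
print(should_compact(0, 1200))  # True: schedule async compaction
```

Other strategies trigger on the number of delta commits instead of elapsed time; time_elapsed keeps read-optimized queries fresh at a predictable cadence regardless of commit volume.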

Step 3: Insert from the Kafka table into the Hudi table

  -- any WHERE filter (the original used RCLNT = '300') must reference
  -- columns declared in the source table's schema
  INSERT INTO HUDI_ZHANG SELECT * FROM HUDI_KAFKA_DEBEZIUM_ZHANG;
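Because 'write.operation' is upsert with record key 'ID,NAME', each insert into HUDI_ZHANG either creates a new record or replaces the existing record with the same key; with no precombine field configured, the latest arrival wins. A small sketch of that semantics (the V column is hypothetical, just to show a payload changing):

```python
# Illustrative upsert keyed on (ID, NAME), mirroring
# 'hoodie.datasource.write.recordkey.field' = 'ID,NAME'.
def upsert(table: dict, rows):
    """Insert each row, overwriting any existing row with the same key."""
    for row in rows:
        key = (row["ID"], row["NAME"])  # the Hudi record key
        table[key] = row                # insert-or-update in place
    return table

state = {}
upsert(state, [
    {"ID": "1", "NAME": "a", "V": 1},
    {"ID": "1", "NAME": "a", "V": 2},  # same key: updates the first row
    {"ID": "2", "NAME": "b", "V": 1},
])
print(len(state))            # 2 distinct keys survive
print(state[("1", "a")]["V"])  # 2: latest write won
```

In the real pipeline the Debezium changelog drives these upserts (and deletes) continuously as the INSERT INTO job runs.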

That is all it takes to read data from Kafka and write it into a Hudi table of type MERGE_ON_READ.

Tags: flink sql big data

Reposted from: https://blog.csdn.net/zhangyupeng0528/article/details/142108338
Copyright belongs to the original author 大鹏_展翅. If this infringes your rights, please contact us for removal.
