

FlinkSql: Writing to and Reading from Kafka

Using FlinkSql to read Iceberg data and write it to Kafka

1. Create the Kafka sink table:

```sql
CREATE TABLE dws_mtrl_tree (
  ODS_QYBM INT,
  ODS_QYMC STRING,
  MTRL_CODE STRING,
  UPPER_MTRL_CODE STRING
) WITH (
  'connector' = 'kafka', 'topic' = 'dws_mtrl_tree',
  'properties.bootstrap.servers' = 'xx.xxx.xxx.xxx:9092',
  'format' = 'json', 'sink.partitioner' = 'round-robin'
);
```
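Note that 'sink.partitioner' = 'round-robin' spreads rows evenly across partitions regardless of content. If per-key ordering mattered, declaring a message key would be the usual alternative; a minimal sketch (the table name and the choice of MTRL_CODE as key are my assumptions, not from the original post):

```sql
-- Hypothetical keyed variant: with a key declared, Kafka's default partitioner
-- hashes it, so records with the same MTRL_CODE land on the same partition.
-- With key options set, the payload format is given via 'value.format'.
CREATE TABLE dws_mtrl_tree_keyed (
  ODS_QYBM INT,
  ODS_QYMC STRING,
  MTRL_CODE STRING,
  UPPER_MTRL_CODE STRING
) WITH (
  'connector' = 'kafka', 'topic' = 'dws_mtrl_tree',
  'properties.bootstrap.servers' = 'xx.xxx.xxx.xxx:9092',
  'key.format' = 'json', 'key.fields' = 'MTRL_CODE',
  'value.format' = 'json'
);
```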
2. Create the catalog:

```sql
CREATE CATALOG hive_catalog WITH (
  'type' = 'iceberg', 'catalog-type' = 'hive',
  'uri' = 'thrift://xx.xxx.xxx.xxx:9083', 'clients' = '5',
  'hive-conf-dir' = '/opt/softwares/hadoop-3.1.1/etc/hadoop/',
  'warehouse' = 'hdfs://xx.xxx.xxx.xxx:8020/user/hive/warehouse/hive_catalog',
  'property-version' = '1'
);
```
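A quick way to confirm the catalog is wired up correctly (an optional check, not in the original post) is to switch to it and list what it can see:

```sql
-- Verify the catalog: both commands should succeed and show the Hive databases
USE CATALOG hive_catalog;
SHOW DATABASES;
```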
3. Insert the data:

```sql
INSERT INTO dws_mtrl_tree SELECT * FROM hive_catalog.dws_hgdd.dws_mtrl_tree;
```
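As written, this is a bounded scan: the job copies the current table contents to Kafka and finishes. The Iceberg Flink connector also supports continuous reads via SQL hints; a minimal sketch, assuming a connector version with streaming-read support (older Flink versions may also need `SET 'table.dynamic-table-options.enabled' = 'true';` for hints to take effect):

```sql
-- Hypothetical streaming variant: keep running and ship each newly committed
-- Iceberg snapshot to Kafka, polling for new snapshots every 30 seconds
INSERT INTO dws_mtrl_tree
SELECT * FROM hive_catalog.dws_hgdd.dws_mtrl_tree
  /*+ OPTIONS('streaming' = 'true', 'monitor-interval' = '30s') */;
```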

The data now shows up in Kafka.

Using FlinkSql to read Kafka data and write it to Iceberg

1. Create the Kafka source table:

```sql
CREATE TABLE kafka_dws_mtrl_tree (
  ODS_QYBM INT,
  ODS_QYMC STRING,
  MTRL_CODE STRING,
  UPPER_MTRL_CODE STRING
) WITH (
  'connector' = 'kafka', 'topic' = 'dws_mtrl_tree',
  'properties.bootstrap.servers' = 'xx.xxx.xxx.xxx:9092',
  'properties.group.id' = 'consumergroup',
  'format' = 'json', 'scan.startup.mode' = 'earliest-offset'
);
```
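Since 'scan.startup.mode' = 'earliest-offset' replays the topic from the beginning, a quick preview in the SQL client (optional, not in the original post) confirms the source table can deserialize the JSON records:

```sql
-- Preview the Kafka source; the query runs until canceled because the topic is unbounded
SELECT * FROM kafka_dws_mtrl_tree;
```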
2. Create the Iceberg table:

```sql
CREATE TABLE DWS_MTRL_TREE (
  ODS_QYBM INT,
  ODS_QYMC STRING,
  MTRL_CODE STRING,
  UPPER_MTRL_CODE STRING
) WITH (
  'connector' = 'iceberg', 'catalog-name' = 'hive_catalog',
  'catalog-type' = 'hive', 'catalog-database' = 'DWS_HGDD',
  'warehouse' = 'hdfs://xx.xxx.xxx.xxx:8020/user/hive/warehouse/hive_catalog',
  'format-version' = '2'
);
```
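'format-version' = '2' creates an Iceberg v2 table, whose main benefit is row-level changes. This post only appends, but for reference an upsert-style variant would look roughly like this (a sketch under assumptions: that MTRL_CODE uniquely identifies a row, and that the connector version in use supports the 'write.upsert.enabled' table property):

```sql
-- Hypothetical upsert variant: needs format-version 2 plus a declared (unenforced) key
CREATE TABLE DWS_MTRL_TREE_UPSERT (
  ODS_QYBM INT,
  ODS_QYMC STRING,
  MTRL_CODE STRING,
  UPPER_MTRL_CODE STRING,
  PRIMARY KEY (MTRL_CODE) NOT ENFORCED
) WITH (
  'connector' = 'iceberg', 'catalog-name' = 'hive_catalog',
  'catalog-type' = 'hive', 'catalog-database' = 'DWS_HGDD',
  'warehouse' = 'hdfs://xx.xxx.xxx.xxx:8020/user/hive/warehouse/hive_catalog',
  'format-version' = '2', 'write.upsert.enabled' = 'true'
);
```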

3. Insert the data:

```sql
INSERT INTO DWS_MTRL_TREE SELECT * FROM kafka_dws_mtrl_tree;
```
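One detail worth adding (not in the original post): the Iceberg sink only commits data files when a Flink checkpoint completes, so with checkpointing disabled this streaming insert will run but the rows never become visible in the table. In the SQL client, checkpointing can be enabled per session:

```sql
-- Enable periodic checkpoints so the Iceberg sink commits a snapshot regularly
SET 'execution.checkpointing.interval' = '30s';
```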
Problem: the job fails with the following error:

```
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic dws_mtrl_tree not present in metadata after 60000 ms.
```

Troubleshooting: inspecting the TaskManager logs shows that the TaskManager host cannot resolve the hostname the Kafka broker advertises:

```
Error connecting to node node02:9092 (id: 43 rack: null)
```
Solution: add the hostname-to-IP mappings to /etc/hosts, which resolves the problem:

```
xx.xxx.xxx.xxx node01
xx.xxx.xxx.xxx node02
xx.xxx.xxx.xxx node03
```
Tags: flink, big data, kafka

This article is reposted from: https://blog.csdn.net/weixin_47924795/article/details/127664853
Copyright belongs to the original author, Xiaobai on data 开发. If there is any infringement, please contact us for removal.
