

Writing Data from Flink to ClickHouse


1. Create the ClickHouse Table

Create the target table in ClickHouse:

CREATE TABLE default.test_write
(
    id   UInt16,
    name String,
    age  UInt16
) ENGINE = TinyLog();
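
The table can also be created from Java with the same clickhouse-jdbc driver the job uses later. The sketch below is not part of the original post; it assumes ClickHouse is reachable on localhost:8123 with the default database, and the class name is hypothetical.

// Sketch only: create the target table programmatically via clickhouse-jdbc
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTestWriteTable {
    public static void main(String[] args) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             Statement stmt = conn.createStatement()) {
            // IF NOT EXISTS keeps the statement idempotent when re-run
            stmt.execute("CREATE TABLE IF NOT EXISTS default.test_write (" +
                    "id UInt16, name String, age UInt16) ENGINE = TinyLog()");
        }
    }
}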

2. ClickHouse Dependencies

Maven dependencies for Flink development:

<properties>
    <flink.version>1.12.1</flink.version>
    <scala.version>2.12.13</scala.version>
    <clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
    <lombok.version>1.18.12</lombok.version>
</properties>

<dependencies>
    <!-- Write data to ClickHouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>${clickhouse-jdbc.version}</version>
    </dependency>
    <!-- Flink core APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>

3. Bean Entity Class

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/
@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}
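
For readers unfamiliar with Lombok: @Data generates getters, setters, equals/hashCode and toString, and @Builder generates the fluent builder used in the test class later. A small usage sketch (the values are arbitrary):

// Build a User instance through the Lombok-generated builder
User user = User.builder()
        .id(1)
        .name("Daniel")
        .age(25)
        .build();
System.out.println(user);   // @Data also generates a readable toString()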

4. ClickHouse Write Logic

ClickHouseSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 * @Description
 **/
public class ClickHouseSinkFunction extends RichSinkFunction<User> {

    Connection conn = null;
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // The actual write logic, called once per element
    @Override
    public void invoke(User user, Context context) throws Exception {
        // Batch insert
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        preparedStatement.setLong(1, user.id);
        preparedStatement.setString(2, user.name);
        preparedStatement.setLong(3, user.age);
        preparedStatement.addBatch();
        long startTime = System.currentTimeMillis();
        int[] ints = preparedStatement.executeBatch();
        conn.commit();
        long endTime = System.currentTimeMillis();
        System.out.println("Batch insert took: " + (endTime - startTime) + "ms -- rows inserted: " + ints.length);
        preparedStatement.close();   // release the statement created for this element
    }

    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
  • open(): called once after the SinkFunction is instantiated and before the parallel subtask processes any elements; used to initialize connections or other resources.
  • invoke(): defines the logic executed for each element that reaches the sink. You implement this method to specify how data is written to the external storage system (see the batching sketch after this list).
  • close(): called before the SinkFunction shuts down; used to release resources and close connections.
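
Note that the invoke() above prepares a statement and calls executeBatch() for every single element, so each record becomes its own insert. Below is a minimal sketch of a genuinely batched variant of the same class; the BATCH_SIZE constant, the extra fields, and the reuse of one PreparedStatement are assumptions, not part of the original post.

// Sketch only: buffer records and flush every BATCH_SIZE elements.
// Assumes the same User bean, sql string, conn field and getConn() helper as above.
private static final int BATCH_SIZE = 100;   // hypothetical flush threshold
private transient PreparedStatement stmt;    // reused across invocations
private int pending = 0;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    conn = getConn("localhost", 8123, "default");
    stmt = conn.prepareStatement(sql);       // prepare once, reuse per element
}

@Override
public void invoke(User user, Context context) throws Exception {
    stmt.setInt(1, user.id);
    stmt.setString(2, user.name);
    stmt.setInt(3, user.age);
    stmt.addBatch();
    if (++pending >= BATCH_SIZE) {           // flush only when the buffer is full
        stmt.executeBatch();
        pending = 0;
    }
}

@Override
public void close() throws Exception {
    if (pending > 0) {
        stmt.executeBatch();                 // flush whatever is left
    }
    if (stmt != null) {
        stmt.close();
    }
    if (conn != null) {
        conn.close();
    }
    super.close();
}

A production job would also flush on a timer or on checkpoints so that a slow stream does not leave rows sitting in the buffer indefinitely.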

5. Test Write Class

ClickHouseWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author daniel
 * @Date: 2023/7/3 15:37
 * @Description
 **/
public class ClickHouseWriteTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Source
        DataStream<String> ds = env.socketTextStream("localhost", 9999);
        // Transform
        SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
            String[] split = data.split(",");
            return User.builder()
                    .id(Integer.parseInt(split[0]))
                    .name(split[1])
                    .age(Integer.parseInt(split[2]))
                    .build();
        });
        // Sink
        String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
        ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
        dataStream.addSink(jdbcSink);
        env.execute("flink-clickhouse-write");
    }
}
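
One caveat with the map above: a malformed socket line (wrong number of fields, non-numeric id or age) throws an exception and fails the whole job. A hedged alternative, not in the original post, is to use flatMap and drop lines that do not parse; it assumes an extra import of org.apache.flink.util.Collector.

// Sketch only: tolerate malformed socket lines instead of failing the job
SingleOutputStreamOperator<User> dataStream = ds.flatMap(
        (String data, Collector<User> out) -> {
            String[] split = data.split(",");
            if (split.length == 3) {
                try {
                    out.collect(User.builder()
                            .id(Integer.parseInt(split[0].trim()))
                            .name(split[1].trim())
                            .age(Integer.parseInt(split[2].trim()))
                            .build());
                } catch (NumberFormatException ignored) {
                    // skip lines whose id/age are not numbers
                }
            }
        })
        .returns(User.class);   // type hint needed because lambda generics are erased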

6. Send Data

Use nc (or any similar tool) to serve data on the specified port. Start the listener before launching the Flink job, since socketTextStream connects to this port as a client. The -L -p flags below come from the Windows build of netcat; on Linux/macOS the equivalent is usually nc -lk 9999. For example:

nc -L -p 9999

Send the data:

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

Then start the ClickHouseWriteTest.java program.


Query the data:

SELECT * FROM default.test_write;

Because the inserts are executed in parallel, the rows are not guaranteed to appear in input order.
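
If input order matters for this test, forcing the whole local job onto a single parallel subtask avoids interleaving. This one-line tweak is not in the original post; it goes right after the environment is created in ClickHouseWriteTest:

// Run every operator with parallelism 1 so rows are written in arrival order
env.setParallelism(1);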


Tags: flink, clickhouse

This article is reposted from: https://blog.csdn.net/a805814077/article/details/131529587
Copyright belongs to the original author, DanielMaster. If there is any infringement, please contact us for removal.
