1. Create the ClickHouse Table

Create the table in ClickHouse:

CREATE TABLE default.test_write
(
    id   UInt16,
    name String,
    age  UInt16
) ENGINE = TinyLog();
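Both id and age are declared as UInt16, which holds values 0 through 65535; out-of-range values from the Java side would be silently truncated or rejected by the driver. A minimal guard before inserting might look like this sketch (the class and helper names are illustrative, not from the original post):

```java
public class UInt16Check {
    // UInt16 columns accept 0..65535; reject anything outside that
    // range before it reaches ClickHouse (helper name is illustrative).
    static int toUInt16(int v) {
        if (v < 0 || v > 65535) {
            throw new IllegalArgumentException("value out of UInt16 range: " + v);
        }
        return v;
    }

    public static void main(String[] args) {
        System.out.println(toUInt16(25)); // prints 25
        try {
            toUInt16(70000);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: 70000");
        }
    }
}
```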
2. ClickHouse Dependencies

Dependencies for Flink development:

<properties>
    <flink.version>1.12.1</flink.version>
    <scala.version>2.12.13</scala.version>
    <clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
    <lombok.version>1.18.12</lombok.version>
</properties>
<dependencies>
    <!-- Write data to ClickHouse -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>${clickhouse-jdbc.version}</version>
    </dependency>
    <!-- Flink core APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
3. Entity Bean

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/
@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}
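Lombok's @Data generates getters, setters, equals, hashCode, and toString at compile time, and @Builder generates the fluent builder used later in the map function. Conceptually, the builder is equivalent to this hand-written, Lombok-free sketch (the class name UserSketch is mine, for illustration only):

```java
public class UserSketch {
    public int id;
    public String name;
    public int age;

    // Hand-written equivalent of what Lombok's @Builder generates
    public static Builder builder() { return new Builder(); }

    public static class Builder {
        private int id;
        private String name;
        private int age;

        public Builder id(int id) { this.id = id; return this; }
        public Builder name(String name) { this.name = name; return this; }
        public Builder age(int age) { this.age = age; return this; }

        public UserSketch build() {
            UserSketch u = new UserSketch();
            u.id = id;
            u.name = name;
            u.age = age;
            return u;
        }
    }

    public static void main(String[] args) {
        UserSketch u = UserSketch.builder().id(1).name("Daniel").age(25).build();
        System.out.println(u.id + "," + u.name + "," + u.age); // 1,Daniel,25
    }
}
```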
4. ClickHouse Write Logic

ClickHouseSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 * @Description
 **/
public class ClickHouseSinkFunction extends RichSinkFunction<User> {

    Connection conn = null;
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // Defines the per-record write operation
    @Override
    public void invoke(User user, Context context) throws Exception {
        // Batch insert (note: each invoke() writes a "batch" of one row here);
        // try-with-resources closes the statement after each write
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.setInt(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setInt(3, user.age);
            preparedStatement.addBatch();
            long startTime = System.currentTimeMillis();
            // ClickHouse has no transactions; executeBatch() writes
            // directly, so no commit() is needed
            int[] ints = preparedStatement.executeBatch();
            long endTime = System.currentTimeMillis();
            System.out.println("Batch insert took: " + (endTime - startTime)
                    + " ms -- rows inserted: " + ints.length);
        }
    }

    public Connection getConn(String host, int port, String database)
            throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
- open(): called after the SinkFunction is instantiated, used to initialize connections or other resources. It is called only once per parallel subtask, before any elements are processed.
- invoke(): defines the logic executed for each element that arrives at the sink. Implement this method to write data to the external storage system or perform other per-record operations.
- close(): called before the SinkFunction shuts down, used to release resources and close connections.
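The lifecycle above (open once, invoke per element, close once) can be sketched without Flink as a plain-Java harness. The MiniSink interface below merely mirrors the shape of the Flink API; the harness and all names in it are illustrative, not part of Flink:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Minimal stand-in for RichSinkFunction's lifecycle (illustrative, not Flink)
    interface MiniSink<T> {
        void open();
        void invoke(T value);
        void close();
    }

    // Simulates what the Flink runtime does for one subtask:
    // open once, invoke per element, close once
    static List<String> run(String... elements) {
        List<String> calls = new ArrayList<>();
        MiniSink<String> sink = new MiniSink<String>() {
            public void open() { calls.add("open"); }
            public void invoke(String v) { calls.add("invoke:" + v); }
            public void close() { calls.add("close"); }
        };
        sink.open();
        for (String e : elements) sink.invoke(e);
        sink.close();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run("a", "b")); // [open, invoke:a, invoke:b, close]
    }
}
```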
5. Write Test Class

ClickHouseWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author daniel
 * @Date: 2023/7/3 15:37
 * @Description
 **/
public class ClickHouseWriteTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Source
        DataStream<String> ds = env.socketTextStream("localhost", 9999);
        // Transform
        SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
            String[] split = data.split(",");
            return User.builder()
                    .id(Integer.parseInt(split[0]))
                    .name(split[1])
                    .age(Integer.parseInt(split[2]))
                    .build();
        });
        // Sink
        String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
        ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
        dataStream.addSink(jdbcSink);
        env.execute("flink-clickhouse-write");
    }
}
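The map function's split-and-parse step can be exercised without Flink or a socket. This standalone sketch (class and method names are mine) applies the same logic to one line of input; a malformed line would throw NumberFormatException or ArrayIndexOutOfBoundsException here, just as it would fail the Flink job:

```java
public class ParseSketch {
    // Same parsing as the MapFunction in ClickHouseWriteTest:
    // "id,name,age" -> [id, name, age]
    static Object[] parse(String line) {
        String[] split = line.split(",");
        return new Object[]{
                Integer.parseInt(split[0]),  // id
                split[1],                    // name
                Integer.parseInt(split[2])   // age
        };
    }

    public static void main(String[] args) {
        Object[] u = parse("1,Daniel,25");
        System.out.println(u[0] + " " + u[1] + " " + u[2]); // 1 Daniel 25
    }
}
```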
6. Send Data

Use nc or any similar tool to send data to the chosen port, for example:

nc -L -p 9999

(this is the Windows nc syntax; on Linux the equivalent is typically nc -lk 9999)

Then send the data:
1,Daniel,25
2,David,38
3,James,16
4,Robert,27
Then start the ClickHouseWriteTest.java program and query the data:

SELECT * FROM default.test_write;

Since the rows are inserted in parallel, no particular ordering is guaranteed (set env.setParallelism(1) if a deterministic order matters).
Tags:
flink
clickhouse
Reposted from: https://blog.csdn.net/a805814077/article/details/131529587
Copyright belongs to the original author DanielMaster. In case of infringement, please contact us for removal.