Introduction to Reading and Writing Doris with Flink

The Flink Doris Connector lets Flink operate on data stored in Doris: read, insert, update, and delete. A Doris table can be mapped to a DataStream or to a Table.

  • Updates and deletes issued from Flink are supported only on Doris tables that use the Unique Key model.
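The sketch below makes the restriction concrete. It is a simplified, hypothetical in-memory model of Unique Key semantics and is not Doris or connector code. In a Unique Key table each key identifies exactly one row. A later write for the same key therefore replaces the earlier row, and a delete removes the row by its key. The class name `UniqueKeyTableModel` and its methods are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of a Doris Unique Key table.
 * It shows only the observable semantics (one row per key, replace on
 * write, remove on delete), not how Doris stores or merges data.
 */
class UniqueKeyTableModel {
    // key column value -> remaining column values; at most one row per key
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Writing an existing key replaces the old row, which acts as an update. */
    void upsert(String key, String value) {
        rows.put(key, value);
    }

    /** A delete identifies the row to remove by its key. */
    void delete(String key) {
        rows.remove(key);
    }

    String get(String key) {
        return rows.get(key);
    }

    int size() {
        return rows.size();
    }
}
```

Aggregate-model tables such as `flinktest` below merge rows with the same key by aggregating their value columns. They do not replace rows by key.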

1. Prepare the development environment

  • Add the dependency to pom.xml
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
  • Create the test database and test table
-- Switch to the test database
use test_db;

-- Create the test table flinktest
CREATE TABLE flinktest
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- Insert sample data
insert into flinktest values
(1,1,'jim',2),
(2,1,'grace',2),
(3,2,'tom',2),
(4,3,'bush',3),
(5,3,'helen',3);

-- Check the table data
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+
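  • Column type mapping between Doris and Flink
    +------------+----------------------+
    | Doris Type | Flink Type           |
    +------------+----------------------+
    | NULL_TYPE  | NULL                 |
    | BOOLEAN    | BOOLEAN              |
    | TINYINT    | TINYINT              |
    | SMALLINT   | SMALLINT             |
    | INT        | INT                  |
    | BIGINT     | BIGINT               |
    | FLOAT      | FLOAT                |
    | DOUBLE     | DOUBLE               |
    | DATE       | DATE                 |
    | DATETIME   | TIMESTAMP            |
    | DECIMAL    | DECIMAL              |
    | CHAR       | STRING               |
    | LARGEINT   | STRING               |
    | VARCHAR    | STRING               |
    | DECIMALV2  | DECIMAL              |
    | TIME       | DOUBLE               |
    | HLL        | Unsupported datatype |
    +------------+----------------------+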
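The mapping table can also be written as a small lookup. This helps, for example, when choosing column types for a Flink SQL table declared over a Doris table. `DorisFlinkTypes` and `flinkTypeFor` are hypothetical names. This is a sketch that encodes the table above, not a connector API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical lookup that encodes the Doris -> Flink column type table.
 * Not part of the Flink Doris Connector.
 */
final class DorisFlinkTypes {
    private static final Map<String, String> DORIS_TO_FLINK = new HashMap<>();

    static {
        DORIS_TO_FLINK.put("NULL_TYPE", "NULL");
        DORIS_TO_FLINK.put("BOOLEAN", "BOOLEAN");
        DORIS_TO_FLINK.put("TINYINT", "TINYINT");
        DORIS_TO_FLINK.put("SMALLINT", "SMALLINT");
        DORIS_TO_FLINK.put("INT", "INT");
        DORIS_TO_FLINK.put("BIGINT", "BIGINT");
        DORIS_TO_FLINK.put("FLOAT", "FLOAT");
        DORIS_TO_FLINK.put("DOUBLE", "DOUBLE");
        DORIS_TO_FLINK.put("DATE", "DATE");
        DORIS_TO_FLINK.put("DATETIME", "TIMESTAMP");
        DORIS_TO_FLINK.put("DECIMAL", "DECIMAL");
        DORIS_TO_FLINK.put("CHAR", "STRING");
        DORIS_TO_FLINK.put("LARGEINT", "STRING");
        DORIS_TO_FLINK.put("VARCHAR", "STRING");
        DORIS_TO_FLINK.put("DECIMALV2", "DECIMAL");
        DORIS_TO_FLINK.put("TIME", "DOUBLE");
        // HLL is deliberately absent: the table marks it as unsupported.
    }

    private DorisFlinkTypes() {}

    /** Returns the Flink type name; throws for HLL or any unknown type. */
    static String flinkTypeFor(String dorisType) {
        String flinkType = DORIS_TO_FLINK.get(dorisType.toUpperCase(Locale.ROOT));
        if (flinkType == null) {
            throw new IllegalArgumentException("Unsupported datatype: " + dorisType);
        }
        return flinkType;
    }
}
```

For instance, a Doris `LARGEINT` column would be declared as `STRING` on the Flink side.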

2. Reading Doris with the Flink DataStream API

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class Flink_stream_read_doris {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Port for the local Flink Web UI / REST endpoint
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Doris connection: FE HTTP address, credentials, and the db.table to read
        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");

        // Each Doris row is emitted as a List of column values and printed
        env
                .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
                .print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
Console output:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
 */
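As the console output shows, `SimpleListDeserializationSchema` emits each Doris row as a `List`. Downstream code usually wants typed fields instead. The hypothetical `FlinkTestRow.fromList` below is a sketch that assumes the list follows the table's column order (siteid, citycode, username, pv). It converts each element through its string form, so it does not depend on the exact Java type the connector uses for each column.

```java
import java.util.List;

/**
 * Hypothetical typed view of one flinktest row, built from the List emitted
 * by the DataStream source. Assumes column order siteid, citycode, username, pv.
 */
final class FlinkTestRow {
    final int siteid;
    final short citycode;
    final String username;
    final long pv;

    FlinkTestRow(int siteid, short citycode, String username, long pv) {
        this.siteid = siteid;
        this.citycode = citycode;
        this.username = username;
        this.pv = pv;
    }

    /** Converts via String.valueOf so any boxed numeric or string element works. */
    static FlinkTestRow fromList(List<?> row) {
        if (row.size() != 4) {
            throw new IllegalArgumentException("Expected 4 columns, got " + row.size());
        }
        return new FlinkTestRow(
                Integer.parseInt(String.valueOf(row.get(0))),
                Short.parseShort(String.valueOf(row.get(1))),
                String.valueOf(row.get(2)),
                Long.parseLong(String.valueOf(row.get(3))));
    }

    @Override
    public String toString() {
        return "FlinkTestRow{siteid=" + siteid + ", citycode=" + citycode
                + ", username=" + username + ", pv=" + pv + "}";
    }
}
```

In the job above it could be applied with something like `.map(row -> FlinkTestRow.fromList(row))` before `print()`. How the connector types each list element is worth checking against your connector version.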

3. Writing to Doris with Flink

Flink can read and write Doris data in two main ways:

  • DataStream
  • SQL

3.1 Writing JSON data to Doris with the DataStream API

Code example:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Writes JSON data to a Doris table with Flink
 */
public class Flink_stream_write_doris_json {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Stream Load properties: records are sent as JSON and the outer JSON array is stripped
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        env
                .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
                .addSink(DorisSink.sink(
                        // Flush every 2 s, deletes disabled, retry a failed load up to 3 times
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .setStreamLoadProp(pro)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build()));
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
Before execution: 5 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

After execution: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */
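`flinktest` uses the Aggregate model with `pv BIGINT SUM`. Running the same job a second time would therefore not add a seventh row. The new record has the same key (10, 1001, ww), so Doris merges it into the existing row, giving pv = 200, and the table still has 6 rows. The hypothetical `AggregateKeyModel` below is a simplified in-memory sketch of that observable behavior only. It does not show how Doris stores or merges data.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical model of flinktest's AGGREGATE KEY(siteid, citycode, username)
 * with pv declared as BIGINT SUM: rows sharing a key collapse into one row
 * whose pv is the sum of all inserted pv values.
 */
final class AggregateKeyModel {
    /** The aggregate key columns of flinktest. */
    static final class Key {
        final int siteid;
        final short citycode;
        final String username;

        Key(int siteid, short citycode, String username) {
            this.siteid = siteid;
            this.citycode = citycode;
            this.username = username;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return siteid == k.siteid && citycode == k.citycode && username.equals(k.username);
        }

        @Override
        public int hashCode() {
            return Objects.hash(siteid, citycode, username);
        }
    }

    private final Map<Key, Long> pvByKey = new LinkedHashMap<>();

    /** Inserting an existing key adds to its pv instead of creating a new row. */
    void insert(int siteid, short citycode, String username, long pv) {
        pvByKey.merge(new Key(siteid, citycode, username), pv, Long::sum);
    }

    int rowCount() {
        return pvByKey.size();
    }

    Long pv(int siteid, short citycode, String username) {
        return pvByKey.get(new Key(siteid, citycode, username));
    }
}
```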

3.2 Writing RowData to Doris with the DataStream API

Code example:

package com.zenitera.bigdata.doris;

// fastjson (com.alibaba:fastjson) must also be added to pom.xml for JSON parsing
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.*;

public class Flink_stream_write_doris_rowdata {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Column types and names of the target table, in the same order as the RowData fields
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};

        env
                .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
                // Parse the JSON string and convert it into a 4-field RowData
                .map(json -> {
                    JSONObject obj = JSON.parseObject(json);
                    GenericRowData rowData = new GenericRowData(4);
                    rowData.setField(0, obj.getIntValue("siteid"));
                    rowData.setField(1, obj.getShortValue("citycode"));
                    rowData.setField(2, StringData.fromString(obj.getString("username")));
                    rowData.setField(3, obj.getLongValue("pv"));
                    return rowData;
                })
                .addSink(DorisSink.sink(
                        fields,
                        types,
                        new DorisExecutionOptions.Builder()
                                .setBatchIntervalMs(2000L)
                                .setEnableDelete(false)
                                .setMaxRetries(3)
                                .build(),
                        new DorisOptions.Builder()
                                .setFenodes("hdt-dmcp-ops01:8130")
                                .setUsername("root")
                                .setPassword("123456")
                                .setTableIdentifier("test_db.flinktest")
                                .build()));
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
Before execution: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+

After execution: 7 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|     10 |     1001 | ww       |  100 |
|    100 |     1002 | wang     |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */
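The `map` step works only if each `GenericRowData` position matches the same index in `fields` and `types`. The Flink-free sketch below spells out that positional contract and the internal Java type used for each logical type. The name `RowFieldMapper` is hypothetical, and it assumes the parsed JSON is available as a `Map<String, String>`. In the real job, strings are wrapped with `StringData.fromString` rather than stored as plain `String`.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the map() step without Flink: builds positional
 * values whose order matches the fields/types arrays passed to DorisSink.sink.
 */
final class RowFieldMapper {
    // Position i must describe the same column as fields[i] and types[i].
    static final String[] FIELDS = {"siteid", "citycode", "username", "pv"};

    private RowFieldMapper() {}

    static Object[] toRowFields(Map<String, String> json) {
        Object[] row = new Object[FIELDS.length];
        row[0] = Integer.valueOf(json.get("siteid"));  // IntType      -> Integer
        row[1] = Short.valueOf(json.get("citycode"));  // SmallIntType -> Short
        row[2] = json.get("username");                 // VarCharType  -> StringData in Flink
        row[3] = Long.valueOf(json.get("pv"));         // BigIntType   -> Long
        return row;
    }
}
```

Swapping two entries in `fields` without swapping the matching `setField` calls would send values to the wrong Doris columns. That is why both arrays are kept in table column order.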

3.3 Writing to Doris with Flink SQL

Doris test table:

use test_db;
truncate table flinktest;
insert into flinktest values (1,1,'aaa',1),(2,2,'bbb',2),(3,3,'ccc',3);
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      3 |        3 | ccc      |    3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)

Flink SQL code example:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a Flink table backed by the Doris table test_db.flinktest
        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        // INSERT is submitted asynchronously; await() blocks until the insert job finishes
        tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ")
                .await();
    }

    // POJO mirroring the table schema (requires Lombok); not used by the SQL job above
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}

Run the code, then query the corresponding Doris table to verify the result:

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)
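The DDL in this example is built by concatenating string fragments, which makes it easy to drop a comma or a quote. One way to assemble the same statement is from a column map and an option map, using a hypothetical helper such as `DorisDdlBuilder.createTable` below. The helper is not part of Flink or the connector. The option keys are the ones used in the example above.

```java
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper that builds a Flink SQL CREATE TABLE statement for the
 * Doris connector. Pass ordered maps (e.g. LinkedHashMap) so that column and
 * option order are preserved in the generated DDL.
 */
final class DorisDdlBuilder {
    private DorisDdlBuilder() {}

    static String createTable(String flinkTable,
                              Map<String, String> columns,
                              Map<String, String> options) {
        StringJoiner cols = new StringJoiner(",\n  ", "(\n  ", "\n)");
        columns.forEach((name, type) -> cols.add(name + " " + type));

        StringJoiner opts = new StringJoiner(",\n  ", " WITH (\n  ", "\n)");
        opts.add("'connector' = 'doris'");
        // Single quotes inside option values are escaped by doubling them, as SQL requires
        options.forEach((k, v) -> opts.add("'" + k + "' = '" + v.replace("'", "''") + "'"));

        return "CREATE TABLE " + flinkTable + " " + cols + opts;
    }
}
```

In the job above this might be used as `tEnv.executeSql(DorisDdlBuilder.createTable("flink_0518", columns, options))`. Here `columns` holds siteid/citycode/username/pv with their Flink types, and `options` holds fenodes, table.identifier, username, and password.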

3.4 Reading Doris with Flink SQL

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink_SQL_doris_read {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register a Flink table backed by the Doris table test_db.flinktest
        tEnv.executeSql("create table flink_0520(" +
                " siteid int, " +
                " citycode SMALLINT, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "  'connector' = 'doris', " +
                "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "  'table.identifier' = 'test_db.flinktest', " +
                "  'username' = 'root', " +
                "  'password' = '123456' " +
                ")");

        // Query the Doris-backed table and print the result to the console
        tEnv.sqlQuery("select * from flink_0520").execute().print();
    }
}
/*
Console output:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/
Reposted from: https://blog.csdn.net/wt334502157/article/details/130679992
Copyright belongs to the original author 王亭_666. In case of infringement, please contact us for removal.
