

Flink - Sink Operators

Water benefits all things without contending; it dwells in places the crowd disdains, and so it comes close to the Dao 💦



Official documentation - Flink 1.13



1. Kafka_Sink

addSink(new FlinkKafkaProducer<String>(kafka_address, topic, serializer))

Add the dependency first:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.6</version>
</dependency>
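Every example in this post builds on a WaterSensor POJO that the original never shows. Below is a minimal sketch, with the field names id/ts/vc inferred from the getters the code uses; Flink's sum("vc") and fastjson's JSON.toJSONString (the fastjson dependency is listed in section 3) both require a public no-arg constructor plus getters and setters:

// Minimal sketch of the WaterSensor POJO assumed by all examples below
// (not shown in the original post; field names inferred from usage)
public class WaterSensor {
    private String id;  // sensor id
    private Long ts;    // event timestamp (ms)
    private Integer vc; // water level value

    public WaterSensor() { }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }
    public Integer getVc() { return vc; }
    public void setVc(Integer vc) { this.vc = vc; }
}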
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    stream
            .keyBy(WaterSensor::getId)
            .sum("vc")
            .map(JSON::toJSONString)
            .addSink(new FlinkKafkaProducer<String>(
                    "hadoop101:9092",         // Kafka broker address
                    "flink_sink_kafka",       // target Kafka topic
                    new SimpleStringSchema()  // serialization schema
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


2. Kafka_Sink - custom serializer

  To plug in a custom serializer, call the four-argument constructor of new FlinkKafkaProducer(), pass in a new KafkaSerializationSchema as the serializer, and override its serialize method.

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);

    Properties sinkConfig = new Properties();
    sinkConfig.setProperty("bootstrap.servers", "hadoop101:9092");

    stream
            .keyBy(WaterSensor::getId)
            .sum("vc")
            .addSink(new FlinkKafkaProducer<WaterSensor>(
                    "defaultTopic",  // fallback topic; rarely used, since the serializer picks the topic per record
                    new KafkaSerializationSchema<WaterSensor>() {  // custom serializer
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(WaterSensor waterSensor, @Nullable Long aLong) {
                            String s = JSON.toJSONString(waterSensor);
                            return new ProducerRecord<>("flink_sink_kafka", s.getBytes(StandardCharsets.UTF_8));
                        }
                    },
                    sinkConfig,  // Kafka configuration
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE  // delivery semantics: for now only at-least-once can be passed here
            ));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


3. Redis_Sink_String

addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {...}))

Writes into the Redis string structure.

Add the dependencies:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    /*
     * Write strings into Redis (the redis-cli command for this structure is SET).
     * Here the key is the sensor id and the value is the whole record as a JSON string:
     *   key         value
     *   sensor_1    JSON string
     */

    // build a standalone-mode Redis config
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)      // max number of connections
            .setMaxIdle(10)        // max idle connections in the pool
            .setMinIdle(2)         // min idle connections in the pool
            .setTimeout(10 * 1000) // timeout in ms
            .build();

    // write out to Redis
    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        // return the command descriptor: each Redis data structure is written with a different command
        @Override
        public RedisCommandDescription getCommandDescription() {
            // write to a string: use SET
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


4. Redis_Sink_list

addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {...}))

Writes into the list structure.

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    // key: the sensor id, value: the aggregated record as a JSON string
    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)      // max number of connections
            .setMaxIdle(10)        // max idle connections in the pool
            .setMinIdle(2)         // min idle connections in the pool
            .setTimeout(10 * 1000) // timeout in ms
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // write into a list: RPUSH appends to the tail
            return new RedisCommandDescription(RedisCommand.RPUSH);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


5. Redis_Sink_set

addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {...}))

Writes into the set structure.

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10 * 1000)
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // write into a set: SADD (duplicates are dropped)
            return new RedisCommandDescription(RedisCommand.SADD);
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


6. Redis_Sink_hash

addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {...}))

Writes into the hash structure.

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("hadoop101")
            .setPort(6379)
            .setMaxTotal(100)
            .setMaxIdle(10)
            .setMinIdle(2)
            .setTimeout(10 * 1000)
            .build();

    result.addSink(new RedisSink<>(config, new RedisMapper<WaterSensor>() {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // write into hash "a": the additional key names the hash,
            // getKeyFromData() supplies the field, getValueFromData() the value
            return new RedisCommandDescription(RedisCommand.HSET, "a");
        }

        @Override
        public String getKeyFromData(WaterSensor waterSensor) {
            return waterSensor.getId();
        }

        @Override
        public String getValueFromData(WaterSensor waterSensor) {
            return JSON.toJSONString(waterSensor);
        }
    }));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


7. Writing a bounded stream to ES

new ElasticsearchSink.Builder()
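
The original post doesn't show the Elasticsearch connector dependency. Assuming an Elasticsearch 7 cluster, the matching artifact for Flink 1.13 would be the following (an es6 variant also exists; pick the one matching your cluster):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.13.6</version>
</dependency>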
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200));

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,           // the element to write out
                                    RuntimeContext runtimeContext, // runtime context (not the Context object)
                                    RequestIndexer requestIndexer) { // wrap the outgoing data in the RequestIndexer
                    String msg = JSON.toJSONString(element);
                    IndexRequest ir = Requests.indexRequest("sensor")
                            .type("_doc")        // a type name must not start with an underscore; _doc is the only exception
                            .id(element.getId()) // document id; omit it and a random id is assigned, a duplicate id updates the document
                            .source(msg, XContentType.JSON);

                    requestIndexer.add(ir); // adding the request to the indexer writes it to ES automatically
                }
            });

    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

8. Writing an unbounded stream to ES

  Much the same as the bounded case, except the source is a socket (feed it with, e.g., nc -lk 9999 on hadoop101). Because the stream is unbounded, the sink buffers records for efficiency instead of flushing every single one, so you must configure the flush interval, batch size, and record count to see any output.

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> result = env
            .socketTextStream("hadoop101", 9999)
            .map(line -> {
                String[] data = line.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
            })
            .keyBy(WaterSensor::getId)
            .sum("vc");

    List<HttpHost> hosts = Arrays.asList(
            new HttpHost("hadoop101", 9200),
            new HttpHost("hadoop102", 9200),
            new HttpHost("hadoop103", 9200));

    ElasticsearchSink.Builder<WaterSensor> builder = new ElasticsearchSink.Builder<WaterSensor>(
            hosts,
            new ElasticsearchSinkFunction<WaterSensor>() {
                @Override
                public void process(WaterSensor element,           // the element to write out
                                    RuntimeContext runtimeContext, // runtime context (not the Context object)
                                    RequestIndexer requestIndexer) { // wrap the outgoing data in the RequestIndexer
                    String msg = JSON.toJSONString(element);
                    IndexRequest ir = Requests.indexRequest("sensor")
                            .type("_doc")        // a type name must not start with an underscore; _doc is the only exception
                            .id(element.getId()) // document id; omit it and a random id is assigned, a duplicate id updates the document
                            .source(msg, XContentType.JSON);

                    requestIndexer.add(ir); // adding the request to the indexer writes it to ES automatically
                }
            });

    // the three flush triggers below are OR-ed: whichever fires first flushes the batch
    builder.setBulkFlushInterval(2000);  // flush interval in ms (by default there is no time-based flush)
    builder.setBulkFlushMaxSizeMb(1024); // flush once the buffered batch reaches this size
    builder.setBulkFlushMaxActions(2);   // flush every N records

    result.addSink(builder.build());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

9. Custom sink - MySQL_Sink

  Write a class that extends RichSinkFunction and override its invoke method. Writing to MySQL requires opening a connection, so the Rich variant is used for its open/close lifecycle hooks.

  Remember to add the MySQL driver dependency.
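
The original doesn't list the coordinate; for the com.mysql.cj.jdbc.Driver class used below, a typical coordinate would be (the version is an assumption; any 8.x driver works):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>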

public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    result.addSink(new MySqlSink());

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static class MySqlSink extends RichSinkFunction<WaterSensor> {
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://hadoop101:3306/test?useSSL=false", "root", "123456");
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }

    // invoked once for every incoming element
    @Override
    public void invoke(WaterSensor value, Context context) throws Exception {
        // write data to MySQL via JDBC
        // String sql = "insert into sensor(id,ts,vc)values(?,?,?)";
        // insert when the primary key is new, update when it already exists:
        // String sql = "insert into sensor(id,ts,vc)values(?,?,?) on duplicate key update vc=?";
        String sql = "replace into sensor(id,ts,vc)values(?,?,?)";
        // 1. create a prepared statement
        PreparedStatement ps = connection.prepareStatement(sql);
        // 2. bind the placeholders
        ps.setString(1, value.getId());
        ps.setLong(2, value.getTs());
        ps.setInt(3, value.getVc());
        // ps.setInt(4, value.getVc());
        // 3. execute
        ps.execute();
        // 4. commit: not needed, MySQL autocommits by default
        // connection.commit();
        // 5. close the prepared statement
        ps.close();
    }
}


10. Jdbc_Sink

addSink(JdbcSink.sink(sql, JdbcStatementBuilder, executionOptions, connectionOptions))

  For JDBC databases there is really no need to write a custom sink, because Flink provides an official one -> official JDBC Sink documentation

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInteger("rest.port", 1000);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setParallelism(1);

    ArrayList<WaterSensor> waterSensors = new ArrayList<>();
    waterSensors.add(new WaterSensor("sensor_1", 1607527992000L, 20));
    waterSensors.add(new WaterSensor("sensor_1", 1607527994000L, 50));
    waterSensors.add(new WaterSensor("sensor_1", 1607527996000L, 50));
    waterSensors.add(new WaterSensor("sensor_2", 1607527993000L, 10));
    waterSensors.add(new WaterSensor("sensor_2", 1607527995000L, 30));
    DataStreamSource<WaterSensor> stream = env.fromCollection(waterSensors);
    SingleOutputStreamOperator<WaterSensor> result = stream
            .keyBy(WaterSensor::getId)
            .sum("vc");

    result.addSink(JdbcSink.sink(
            "replace into sensor(id,ts,vc)values(?,?,?)",
            new JdbcStatementBuilder<WaterSensor>() {
                @Override
                public void accept(PreparedStatement ps, WaterSensor waterSensor) throws SQLException {
                    // does one thing only: bind the placeholders
                    ps.setString(1, waterSensor.getId());
                    ps.setLong(2, waterSensor.getTs());
                    ps.setInt(3, waterSensor.getVc());
                }
            },
            new JdbcExecutionOptions.Builder()  // execution options
                    .withBatchSize(1024)        // max batch size before a flush
                    .withBatchIntervalMs(2000)  // flush interval (ms)
                    .withMaxRetries(3)          // retry count
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.mysql.cj.jdbc.Driver")
                    .withUrl("jdbc:mysql://hadoop101:3306/test?useSSL=false")
                    .withUsername("root")
                    .withPassword("123456")
                    .build()));

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}


