0


Flink中普通API的使用

本篇文章从Source、Transformation(转换因子)、sink这三个地方进行讲解

Source:

  1. 创建DataStream
  2. 本地文件
  3. Socket
  4. Kafka

Transformation(转换因子):

  1. map
  2. FlatMap
  3. Filter
  4. KeyBy
  5. Reduce
  6. Union和connect
  7. Side Outputs

sink:

  1. print 打印
  2. writerAsText 以文本格式输出
  3. writeAsCsv 以csv格式输出
  4. 输出到MySQL
  5. 输出到kafka
  6. 自定义输出

先准备一个模板方便后续使用


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 @基本功能:
 @program:${PROJECT_NAME}
 @author: ${USER}
 @create:${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}
 **/
public class ${NAME} {

public static void main(String[] args) throws Exception {

    //1. env-准备环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    //2. source-加载数据
    //3. transformation-数据处理转换
    //4. sink-数据输出

    //5. execute-执行
    env.execute();
}
}

Source:

预定义source

创建DataStream(四种)

  • 使用env.fromElements:类型要一致
  • 使用env.fromcollections:支持多种collection的具体类型
  • 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了
  • 使用env.fromSequence()方法创建基于开始和结束的DataStream
package com.bigdata.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class _01YuDingYiSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 各种获取数据的Source
        DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
        dataStreamSource.print();
        // 演示一个错误的
        //DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
        //dataStreamSource2.print();
        DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
                Tuple2.of("张三", 18),
                Tuple2.of("lisi", 18),
                Tuple2.of("wangwu", 18)
        );
        elements.print();

        // 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的API
        String[] arr = {"hello","world"};
        System.out.println(arr.length);
        System.out.println(Arrays.toString(arr));
        List<String> list = Arrays.asList(arr);
        System.out.println(list);

        env.fromElements(
                Arrays.asList(arr),
                Arrays.asList(arr),
                Arrays.asList(arr)
        ).print();

        // 第二种加载数据的方式
        // Collection 的子接口只有 Set 和 List
        ArrayList<String> list1 = new ArrayList<>();
        list1.add("python");
        list1.add("scala");
        list1.add("java");
        DataStreamSource<String> ds1 = env.fromCollection(list1);
        DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));

        // 第三种
        DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
        ds3.print();

        // execute 下面的代码不运行,所以,这句话要放在最后。
        env.execute("获取预定义的Source");
    }
}

本地文件

        File file = new File("datas/wc.txt");
        File file2 = new File("./");
        System.out.println(file.getAbsoluteFile());
        System.out.println(file2.getAbsoluteFile());
        DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
        ds1.print();
        // 还可以获取hdfs路径上的数据
        DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
        ds2.print();

Socket

linux(socket命令)
下载:yum install -y nc   
nc -lk 8888   --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!
如果端口被占用就换一个端口
本地(socket命令)
nc -lp 8888
或者 nc -l -p 8888
如果不是在exe端打开的用 -l -p 8888
java代码
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
Word Count案例
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SourceDemo02_Socket {
    public static void main(String[] args) throws Exception {
        //TODO 1.env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //TODO 2.source-加载数据
        DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);

        //TODO 3.transformation-数据转换处理
        //3.1对每一行数据进行分割并压扁
        DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        //3.2每个单词记为<单词,1>
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        //3.3分组
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //3.4聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);

        //TODO 4.sink-数据输出
        result.print();

        //TODO 5.execute-执行
        env.execute();
    }
}

JDBC

 Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink", "root", "root");
                PreparedStatement preparedStatement = connection.prepareStatement("select monitor_id,speed_limit from t_monitor_info group by monitor_id, speed_limit");
                ResultSet resultSet = preparedStatement.executeQuery();
                ArrayList<Tuple2<String,Double>> arr = new ArrayList<>();
                while (resultSet.next()){
                    String monitor_id = resultSet.getString("monitor_id");
                    Double speed_limit = resultSet.getDouble("speed_limit");
                    Tuple2 tuple2 = new Tuple2(monitor_id, speed_limit);
                    arr.add(tuple2);
                }

Kafka

添加依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
package com.bigdata.day02;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        // 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已
        // 返回true 的数据就保留下来,返回false 直接丢弃
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // 查看单词中是否包含success 字样
                return word.contains("success");
            }
        }).print();

        env.execute();
    }
}

自定义source

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
 * 要求:
 * - 随机生成订单ID(UUID)
 * - 随机生成用户ID(0-2)
 * - 随机生成订单金额(0-100)
 * - 时间戳为当前系统时间
 */

@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {
    boolean flag = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        // 源源不断的产生数据
        Random random = new Random();
        while(flag){
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setOrderId(UUID.randomUUID().toString());
            orderInfo.setUid(random.nextInt(3));
            orderInfo.setMoney(random.nextInt(101));
            orderInfo.setTimeStamp(System.currentTimeMillis());
            ctx.collect(orderInfo);
            Thread.sleep(1000);// 间隔1s
        }
    }

    // source 停止之前需要干点啥
    @Override
    public void cancel() {
        flag = false;
    }
}
public class CustomSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);
        // 将自定义的数据源放入到env中
        DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
        System.out.println(dataStreamSource.getParallelism());
        dataStreamSource.print();
        env.execute();
    }

}

如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话(有多少核就生成多少个数据)。

Rich 类型的Source可以比非Rich的多出有:
- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
- getRuntimeContext 方法可以获得当前的Runtime对象(底层API)

rich模板
/**
 * 自定义一个RichParallelSourceFunction的实现
 */
public class CustomerRichSourceWithParallelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
        mySource.print();

        env.execute();
    }

    /*
    Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntime方法可以获得当前的Runtime对象(底层API)
     */
    public static class MySource extends RichParallelSourceFunction<String> {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open......");
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close......");
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            ctx.collect(UUID.randomUUID().toString());
        }

        @Override
        public void cancel() {}
    }
}

Transformation(转换因子):

map算子(一变多)

package com.bigdata.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2024-05-13 11:40:37
 **/
@Data
@AllArgsConstructor
class LogBean{
    private String ip;      // 访问ip
    private int userId;     // 用户id
    private long timestamp; // 访问时间戳
    private String method;  // 访问方法
    private String path;    // 访问路径
}
public class Demo04 {

    // 将数据转换为javaBean
    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");

        //3. transformation-数据处理转换
        SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String line) throws Exception {
                String[] arr = line.split("\\s+");

                //时间戳转换  17/05/2015:10:06:53
                String time = arr[2];
                SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");
                Date date = format.parse(time);
                long timeStamp = date.getTime();
                return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);
            }
        });

        //4. sink-数据输出
        map.print();

        //5. execute-执行
        env.execute();
    }
}

FlatMap算子(类似于炸裂函数)

数据

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: 闫哥
 * @create:2023-11-21 09:51:59
 **/
public class FlatMapDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        //2. source-加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");
        //3. transformation-数据处理转换
        DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                //张三,苹果手机,联想电脑,华为平板
                String[] arr = line.split(",");
                String name = arr[0];
                for (int i = 1; i < arr.length; i++) {
                    String goods = arr[i];
                    collector.collect(name+"有"+goods);
                }
            }
        });
        //4. sink-数据输出
        flatMapStream.print();

        //5. execute-执行
        env.execute();
    }
}

Filter(过滤)

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Date;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: zxx
 * @create:2023-11-21 09:10:30
 **/

public class FilterDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class LogBean{
        String ip;      // 访问ip
        int userId;     // 用户id
        long timestamp; // 访问时间戳
        String method;  // 访问方法
        String path;    // 访问路径
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");

        //3. transformation-数据处理转换
        // 读取第一题中 a.log文件中的访问日志数据,过滤出来以下访问IP是83.149.9.216的访问日志
        DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String line) throws Exception {
                String ip = line.split(" ")[0];
                return  ip.equals("83.149.9.216");
            }
        });
        //4. sink-数据输出
        filterStream.print();

        //5. execute-执行
        env.execute();
    }
}

KeyBy(分组)

元组

//用字段位置
wordAndOne.keyBy(0, 1);

//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});

POJO (Plain Old Java Object):普通Java对象

public class PeopleCount {
    private String province;
    private String city;
    private Integer counts;
    public PeopleCount() {
    }
    //省略其他代码。。。
}

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});

Reduce --sum的底层是reduce(聚合)

 //  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]
        keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                // t1 => ("10.0.0.1",10)
                // t2 => ("10.0.0.1",1)
                return Tuple2.of(t1.f0,  t1.f1 + t2.f1);
            }
        }).print();

union和connect-合并和连接

Union

union可以合并多个同类型的流

将多个DataStream 合并成一个DataStream

connect

connect可以连接2个不同类型的流(最后需要处理后再输出)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化【一国两制】,两个流相互独立, 作为对比Union后是真的变成一个流了。

*和union类似,但是connect只能连接两个流,两个流之间的数据类型可以同,对两个流的数据可以分别***应用不同的处理逻辑. **

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: zxx
 * @create:2023-11-21 11:40:12
 **/
public class UnionConnectDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source-加载数据
        DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");
        DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");
        DataStream<String> unionStream = stream1.union(stream2);
        unionStream.print();

        DataStream<Long> stream3 = env.fromSequence(1, 10);
        // stream1.union(stream3); 报错
        //3. transformation-数据处理转换
        ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);
        // 此时你想使用这个流,需要各自重新处理
        // 处理完之后的数据类型必须相同
        DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {
            // string 类型的数据
            @Override
            public String map1(String value) throws Exception {
                return value;
            }

            // 这个处理long 类型的数据

            @Override
            public String map2(Long value) throws Exception {
                return Long.toString(value);
            }
        });

        //4. sink-数据输出
        mapStream.print();

        //5. execute-执行
        env.execute();
    }
}
package com.bigdata.transforma;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: zxx
 * @create:2024-11-22 10:50:13
 **/
public class _08_两个流join {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> ds1 = env.fromElements("bigdata", "spark", "flink");
        DataStreamSource<String> ds2 = env.fromElements("python", "scala", "java");
        DataStream<String> ds3 = ds1.union(ds2);
        ds3.print();

        // 接着演示 connect
        DataStreamSource<Long> ds4 = env.fromSequence(1, 10);
        ConnectedStreams<String, Long> ds5 = ds1.connect(ds4);

        ds5.process(new CoProcessFunction<String, Long, String>() {
            @Override
            public void processElement1(String value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("String流:"+value);
                out.collect(value);
            }

            @Override
            public void processElement2(Long value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {
                System.out.println("Long流:"+value);
                out.collect(String.valueOf(value));
            }
        }).print("合并后的打印:");

        //2. source-加载数据
        //3. transformation-数据处理转换
        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

Side Outputs侧道输出(侧输出流) --可以分流

举例说明:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

package com.bigdata.day02;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: zxx
 * @create:2024-05-13 16:19:56
 **/
public class Demo11 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 侧道输出流
        DataStreamSource<Long> streamSource = env.fromSequence(0, 100);
        // 定义两个标签
        OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));
        OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));
        //2. source-加载数据
        SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {
                // value 代表每一个数据
                if (value % 2 == 0) {
                    ctx.output(tag_even, value);
                } else {
                    ctx.output(tag_odd, value);
                }
            }
        });
        // 从数据集中获取奇数的所有数据
        DataStream<Long> sideOutput = process.getSideOutput(tag_odd);
        sideOutput.print("奇数:");
        // 获取所有偶数数据
        DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);
        sideOutput2.print("偶数:");
        //3. transformation-数据处理转换
        //4. sink-数据输出

        //5. execute-执行
        env.execute();
    }
}

sink:

print 打印

writerAsText 以文本格式输出

dataStreamSource.writeAsText("F:\\BD230801\\FlinkDemo\\datas\\result", FileSystem.WriteMode.OVERWRITE);

writerAsText 以文本格式输出

 DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );
        // writeAsCsv 只能保存 tuple类型的DataStream流,因为如果不是多列的话,没必要使用什么分隔符
        streamSource.writeAsCsv("datas/csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

输出到MySQL

 JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://localhost:3306/zuoye")
                .withUsername("root").withPassword("123456").build();

        studentDataStreamSource.addSink(JdbcSink.sink(
                "insert into stu values(?,?,?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student student) throws SQLException {
                        preparedStatement.setInt(1,student.getId());
                        preparedStatement.setString(2,student.getName());
                        preparedStatement.setInt(3,student.getAge());
                    }
                },jdbcConnectionOptions
        ));

输出到kafka

FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<String>("topic2",new SimpleStringSchema(),properties);
        filterStream.addSink(kafkaProducer);

自定义Sink--模拟jdbcSink的实现

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @基本功能:
 * @program:FlinkDemo
 * @author: zxx
 * @create:2023-11-21 16:08:04
 **/

public class CustomJdbcSinkDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Student{
        private int id;
        private String name;
        private int age;
    }

    static class MyJdbcSink  extends RichSinkFunction<Student> {

        Connection conn =null;
        PreparedStatement ps = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            // 这个里面编写连接数据库的代码
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "123456");
            ps = conn.prepareStatement("INSERT INTO `student` (`id`, `name`, `age`) VALUES (null, ?, ?)");
        }

        @Override
        public void close() throws Exception {
            // 关闭数据库的代码
            ps.close();
            conn.close();
        }

        @Override
        public void invoke(Student student, Context context) throws Exception {
            // 将数据插入到数据库中
            ps.setString(1,student.getName());
            ps.setInt(2,student.getAge());
            ps.execute();

        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Student> studentStream = env.fromElements(new Student(1, "马斯克", 51));

        studentStream.addSink(new MyJdbcSink());

        env.execute();
    }
}
标签: flink 大数据

本文转载自: https://blog.csdn.net/m0_58419490/article/details/144035290
版权归原作者 floret* 所有, 如有侵权,请联系我们删除。

“Flink中普通API的使用”的评论:

还没有评论