flink-cdc, ClickHouse writes, and multi-way output

1. Scenario

Log data is read from Kafka and then processed in two steps:

1. Join a dictionary table to enrich the log data.

2. Classify each record by log level and fan it out to multiple outputs:

Low level: written to ClickHouse only.

High level: written to ClickHouse and simultaneously pushed to Kafka for a second-stage processing pipeline.

2. Implementation

package com.ws.kafka2clickhouse;

import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class Kafka2ClickHouse {

    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
//        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 1. Read the main log stream from Kafka
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setTopics("dataSource")
                .setGroupId("group1")
                .setBootstrapServers("hdp01:6667")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
        // 2. Map the JSON of the main stream into POJOs
        SingleOutputStreamOperator<LogEvent> beans = kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));
        // 3. Load the dictionary table as a CDC changelog stream
        tenv.executeSql(
                "CREATE TABLE dmpt_base_oper_log (\n" +
                        "id bigInt primary key," +
                        "title String" +
                        ") WITH (\n" +
                        "'connector' = 'mysql-cdc',\n" +
                        "'hostname' = 'localhost',\n" +
                        "'port' = '3306',\n" +
                        "'username' = 'root',\n" +
                        "'password' = 'root',\n" +
                        "'database-name' = 'test',\n" +
                        "'table-name' = 'test_recursive'\n" +
                        ")"
        );
        Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
        DataStream<Row> dict = tenv.toChangelogStream(result);
        dict.print();
        // 4. Reshape the dictionary rows and attach the changelog operation type
        SingleOutputStreamOperator<CompanyInfo> companyDict = dict.map(new RichMapFunction<Row, CompanyInfo>() {
            @Override
            public CompanyInfo map(Row row) throws Exception {
                Long id = (Long) row.getField("id");
                String title = (String) row.getField("title");
                // carry along the CDC change type (insert / delete / update)
                RowKind kind = row.getKind();
                return new CompanyInfo(id, title, kind);
            }
        });
        // 5. Broadcast the dictionary stream
        MapStateDescriptor<Long, CompanyInfo> company_info_desc = new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);
        BroadcastStream<CompanyInfo> broadcastStream = companyDict.broadcast(company_info_desc);
        // 6. Create the side-output tag for the Kafka branch
        OutputTag<String> tokafka = new OutputTag<String>("tokafka") {
        };

        SingleOutputStreamOperator<LogEvent> beans_company = beans.connect(broadcastStream).process(new BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent>() {
            @Override
            public void processElement(LogEvent logEvent, ReadOnlyContext readOnlyContext, Collector<LogEvent> collector) throws Exception {
                // Called for each incoming main-stream (log) record
                ReadOnlyBroadcastState<Long, CompanyInfo> broadcastState = readOnlyContext.getBroadcastState(company_info_desc);
                CompanyInfo companyInfo = broadcastState.get(logEvent.getMessageId());
                // 7. If company info is found, this record belongs to a high-level user: enrich it and emit a copy to the side output for Kafka, in addition to the main stream
                if (companyInfo != null) {
                    logEvent.setCompanyInfo(companyInfo);
                    readOnlyContext.output(tokafka, JSONUtil.toJsonStr(logEvent));
                }
                collector.collect(logEvent);
            }

            @Override
            public void processBroadcastElement(CompanyInfo companyInfo, Context context, Collector<LogEvent> collector) throws Exception {
                // Called for each incoming broadcast (dictionary) record
                BroadcastState<Long, CompanyInfo> broadcastState = context.getBroadcastState(company_info_desc);
                if (companyInfo.getRowKind() == RowKind.INSERT) {
                    // insert: cache the new dictionary entry
                    broadcastState.put(companyInfo.getId(), companyInfo);
                } else if (companyInfo.getRowKind() == RowKind.DELETE) {
                    // delete: drop the cached entry
                    broadcastState.remove(companyInfo.getId());
                } else {
                    // update (UPDATE_BEFORE / UPDATE_AFTER): overwrite the cached entry
                    broadcastState.put(companyInfo.getId(), companyInfo);
                }
            }
        });

        // SQL for inserting rows into ClickHouse
        String insertIntoCkSql = "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
        // Build the ClickHouse JDBC sink
        SinkFunction<LogEvent> sink = JdbcSink.sink(
                // the insert statement
                insertIntoCkSql,
                // bind each LogEvent to the statement parameters
                new JdbcStatementBuilder<LogEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, LogEvent logEvent) throws SQLException {
                        // let SQLException propagate so the sink's retry/failure handling (withMaxRetries) can take effect
                        preparedStatement.setString(1, logEvent.getMessageType());
                        preparedStatement.setLong(2, logEvent.getMessageId());
                        preparedStatement.setString(3, logEvent.getDeviceId());
                        preparedStatement.setString(4, logEvent.getCol1());
                        preparedStatement.setString(5, logEvent.getCol2());
                        preparedStatement.setString(6, logEvent.getCol3());
                        preparedStatement.setString(7, logEvent.getCol4());
                        preparedStatement.setString(8, logEvent.getHeaders().getDeviceTime());
                        preparedStatement.setLong(9, logEvent.getHeaders().get_uid());
                        preparedStatement.setString(10, logEvent.getHeaders().getProductId());
                        preparedStatement.setString(11, logEvent.getHeaders().getOrgId());
                        if (logEvent.getCompanyInfo() != null) {
                            preparedStatement.setString(12, logEvent.getCompanyInfo().getTitle());
                        } else {
                            preparedStatement.setString(12, null);
                        }
                        preparedStatement.setString(13, logEvent.getRegion());
                    }
                },
                // batch execution options
                new JdbcExecutionOptions.Builder()
                        // rows per batch (default 5000)
                        .withBatchSize(10000)
                        // maximum time between flushes
                        .withBatchIntervalMs(5000)
                        .withMaxRetries(3)
                        .build(),
                // ClickHouse connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://192.16.10.118:1111")
                        .withUsername("default")
                        .withPassword("xxxx")
                        .build()
        );
        // 8. All records go into the base ClickHouse table
        beans_company.addSink(sink);
        beans_company.print("clickhouse-base");
        // 9. High-level user records are additionally pushed to the analytics Kafka topic
        DataStream<String> sideOutput = beans_company.getSideOutput(tokafka);
        sideOutput.print("kafka-enriched");
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hdp01:6667");
        // 10. Build the Kafka sink
        KafkaSerializationSchema<String> serializationSchema = new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                return new ProducerRecord<>(
                        "dataZengQiang", // target topic
                        element.getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "dataZengQiang",        // default target topic
                serializationSchema,    // serialization schema
                properties,             // producer config
                // EXACTLY_ONCE uses Kafka transactions and requires checkpointing to be enabled
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        // 11. Write the side output to Kafka
        sideOutput.addSink(myProducer);
        env.execute();
    }
}
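
The job above references two POJOs, LogEvent and CompanyInfo, whose sources are not shown. Below is a minimal sketch of what they might look like; the field names are inferred from the getters used in the job, and the real classes may differ:

// CompanyInfo.java - a dictionary entry plus the CDC change type it arrived with
package com.ws.kafka2clickhouse.bean;

import org.apache.flink.types.RowKind;

public class CompanyInfo {
    private Long id;
    private String title;
    private RowKind rowKind;

    public CompanyInfo() {
    }

    public CompanyInfo(Long id, String title, RowKind rowKind) {
        this.id = id;
        this.title = title;
        this.rowKind = rowKind;
    }

    public Long getId() { return id; }
    public String getTitle() { return title; }
    public RowKind getRowKind() { return rowKind; }
}

// LogEvent.java - one log record; hutool's JSONUtil.toBean populates it through
// standard JavaBean getters/setters, omitted here for brevity
package com.ws.kafka2clickhouse.bean;

public class LogEvent {
    private String messageType;
    private Long messageId;
    private String deviceId;
    private String col1;
    private String col2;
    private String col3;
    private String col4;
    private String region;
    private Headers headers;         // nested header block of the log JSON (assumed shape)
    private CompanyInfo companyInfo;

    public static class Headers {
        private String deviceTime;
        private Long _uid;
        private String productId;
        private String orgId;
        // getDeviceTime(), get_uid(), getProductId(), getOrgId() and setters omitted
    }

    // getters and setters for all fields omitted
}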
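
The insert statement binds 13 parameters, so default.dns_logs needs 13 columns in the same order. A hypothetical ClickHouse DDL consistent with the bindings above (column names, types, and engine settings are assumptions; adjust to your actual table):

CREATE TABLE default.dns_logs
(
    message_type  String,
    message_id    Int64,
    device_id     String,
    col1          String,
    col2          String,
    col3          String,
    col4          String,
    device_time   String,
    uid           Int64,
    product_id    String,
    org_id        String,
    company_title Nullable(String),  -- parameter 12 may be bound to null
    region        String
)
ENGINE = MergeTree
ORDER BY message_id;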
3. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink</artifactId>
    <properties>
        <flink.version>1.13.2</flink.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- dependencies required for Flink SQL -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!-- ClickHouse JDBC driver -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
        <!-- flink-cdc-mysql connector -->
        <dependency>
            <groupId>com.ws</groupId>
            <artifactId>mysql-cdc</artifactId>
            <version>2.2.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <!-- bundle dependencies into the jar -->
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
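
Note on packaging: the assembly plugin is bound to the package phase, so mvn package produces a jar-with-dependencies that can be submitted with flink run. One caveat worth verifying in your own build: the jar-with-dependencies descriptor generally does not bundle system-scoped dependencies, so the flink-connector-mysql-cdc jar referenced via systemPath may need to be placed on the Flink classpath (for example the cluster's lib/ directory) separately.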
Tags: Big Data

Reprinted from: https://blog.csdn.net/java_creatMylief/article/details/128805670
Copyright belongs to the original author, qzWsong. If there is any infringement, please contact us for removal.
