1、场景
从 Kafka 读取日志数据,然后:
(1)关联字典表:补全日志数据(附加单位信息);
(2)判断日志内容级别(能否匹配到字典表中的单位信息),多路输出:
低级:只写入 ClickHouse;
高级:写入 ClickHouse 的同时推送到 Kafka,供二次数据处理流程使用。
2、实现
package com.ws.kafka2clickhouse;
import cn.hutool.json.JSONUtil;
import com.ws.kafka2clickhouse.bean.CompanyInfo;
import com.ws.kafka2clickhouse.bean.LogEvent;
import com.ws.kafka2clickhouse.sink.MyClickHouseSink;
import org.apache.avro.data.Json;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
/**
 * Flink job: reads JSON log events from Kafka, enriches them with a company dictionary that is
 * streamed from MySQL via CDC, writes every event to ClickHouse, and additionally pushes the
 * enriched ("high level") events to a second Kafka topic for downstream processing.
 */
public class Kafka2ClickHouse {

    /** Kafka brokers used by both the log source and the analysis sink. */
    private static final String KAFKA_BOOTSTRAP_SERVERS = "hdp01:6667";

    /** Topic that receives enriched events for secondary processing. */
    private static final String ANALYSIS_TOPIC = "dataZengQiang";

    /** Broadcast state holding the company dictionary, keyed by {@code CompanyInfo#getId()}. */
    private static final MapStateDescriptor<Long, CompanyInfo> COMPANY_INFO_DESC =
            new MapStateDescriptor<>("company_info_dict", Long.class, CompanyInfo.class);

    /** Side output carrying the JSON form of enriched events destined for {@link #ANALYSIS_TOPIC}. */
    private static final OutputTag<String> TO_KAFKA = new OutputTag<String>("tokafka") {
    };

    /** ClickHouse insert statement; its 13 placeholders are bound by {@link #bindLogEvent}. */
    private static final String INSERT_INTO_CK_SQL =
            "insert into default.dns_logs values(?,?,?,?,?,?,?,?,?,?,?,?,?)";

    /**
     * Builds and runs the pipeline.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack", "true");
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        // Local environment with web UI for development; use the commented line for cluster submission.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        // Checkpointing is currently disabled; see buildAnalysisKafkaProducer() for the effect on the Kafka sink.
        // env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        // env.getCheckpointConfig().setCheckpointStorage("file:///D:/out_test/ck");
        // env.getCheckpointConfig().setCheckpointStorage("hdfs://hdp01:8020/tmp/kafka2hdfs/");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // 1. Main stream: raw log lines from Kafka, parsed from JSON into POJOs.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setTopics("dataSource")
                .setGroupId("group1")
                .setBootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> kafka = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka");
        // NOTE(review): a record that is not valid JSON presumably makes JSONUtil.toBean throw, which
        // fails the job; consider dropping or dead-lettering bad records if that is not intended.
        SingleOutputStreamOperator<LogEvent> beans =
                kafka.map((MapFunction<String, LogEvent>) s -> JSONUtil.toBean(s, LogEvent.class));

        // 2. Dictionary stream (MySQL CDC changelog), broadcast to every enrichment subtask.
        BroadcastStream<CompanyInfo> broadcastStream = loadCompanyDict(tenv).broadcast(COMPANY_INFO_DESC);

        // 3. Enrichment: events matching a dictionary entry are also emitted to the TO_KAFKA side output.
        SingleOutputStreamOperator<LogEvent> beansCompany =
                beans.connect(broadcastStream).process(new CompanyEnrichFunction());

        // 4. Every event lands in the ClickHouse base table.
        beansCompany.addSink(buildClickHouseSink());
        beansCompany.print("基础库clickhouse");

        // 5. Enriched ("high level") events are additionally pushed to the analysis Kafka topic.
        DataStream<String> sideOutput = beansCompany.getSideOutput(TO_KAFKA);
        sideOutput.print("增强分析kafka");
        sideOutput.addSink(buildAnalysisKafkaProducer());

        env.execute();
    }

    /**
     * Registers the MySQL CDC dictionary table and turns its changelog into {@link CompanyInfo}
     * records, each tagged with the {@link RowKind} of the change so the broadcast side can apply
     * inserts, updates and deletes.
     *
     * @param tenv table environment bound to the job's stream environment
     * @return the dictionary changelog as a data stream
     */
    private static DataStream<CompanyInfo> loadCompanyDict(StreamTableEnvironment tenv) {
        // Flink only supports NOT ENFORCED primary keys; a bare PRIMARY KEY fails DDL validation.
        tenv.executeSql(
                "CREATE TABLE dmpt_base_oper_log (\n" +
                        "id bigInt primary key not enforced," +
                        "title String" +
                        ") WITH (\n" +
                        "'connector' = 'mysql-cdc',\n" +
                        "'hostname' = 'localhost',\n" +
                        "'port' = '3306',\n" +
                        "'username' = 'root',\n" +
                        "'password' = 'root',\n" +
                        "'database-name' = 'test',\n" +
                        "'table-name' = 'test_recursive'\n" +
                        ")"
        );
        Table result = tenv.sqlQuery("select * from dmpt_base_oper_log");
        DataStream<Row> dict = tenv.toChangelogStream(result);
        dict.print();
        return dict.map(new MapFunction<Row, CompanyInfo>() {
            @Override
            public CompanyInfo map(Row row) {
                Long id = (Long) row.getField("id");
                String title = (String) row.getField("title");
                // Carry the CDC change type (insert / update / delete) along with the payload.
                return new CompanyInfo(id, title, row.getKind());
            }
        });
    }

    /**
     * Joins log events against the broadcast company dictionary. An event whose message id matches
     * a dictionary entry gets that {@link CompanyInfo} attached and is also emitted, as JSON, to
     * {@link #TO_KAFKA}. Every event is forwarded on the main output regardless.
     */
    private static final class CompanyEnrichFunction
            extends BroadcastProcessFunction<LogEvent, CompanyInfo, LogEvent> {

        @Override
        public void processElement(LogEvent logEvent, ReadOnlyContext ctx, Collector<LogEvent> out)
                throws Exception {
            ReadOnlyBroadcastState<Long, CompanyInfo> dict = ctx.getBroadcastState(COMPANY_INFO_DESC);
            // NOTE(review): the dictionary is keyed by CompanyInfo.id but looked up with the event's
            // messageId — confirm these refer to the same identifier. Events processed before the
            // matching dictionary entry has been broadcast are not enriched.
            CompanyInfo companyInfo = dict.get(logEvent.getMessageId());
            if (companyInfo != null) {
                logEvent.setCompanyInfo(companyInfo);
                ctx.output(TO_KAFKA, JSONUtil.toJsonStr(logEvent));
            }
            out.collect(logEvent);
        }

        @Override
        public void processBroadcastElement(CompanyInfo companyInfo, Context ctx, Collector<LogEvent> out)
                throws Exception {
            BroadcastState<Long, CompanyInfo> dict = ctx.getBroadcastState(COMPANY_INFO_DESC);
            switch (companyInfo.getRowKind()) {
                case DELETE:
                    dict.remove(companyInfo.getId());
                    break;
                case UPDATE_BEFORE:
                    // Old image of an update: the following UPDATE_AFTER overwrites the entry.
                    // Removing it here would open a window in which events miss enrichment.
                    // NOTE(review): assumes updates never change the id — verify against the CDC source.
                    break;
                default:
                    // INSERT and UPDATE_AFTER: put() both adds and replaces.
                    dict.put(companyInfo.getId(), companyInfo);
                    break;
            }
        }
    }

    /**
     * Creates the batched JDBC sink into ClickHouse ({@link #INSERT_INTO_CK_SQL}).
     *
     * @return sink writing every {@link LogEvent} as one row
     */
    private static SinkFunction<LogEvent> buildClickHouseSink() {
        JdbcStatementBuilder<LogEvent> binder = Kafka2ClickHouse::bindLogEvent;
        return JdbcSink.sink(
                INSERT_INTO_CK_SQL,
                binder,
                new JdbcExecutionOptions.Builder()
                        // Batch size (default 5000).
                        .withBatchSize(10000)
                        // Flush interval between batches.
                        .withBatchIntervalMs(5000)
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                        .withUrl("jdbc:clickhouse://192.16.10.118:1111")
                        .withUsername("default")
                        .withPassword("xxxx")
                        .build()
        );
    }

    /**
     * Binds one event to the 13 placeholders of {@link #INSERT_INTO_CK_SQL}, in column order.
     *
     * <p>SQLExceptions propagate to JdbcSink, so its retry and failure handling applies. Swallowing
     * them would let a row with stale or unbound parameters be added to the batch.
     *
     * @param ps       the prepared insert statement
     * @param logEvent the event to bind
     * @throws SQLException if the driver rejects a parameter
     */
    private static void bindLogEvent(PreparedStatement ps, LogEvent logEvent) throws SQLException {
        ps.setString(1, logEvent.getMessageType());
        ps.setLong(2, logEvent.getMessageId());
        ps.setString(3, logEvent.getDeviceId());
        ps.setString(4, logEvent.getCol1());
        ps.setString(5, logEvent.getCol2());
        ps.setString(6, logEvent.getCol3());
        ps.setString(7, logEvent.getCol4());
        ps.setString(8, logEvent.getHeaders().getDeviceTime());
        ps.setLong(9, logEvent.getHeaders().get_uid());
        ps.setString(10, logEvent.getHeaders().getProductId());
        ps.setString(11, logEvent.getHeaders().getOrgId());
        // Company title is only present for enriched events.
        ps.setString(12, logEvent.getCompanyInfo() != null ? logEvent.getCompanyInfo().getTitle() : null);
        ps.setString(13, logEvent.getRegion());
    }

    /**
     * Creates the transactional Kafka producer for enriched events.
     *
     * <p>FlinkKafkaProducer falls back to {@code Semantic.NONE} while checkpointing is disabled.
     * EXACTLY_ONCE therefore only takes effect once checkpointing is enabled in {@link #main}.
     *
     * @return producer writing JSON strings to {@link #ANALYSIS_TOPIC}
     */
    private static FlinkKafkaProducer<String> buildAnalysisKafkaProducer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS);
        // For EXACTLY_ONCE, FlinkKafkaProducer defaults transaction.timeout.ms to 1 hour. That exceeds
        // the broker's default transaction.max.timeout.ms (15 min), so initializing transactions fails.
        // Keep it within the broker limit.
        properties.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));
        KafkaSerializationSchema<String> serializationSchema = (element, timestamp) ->
                new ProducerRecord<>(ANALYSIS_TOPIC, element.getBytes(StandardCharsets.UTF_8));
        return new FlinkKafkaProducer<>(
                ANALYSIS_TOPIC,                            // default target topic
                serializationSchema,                       // serialization schema
                properties,                                // producer config
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault tolerance
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Kafka to ClickHouse Flink job (Flink ${flink.version} with Scala-suffixed artifacts). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>test</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink</artifactId>
<properties>
<flink.version>1.13.2</flink.version>
<!-- Despite its name, this is the Scala binary-version suffix appended to Flink artifactIds (e.g. _2.11). -->
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<!-- Flink web dashboard; presumably needed by createLocalEnvironmentWithWebUI in the job. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Core Flink APIs, client and DataStream runtime. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Dependencies required by Flink SQL / Table API (StreamTableEnvironment, blink planner). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): presumably for HDFS checkpoint storage, which is commented out in the job; confirm it is still needed. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.1</version>
</dependency>
<!-- Kafka source (KafkaSource) and sink (FlinkKafkaProducer). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JDBC sink used to write into ClickHouse. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): kafka-clients is pinned explicitly; confirm compatibility with flink-connector-kafka ${flink.version}. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- ClickHouse JDBC driver (legacy ru.yandex coordinates; the job loads ru.yandex.clickhouse.ClickHouseDriver). -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<!-- Flink CDC MySQL connector, taken from a local jar under lib/. -->
<!-- NOTE(review): declared version 2.2.0 does not match the jar file name (2.3-SNAPSHOT). -->
<!-- NOTE(review): system-scoped dependencies are typically not bundled by jar-with-dependencies; verify the connector is on the cluster classpath. -->
<dependency>
<groupId>com.ws</groupId>
<artifactId>mysql-cdc</artifactId>
<version>2.2.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/flink-connector-mysql-cdc-2.3-SNAPSHOT.jar</systemPath>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- Bundle dependencies into a fat jar during the package phase. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
标签:
大数据
本文转载自: https://blog.csdn.net/java_creatMylief/article/details/128805670
版权归原作者 qzWsong 所有, 如有侵权,请联系我们删除。