flinksql 累计指标练习
数据流向:kafka ->kafka ->mysql
模拟写数据到kafka topic:wxt中
importcom.alibaba.fastjson.JSONObject;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;publicclassKafkaProducerExample{publicstaticvoidmain(String[] args)throwsException{// 设置kafka服务器地址和端口号String kafkaServers ="localhost:9092";// 设置producer属性Properties properties =newProperties();
properties.put("bootstrap.servers", kafkaServers);
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String,String> producer =newKafkaProducer<>(properties);// 发送消息String topic ="wxt";JSONObject jsonObject =newJSONObject();
jsonObject.put("id",9);
jsonObject.put("name","王大大");
jsonObject.put("age",11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: "+ jsonString);ProducerRecord<String,String> record =newProducerRecord<>(topic, jsonString);
producer.send(record);// 关闭producer
producer.close();}}
kafka topic :wxt1
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;publicclassKafkaToMysqlJob{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv =TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers ="localhost:9092";String kafkaTopic ="wxt";String groupId ="wxt1";// 注册Kafka表
tEnv.executeSql("CREATE TABLE kafka_table (\n"+" id INT,\n"+" name STRING,\n"+" age INT,\n"+" proctime as PROCTIME()\n"+") WITH (\n"+" 'connector' = 'kafka',\n"+" 'topic' = '"+ kafkaTopic +"',\n"+" 'properties.bootstrap.servers' = '"+ kafkaBootstrapServers +"',\n"+" 'properties.group.id' = '"+ groupId +"',\n"+" 'format' = 'json',\n"+" 'scan.startup.mode' = 'earliest-offset'\n"+")");// 注册Kafka表// latest-offset//earliest-offset
tEnv.executeSql("CREATE TABLE kafka_table2 (\n"+" window_start STRING,\n"+" window_end STRING,\n"+" name STRING,\n"+" age INT\n"+") WITH (\n"+" 'connector' = 'kafka',\n"+" 'topic' = 'wxt2',\n"+" 'properties.bootstrap.servers' = '"+ kafkaBootstrapServers +"',\n"+" 'properties.group.id' = 'kafka_table2',\n"+" 'format' = 'json',\n"+" 'scan.startup.mode' = 'latest-offset',\n"+" 'value.format' = 'csv'\n"+")");
tEnv.executeSql("CREATE TABLE mysql_sink_table (\n"+" window_start String,\n"+" window_end String,\n"+" name String,\n"+" age INT\n"+") WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n"+" 'username' = 'root',\n"+" 'password' = '12345678',\n"+" 'table-name' = 'leiji_age'\n"+")");
tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n"+"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n"+"group by window_start,window_end,name");
tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");
env.execute("KafkaToMysqlJob");}}
kafka topic :wxt2
mysql结果数据:
pom文件
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>
本文转载自: https://blog.csdn.net/weixin_47699191/article/details/134059249
版权归原作者 小涛手记 所有, 如有侵权,请联系我们删除。
版权归原作者 小涛手记 所有, 如有侵权,请联系我们删除。