0


flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

importcom.alibaba.fastjson.JSONObject;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;publicclassKafkaProducerExample{publicstaticvoidmain(String[] args)throwsException{// 设置kafka服务器地址和端口号String kafkaServers ="localhost:9092";// 设置producer属性Properties properties =newProperties();
        properties.put("bootstrap.servers", kafkaServers);
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String,String> producer =newKafkaProducer<>(properties);// 发送消息String topic ="wxt";JSONObject jsonObject =newJSONObject();
        jsonObject.put("id",9);
        jsonObject.put("name","王大大");
        jsonObject.put("age",11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: "+ jsonString);ProducerRecord<String,String> record =newProducerRecord<>(topic, jsonString);
        producer.send(record);// 关闭producer
        producer.close();}}

kafka topic :wxt1
在这里插入图片描述

importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;publicclassKafkaToMysqlJob{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv =TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers ="localhost:9092";String kafkaTopic ="wxt";String groupId ="wxt1";// 注册Kafka表
        tEnv.executeSql("CREATE TABLE kafka_table (\n"+"  id INT,\n"+"  name STRING,\n"+"  age INT,\n"+"  proctime as PROCTIME()\n"+") WITH (\n"+"  'connector' = 'kafka',\n"+"  'topic' = '"+ kafkaTopic +"',\n"+"  'properties.bootstrap.servers' = '"+ kafkaBootstrapServers +"',\n"+"  'properties.group.id' = '"+ groupId +"',\n"+"  'format' = 'json',\n"+"  'scan.startup.mode' = 'earliest-offset'\n"+")");// 注册Kafka表// latest-offset//earliest-offset
        tEnv.executeSql("CREATE TABLE kafka_table2 (\n"+"  window_start STRING,\n"+"  window_end STRING,\n"+"  name STRING,\n"+"  age INT\n"+") WITH (\n"+"  'connector' = 'kafka',\n"+"  'topic' = 'wxt2',\n"+"  'properties.bootstrap.servers' = '"+ kafkaBootstrapServers +"',\n"+"  'properties.group.id' = 'kafka_table2',\n"+"  'format' = 'json',\n"+"  'scan.startup.mode' = 'latest-offset',\n"+"  'value.format' = 'csv'\n"+")");

        tEnv.executeSql("CREATE TABLE mysql_sink_table (\n"+"  window_start String,\n"+"   window_end String,\n"+"    name String,\n"+"    age INT\n"+") WITH (\n"+"   'connector' = 'jdbc',\n"+"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n"+"   'username' = 'root',\n"+"   'password' = '12345678',\n"+"   'table-name' = 'leiji_age'\n"+")");

         tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n"+"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n"+"group by  window_start,window_end,name");

        tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");

        env.execute("KafkaToMysqlJob");}}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>
标签: flink sql

本文转载自: https://blog.csdn.net/weixin_47699191/article/details/134059249
版权归原作者 小涛手记 所有, 如有侵权,请联系我们删除。

“flinksql kafka到mysql累计指标练习”的评论:

还没有评论