0


Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据

一、引入flink相关依赖

<groupId>com.bigdata</groupId>
<artifactId>flink</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

二、properties保存连接kafka的配置

// Kafka connection settings held in a java.util.Properties instance.
val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    // SASL/PLAIN credentials. NOTE(security): credentials are masked for publication —
    // substitute real values (ideally from environment/secret store) before running.
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    // Consumer group id used for offset tracking.
    properties.setProperty("group.id", "flink-test")
    // Start from the earliest offset when the group has no committed offset yet.
    properties.setProperty("auto.offset.reset", "earliest")

三、构建flink实时消费环境

// Build the Flink streaming execution environment with parallelism 1.
val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Fail fast during testing: do not restart the job when an exception occurs.
    env.setRestartStrategy(RestartStrategies.noRestart())

四、添加Kafka源和处理数据

// Attach a Kafka consumer source that deserializes each record as a String.
val lines: DataStream[String]= env.addSource(new FlinkKafkaConsumer[String]("debezium-test-optics_uds",new SimpleStringSchema(),properties))
    lines.print()// print each consumed record to stdout
    // Trigger execution of the streaming job (blocks until the job terminates).
    env.execute()

五、完整代码

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties

/**
 * Minimal Flink streaming job that consumes a Kafka topic and prints every
 * record to stdout. Connects with SASL/PLAIN authentication over plaintext.
 */
object SourceKafka {

  def main(args: Array[String]): Unit = {
    // Build the streaming execution environment with parallelism 1.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Fail fast during testing: do not restart the job on failure.
    env.setRestartStrategy(RestartStrategies.noRestart())

    // Kafka connection settings. NOTE(security): credentials are masked for
    // publication — substitute real values before running.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.129.44.26:9092,10.129.44.32:9092,10.129.44.39:9092")
    properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"debezium\" password=\"******\";")
    properties.setProperty("security.protocol", "SASL_PLAINTEXT")
    properties.setProperty("sasl.mechanism", "PLAIN")
    properties.setProperty("group.id", "flink-test")
    // Start from the earliest offset when no committed offset exists for the group.
    properties.setProperty("auto.offset.reset", "earliest")

    // Attach the Kafka source (String deserialization) and print every record.
    val lines: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("debezium-test-optics_uds", new SimpleStringSchema(), properties))
    lines.print()

    // Trigger execution of the streaming job.
    env.execute()
  }
}

六、执行程序查看消费到的数据

(Screenshot: console output of the running job showing the consumed Kafka records.)

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"sid"},{"type":"string","optional":false,"field":"sname"},{"type":"int64","optional":false,"name":"io.debezium.time.Timestamp","version":1,"field":"updatetime"},{"type":"string","optional":false,"field":"ssex"}],"optional":true,"name":"debezium_test_optics_uds.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"da
ta_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"debezium_test_optics_uds.Envelope"},"payload":{"before":null,"after":{"sid":3600,"sname":"f","updatetime":1661126400000,"ssex":"a"},"source":{"version":"1.9.6.Final","connector":"mysql","name":"debezium-uds8-optics8-test_1h","ts_ms":1665155935000,"snapshot":"false","db":"dw","sequence":null,"table":"student","server_id":223344,"gtid":null,"file":"mysql-bin.000012","pos":6193972,"row":0,"thread":66072,"query":"/* ApplicationName=DBeaver 21.0.1 - SQLEditor <Script-3.sql> */ insert into dw.student values(3600,'f','20220822','a')"},"op":"c","ts_ms":1665155935640,"transaction":null}}

本文转载自: https://blog.csdn.net/zhengzaifeidelushang/article/details/127201099
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。

“Flink系列之:基于scala语言实现flink实时消费Kafka Topic中的数据”的评论:

还没有评论