FlinkCDC Deployment

Flink Installation

1. Download and extract

wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar -zxf flink-1.13.6-bin-scala_2.12.tgz
mv flink-1.13.6 /opt/module/flink

2. Environment variables

vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`

3. Distribute the environment variables to the other nodes

source ~/bin/source.sh

4. In Per-Job-Cluster mode, the following error occurs: Exception in thread "Thread-5" java.lang.IllegalStateException:
Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration 'classloader.check-leaked-classloader'.
To fix this, edit the configuration file

vim /opt/module/flink/conf/flink-conf.yaml

Add the following line to the configuration file to resolve the error above:

classloader.check-leaked-classloader: false

5. Download the flink-sql-connector-kafka and fastjson 1.2.83 JARs (the links can be found on Maven Central)

cd /opt/module/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo1.maven.org/maven2/com/alibaba/fastjson/1.2.83/fastjson-1.2.83.jar

Job Deployment

1. Test code

package org.example;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Objects;

public class TestCDC {
    public static void main(String[] args) throws Exception {
        // TODO 1 Create the stream execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // TODO 2 Create the Flink-MySQL-CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop107")
                .port(3306)
                .username("root")
                .password("password")
                .databaseList("db1") // database to capture
                .tableList("db1.t") // table to capture (the database prefix cannot be omitted)
                .deserializer(new JsonDebeziumDeserializationSchema()) // deserialize each incoming SourceRecord to a JSON string
                .startupOptions(StartupOptions.initial()) // startup strategy: take an initial snapshot of the monitored tables, then keep reading the latest binlog
                .build();
        // TODO 3 Read the data and write it to Kafka
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName")
                .map(JSONObject::parseObject)
                .map(Objects::toString)
                .addSink(new FlinkKafkaProducer<>("hadoop105:9092", "topic01", new SimpleStringSchema()));
        // TODO 4 Execute
        env.execute();
    }
}
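
The job above starts with StartupOptions.initial(). flink-connector-mysql-cdc 2.3.0 also offers other startup strategies; the sketch below (the class name and the timestamp value are illustrative) lists the common factory methods:

import com.ververica.cdc.connectors.mysql.table.StartupOptions;

public class StartupOptionsDemo {
    public static void main(String[] args) {
        // Snapshot the monitored tables first, then continue from the latest binlog (used above)
        StartupOptions initial = StartupOptions.initial();
        // Skip the snapshot and read from the earliest binlog offset MySQL still retains
        StartupOptions earliest = StartupOptions.earliest();
        // Skip the snapshot and read only changes made after the job starts
        StartupOptions latest = StartupOptions.latest();
        // Skip the snapshot and read binlog events from a given epoch-millisecond timestamp (illustrative value)
        StartupOptions fromTimestamp = StartupOptions.timestamp(1670656489000L);
    }
}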

2. Packaging plugin

JARs that already exist on the server do not need to be bundled; mark those dependencies with

<scope>provided</scope>

flink-connector-mysql-cdc

flink-table-api-java-bridge

must be bundled into the fat JAR:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
    <!-- FlinkCDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.3.0</version>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
        <scope>provided</scope>
    </dependency>
    <!-- Flink_Kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.13.6</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<!-- Packaging plugin -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

3. Package and upload

Run mvn package and upload the resulting jar-with-dependencies JAR to the server.

4. Test

Start a Kafka console consumer on the target topic, then submit the job to YARN:

kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic topic01
/opt/module/flink/bin/flink run-application \
-t yarn-application \
-nm a3 \
-ys 2 \
-yjm 2048 \
-ytm 4096 \
-c org.example.TestCDC \
FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

Overview of the JSON format of the test results

Database operation | op             | before   | after
insert             | c              | null     | row data
update             | first d then c | row data | row data
delete             | d              | row data | null

The database and table names appear in the source field.
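
To make the envelope concrete, here is a minimal fastjson sketch (the class name is illustrative, and the record is an abridged version of the insert example below) that extracts op, before, after, and the db/table from source:

import com.alibaba.fastjson.JSONObject;

public class ParseCdcRecord {
    public static void main(String[] args) {
        // An insert record as emitted by JsonDebeziumDeserializationSchema (abridged)
        String json = "{\"before\":null,\"after\":{\"a\":2,\"b\":\"2022-12-10T07:15:52Z\"},"
                + "\"source\":{\"db\":\"db1\",\"table\":\"t\"},\"op\":\"c\",\"ts_ms\":1670656552743}";
        JSONObject record = JSONObject.parseObject(json);
        JSONObject source = record.getJSONObject("source");
        System.out.println("op     = " + record.getString("op"));         // c = insert, u = update, d = delete, r = snapshot read
        System.out.println("before = " + record.getJSONObject("before")); // null for inserts
        System.out.println("after  = " + record.getJSONObject("after"));
        System.out.println("table  = " + source.getString("db") + "." + source.getString("table"));
    }
}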

1. Initial snapshot of the monitored tables (snapshot rows carry op "r")

-- Create the database
DROP DATABASE IF EXISTS db1;
CREATE DATABASE db1;
-- Create the table
CREATE TABLE db1.t(a INT PRIMARY KEY, b TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
-- Insert a row
INSERT db1.t(a,b) VALUES(1,'2022-10-24 00:00:00');

JSON

{"before":null,"after":{"a":1,"b":"2022-10-24T00:00:00Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670656489808,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1670656489815,"transaction":null}

2. Insert data

INSERT db1.t(a) VALUES(2);

JSON

{"before":null,"after":{"a":2,"b":"2022-12-10T07:15:52Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670656552000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5152,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1670656552743,"transaction":null}

3. Update data

UPDATE db1.t SET a=3 WHERE a=1;
SELECT * FROM db1.t;

JSON (because the UPDATE changes the primary key column, CDC emits a delete event followed by a create event instead of a single update event)

{"before":{"a":1,"b":"2022-10-23T16:00:00Z"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670656602000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5434,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1670656602253,"transaction":null}{"before":null,"after":{"a":3,"b":"2022-10-23T16:00:00Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670656602000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5434,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1670656602253,"transaction":null}

4. Delete data

DELETE FROM db1.t WHERE a=3;

JSON

{"before":{"a":3,"b":"2022-10-23T16:00:00Z"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1670656744000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":5717,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1670656744059,"transaction":null}

Reposted from: https://blog.csdn.net/Yellow_python/article/details/128263892
Copyright belongs to the original author 小基基o_O. In case of infringement, please contact us for removal.
