0


Flink 1.18安装 及配置 postgres12 同步到mysql5.7(Flink sql 方式)

文章目录

1、参考

Flink -3- 一文详解安装部署以及使用和调优(standalone 模式 | yarn 模式)

flink-cdc

2、flink 常见部署模式组合

在这里插入图片描述

3、Standalone 安装

3.1 单节点安装

flink 下载地址:https://flink.apache.org/downloads/

下载 flink 安装包:flink-1.18.1-bin-scala_2.12.tgz

安装在基础环境 192.168.1.51


cd /home/module

tar -xzf flink-1.18.1-bin-scala_2.12.tgz

mv flink-1.18.1 flink

3.2 问题1

The file .flink-runtime.version.properties has not been generated correctly. You MUST run ‘mvn generate-sources’ in the flink-runtime module

解决:把jdk 升级1.8.421 就可以了

3.3 修改ui 端口

conf/flink-conf.yaml
rest.port: 8086

3.4 使用ip访问

在这里插入图片描述

4 flink sql postgres —>mysql

4.1 配置postgres 12

vi /var/lib/postgresql/data/postgresql.conf

vi /var/lib/postgresql/data/postgresql.conf

# 更改wal日志方式为logical(方式有:minimal、replica 、logical  )
wal_level = logical  

# 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
max_replication_slots = 20

# 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
max_wal_senders = 20     

# 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)
wal_sender_timeout = 180s

重启 postgres

4.2 新建用户并赋权

先创建数据库和表:

-- 创建数据库 test_db
CREATE DATABASE test_db;

-- 连接到新创建的数据库 test_db
\c test_db

-- 创建 t_user 表CREATE TABLE "public"."t_user" (
    "id" int8 NOT NULL,
    "name" varchar(255),
    "age" int2,
    PRIMARY KEY ("id")
);

新建用户并且给用户权限:


-- pg新建用户
CREATE USER test1 WITH PASSWORD 'test123';

-- 给用户复制流权限
ALTER ROLE test1 replication;

-- 给用户登录数据库权限
GRANT CONNECT ON DATABASE test_db to test1;

-- 把当前库public下所有表查询权限赋给用户
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;

4.3. 发布表


-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

更改表的复制标识包含更新和删除的值:


-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)
ALTER TABLE t_user REPLICA IDENTITY FULL;

-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)
select relreplident from pg_class where relname='t_user';

4.4 Flink sql

Flink sql 客户端开启: ./sql-client.sh

CREATE TABLE `table_source_pg` (
      id BIGINT,
      name STRING,
      age INT
      ) WITH (
      'connector' = 'postgres-cdc',
      'hostname' = '192.168.1.115',
      'port' = '5432',
      'username' = 'test1',
      'password' = 'xxxxxx',
      'database-name' = 'test_db',
      'schema-name' = 'public',
      'table-name' = 't_user',
      'decoding.plugin.name' = 'pgoutput',
            'slot.name'= 'flink'
);

CREATE TABLE `table_sink_mysql` (
      id BIGINT,
      name STRING,
      age INT,
      PRIMARY KEY (`id`) NOT ENFORCED
      ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://192.168.1.51:3306/test',
      'username' = 'root',
      'password' = 'xxxxxx',
      'table-name' = 't_user_copy'
);

INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);

4.5 Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’

Flink SQL> INSERT INTO

table_sink_mysql

(

id

,

name

,

age

) (SELECT

id

,

name

,

age

FROM

table_source_pg

);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier ‘postgres-cdc’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

解决: 在flink -->lib 目录下增加如下jar

-rw-r--r--. 1 root root 23715175 10月 15 19:04 flink-sql-connector-mysql-cdc-3.0.1.jar
-rw-r--r--. 1 root root 19379756 10月 15 17:01 flink-sql-connector-postgres-cdc-3.0.1.jar
-rw-r--r--. 1 root root 385471 10月 15 19:27 flink-connector-jdbc-3.2.0.jar
-rw-r--r--. 1 root root 2480823 10月 15 19:33 mysql-connector-j-8.0.32.jar

4.6 Caused by: java.io.StreamCorruptedException: unexpected block data

解决方案:在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可

4.7 FLink:Missing required options are: slot.name

在这里插入图片描述

4.8 ERROR: relation “pg_publication” does not exist

这个问题在postgres 12上不存在,是在9.6中存在的

4.9 Flink:job报错NoResourceAvailableException: Could not acquire the minimum required resources

1、修改如下参数:

jobmanager.memory.process.size: 2600m

taskmanager.memory.process.size: 2728m

taskmanager.memory.flink.size: 2280m

taskmanager.numberOfTaskSlots: 50
标签: flink sql 大数据

本文转载自: https://blog.csdn.net/shandian534/article/details/143021999
版权归原作者 山巅 所有, 如有侵权,请联系我们删除。

“Flink 1.18安装 及配置 postgres12 同步到mysql5.7(Flink sql 方式)”的评论:

还没有评论