1. References
Flink -3- A detailed guide to installation, deployment, usage, and tuning (standalone mode | yarn mode)
flink-cdc
2. Common Flink deployment-mode combinations
3. Standalone installation
3.1 Single-node installation
Flink download page: https://flink.apache.org/downloads/
Download the Flink package: flink-1.18.1-bin-scala_2.12.tgz
Install it on the base host 192.168.1.51:
cd /home/module
tar -xzf flink-1.18.1-bin-scala_2.12.tgz
mv flink-1.18.1 flink
3.2 Problem 1
The file .flink-runtime.version.properties has not been generated correctly. You MUST run 'mvn generate-sources' in the flink-runtime module
Fix: upgrading the JDK (to 1.8.0_421) resolved it.
3.3 Change the web UI port
conf/flink-conf.yaml
rest.port: 8086
3.4 Access via IP address
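This section's body is missing from the source. To make the web UI reachable from other machines by the server's IP, the REST endpoint can be bound to all interfaces in conf/flink-conf.yaml; a minimal sketch (the keys below are the standard Flink 1.18 options, the address value is this tutorial's host):

```yaml
# Bind the REST/web UI endpoint to all interfaces so it is reachable
# from other hosts via the server's IP, not just localhost
rest.bind-address: 0.0.0.0
rest.address: 192.168.1.51
```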
4. Flink SQL: Postgres -> MySQL
4.1 Configure Postgres 12
vi /var/lib/postgresql/data/postgresql.conf
# Change the WAL level to logical (options: minimal, replica, logical)
wal_level = logical
# Raise the maximum number of replication slots (default 10); flink-cdc uses one slot per table by default
max_replication_slots = 20
# Raise the maximum number of WAL sender processes (default 10); keep this in line with the slots setting above
max_wal_senders = 20
# Terminate replication connections that have been inactive longer than this; it can be set somewhat higher (default 60s, 0 disables the timeout)
wal_sender_timeout = 180s
Restart Postgres.
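After the restart, the new settings can be verified from psql; a quick read-only check:

```sql
-- Verify that the logical-decoding settings took effect
SHOW wal_level;              -- expect: logical
SHOW max_replication_slots;  -- expect: 20
SHOW max_wal_senders;        -- expect: 20
```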
4.2 Create a user and grant privileges
First create the database and table:
-- Create database test_db
CREATE DATABASE test_db;
-- Connect to the newly created database test_db
\c test_db
-- Create the t_user table
CREATE TABLE "public"."t_user" (
  "id" int8 NOT NULL,
  "name" varchar(255),
  "age" int2,
  PRIMARY KEY ("id")
);
Create the user and grant it privileges:
-- Create a new Postgres user
CREATE USER test1 WITH PASSWORD 'test123';
-- Grant the user replication privileges
ALTER ROLE test1 REPLICATION;
-- Grant the user the right to connect to the database
GRANT CONNECT ON DATABASE test_db TO test1;
-- Grant the user privileges on all tables in the current database's public schema
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO test1;
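To confirm the grants took effect, the standard information_schema view can be queried; a sketch:

```sql
-- List table-level privileges granted to test1
SELECT table_name, privilege_type
FROM information_schema.role_table_grants
WHERE grantee = 'test1';
```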
4.3 Publish the tables
-- Set puballtables to true for existing publications
update pg_publication set puballtables=true where pubname is not null;
-- Publish all tables
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
-- Check which tables have been published
select * from pg_publication_tables;
Change the table's replica identity so that update and delete events carry the old values:
-- Set the replica identity to include the previous values on update and delete (this ensures that during real-time sync, update and delete changes on t_user are captured and synchronized correctly; without it, the table's replica identity may default to NOTHING, in which case updated and deleted row data can be lost and the sync becomes inaccurate)
ALTER TABLE t_user REPLICA IDENTITY FULL;
-- Check the replica identity ('f' means full, i.e. the setting succeeded; 'n' means nothing, i.e. it is not set)
select relreplident from pg_class where relname='t_user';
4.4 Flink SQL
Start the Flink SQL client: ./sql-client.sh
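Before submitting a continuous CDC-to-JDBC job, it is usually worth enabling checkpointing in the SQL client, since the JDBC sink flushes its buffered writes on checkpoints; a sketch (the 10s interval is an arbitrary example value, not from the source):

```sql
-- Enable periodic checkpoints for the streaming job (interval is an example)
SET 'execution.checkpointing.interval' = '10s';
```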
CREATE TABLE `table_source_pg` (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '192.168.1.115',
'port' = '5432',
'username' = 'test1',
'password' = 'xxxxxx',
'database-name' = 'test_db',
'schema-name' = 'public',
'table-name' = 't_user',
'decoding.plugin.name' = 'pgoutput',
'slot.name'= 'flink'
);
CREATE TABLE `table_sink_mysql` (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.51:3306/test',
'username' = 'root',
'password' = 'xxxxxx',
'table-name' = 't_user_copy'
);
INSERT INTO `table_sink_mysql` (`id`, `name`, `age`) (SELECT `id`, `name`, `age` FROM `table_source_pg`);
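Note that the JDBC connector does not create the target table, so t_user_copy must already exist in the MySQL test database before the job is submitted. A minimal DDL sketch mirroring the source schema (the exact column types are assumptions):

```sql
-- Run on the MySQL side (192.168.1.51) before starting the Flink job
CREATE TABLE test.t_user_copy (
  id   BIGINT NOT NULL,
  name VARCHAR(255),
  age  SMALLINT,
  PRIMARY KEY (id)
);
```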
4.5 Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory'
Flink SQL> INSERT INTO table_sink_mysql (id, name, age)
  (SELECT id, name, age FROM table_source_pg);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Fix: add the following jars to Flink's lib directory:
flink-sql-connector-mysql-cdc-3.0.1.jar
flink-sql-connector-postgres-cdc-3.0.1.jar
flink-connector-jdbc-3.2.0.jar
mysql-connector-j-8.0.32.jar
4.6 Caused by: java.io.StreamCorruptedException: unexpected block data
Fix: in Flink's flink-conf.yaml, set classloader.resolve-order: parent-first (changing it from the default child-first), then restart the cluster.
4.7 Flink: Missing required options are: slot.name
Fix: add a 'slot.name' option to the postgres-cdc source table's WITH clause, as in the DDL in section 4.4 ('slot.name' = 'flink').
4.8 ERROR: relation "pg_publication" does not exist
This error does not occur on Postgres 12; it appears on 9.6, because the pg_publication catalog (logical publications) was only introduced in PostgreSQL 10.
4.9 Flink job error: NoResourceAvailableException: Could not acquire the minimum required resources
Fix: adjust the following parameters in flink-conf.yaml:
jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m
taskmanager.numberOfTaskSlots: 50
Copyright belongs to the original author, 山巅; in case of infringement, please contact us for removal.