0


Flink 内容分享(七):Flink 读写 HBase 总结

前言

总结 Flink 读写 HBase

版本

  • Flink 1.15.4
  • HBase 2.0.2
  • Hudi 0.13.0

官方文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/hbase/

Jar包

https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/1.15.4/flink-sql-connector-hbase-2.2-1.15.4.jar

SQL

需要提前建好hbase表;如果没有对应的hbase表,flink写hbase任务会显示finished,没有异常,但是并没有自动创建对应的hbase表

hbase shell创建Hbase表

hbase shell
create 'flink_hbase_table', 'cf'

Flink 写 Hbase

CREATE TABLE flink_hbase_table(
id int,
cf ROW<name string,price double,ts bigint, dt string>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-secure'
);

insert into flink_hbase_table values (1,ROW('hudi',10.1,1000,'2023-10-10'));

Flink 读 Hbase

select * from flink_hbase_table;

图片

select id,name,price,ts,dt from flink_hbase_table;

图片

hbase shell 验证数据

hbase(main):002:0> scan 'flink_hbase_table'
ROW                                                                   COLUMN+CELL
 \x00\x00\x00\x01                                                     column=cf:dt, timestamp=1697160801719, value=2023-10-10
 \x00\x00\x00\x01                                                     column=cf:name, timestamp=1697160801719, value=hudi
 \x00\x00\x00\x01                                                     column=cf:price, timestamp=1697160801719, value=@$333333
 \x00\x00\x00\x01                                                     column=cf:ts, timestamp=1697160801719, value=\x00\x00\x00\x00\x00\x00\x03\xE8
1 row(s)
Took 0.4339 seconds

参数

完整的参数可以查看官网

参数是否必选默认值数据类型描述connector必选(none)String指定使用的连接器, 支持的值如下 :hbase-1.4: 连接 HBase 1.4.x 集群hbase-2.2: 连接 HBase 2.2.x 集群(我的hbase版本为2.0.2)table-name必选(none)String连接的 HBase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。zookeeper.quorum必选(none)StringHBase Zookeeper quorum 信息。zookeeper.znode.parent可选/hbaseStringHBase 集群的 Zookeeper 根目录。properties.*可选(无)String可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。例如您可以设置 'properties.hbase.security.authentication' = 'kerberos' 等kerberos认证参数。

Hudi包兼容性

前提:在开启了kerberos的环境上

当flink lib下面存在hudi(0.13.0版本)包时会出现flink连接不上hbase的现象,具体表现为:
1、Flink 查询 HBase 时,会抛异常:

ava.net.SocketTimeoutException: callTimeout=60000, callDuration=74175: Call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed row 'flink_hbase_table,,' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=indata-192-168-44-128.indata.com,16020,1695447819772, seqNum=-1
    at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.RpcRetryingCallerImpl.callWithRetries(RpcRetryingCallerImpl.java:159)
    at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Call to indata-192-168-44-128.indata.com/192.168.44.128:16020 failed on local exception: org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.exceptions.ConnectionClosedException: Connection closed

2、Flink 写 HBase 时,不报异常,但是会卡住,卡15分钟(正常执行时间20s)左右显示任务完成,实际上没有写成功。

图片

原因

出现这种现象的原因是hudi包下存在hbase-site.xml,其中安全认证相关的配置和kerberos环境不一致。

获取Hbase配置的逻辑和优先级

为了验证上面原因的正确性,研究了一下获取Hbase配置的逻辑和优先级

优先级:

  • 1、用户自定义参数 优先级最高 (SQL中配置的)
  • 2、环境变量 优先级第二 环境变量一共有两个:HBASE_CONF_DIRHBASE_HOME,其中 HBASE_CONF_DIR 的优先级要高于HBASE_HOME, 这两个环境变量下有两个配置文件 hbase-site.xml 和 hbase-default.xml 其中hbase-site.xml 优先级要高于 hbase-default.xml,也就是一共有四个优先级: 2.1 HBASE_CONF_DIR/conf/hbase-site.xml 2.2 HBASE_CONF_DIR/conf/hbase-default.xml 2.3 HBASE_HOME/conf/hbase-site.xml 2.4 HBASE_HOME/conf/hbase-default.xml
  • 3、classpath 优先级最低,其中也有两个配置文件 hbase-site.xml 和 hbase-default.xml,hbase-site.xml 优先级要高于 hbase-default.xml 如hudi包中就存在 hbase-site.xml 和 hbase-default.xml 另外classpath可能有多个目录,多个目录之间也有优先级,对于我们环境有两个classpath存在hbase-site.xml,一个flink lib路径下面的hudi包,一个是/etc/hbase/conf。 为啥是/etc/hbase/conf,具体逻辑在flink bin/config.sh中:
   # try and set HBASE_CONF_DIR to some common default if it's not set
    if [ -z "$HBASE_CONF_DIR" ]; then
        if [ -d "/etc/hbase/conf" ]; then
            echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
            HBASE_CONF_DIR="/etc/hbase/conf"
        fi
    fi

因为我们环境默认没有配置HBASE_CONF_DIR,并且存在/etc/hbase/conf,所以就会走到这个逻辑,我们在启动sql-client时也会看到:

bash Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set

图片

那又是在哪里将这里的HBASE_CONF_DIR加到classpath中的呢?它是在sql-client.sh中通过java -classpath 参数添加的。

图片

CC_CLASSPATH优先级高于INTERNAL_HADOOP_CLASSPATHS ,flink lib属于CC_CLASSPATH,/etc/hbase/conf 属于INTERNAL_HADOOP_CLASSPATHS

/etc/hbase/conf 属于INTERNAL_HADOOP_CLASSPATHS:

图片

flink lib属于CC_CLASSPATH:

图片

图片

因为上面这个函数(

constructFlinkClassPath

)的逻辑我不太确定,我们可以在sql-clent.sh中打印: echo $CC_CLASSPATH

图片

读取Hbase配置的逻辑可以查看源码,本文先不分析这块源码,可能会在下一篇文章补充分析部分源码,先只截个图:

HBase2DynamicTableFactory

图片

prepareRuntimeConfiguration

(

HBaseSinkFunction

HBaseRowDataLookupFunction

中都有这个方法,分别是写和读)

图片

解决方法

知道了Hbase配置的优先级,弄明白了hudi包中的hbase-site.xml为啥会影响flink读写hbase,也就知道如何解决这个问题,我们只需要根据优先级设置正确的hbase配置参数就好了。比如只有classpath中存在hbase配置那么我们就需要修改classpath中的hbase配置为正确配置。如果不想修改classpath中的配置文件或者觉得这样做不合适,我们可以设置更好优先级的配置,比如设置环境变量

HBASE_CONF_DIR

HBASE_HOME

指向正确的hbase配置,另外我们也可以通过在sql参数中配置正确的参数,因为用户参数级别最高,这样配置优点是比较灵活,缺点是需要用户每次都多写一下额外的配置。

解决方法1

删除hudi包里面的hbase-site.xml(hudi-flink1.15-bundle-0.13.0.jar),这样就会去加载我们服务器环境上的正确的hbase-site.xml (/etc/hbase/conf)

删除hbase-site.xml不清楚对hudi有什么影响,目前没有发现~;另外理论上也应该删除hbase-default.xml,因为我们环境上有/etc/hbase/conf这个路径,所以会将/etc/hbase/conf加到classpath中,/etc/hbase/conf/hbase-site.xml的优先级高于同样在classpath中的hbase-default.xml,所以不删除hudi包中的hbase-default.xml也可以正常使用

解决方法2

修改hudi包里面的hbase-site.xml中的kerberos配置:

# 经测试我们环境只需要这两个配置,可以根据自己的环境调整
hbase.security.authentication true
hbase.regionserver.kerberos.principal hbase/[email protected]

这样保留了hbase-site.xml,避免删除hbase-site.xml造成的潜在影响

解决方法3

通过在建表语句中添加配置

CREATE TABLE flink_hbase_table(
id int,
cf ROW<name string,price double,ts bigint, dt string>,
PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'hbase-2.2',
  'table-name' = 'flink_hbase_table',
  'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-192-168-44-129.indata.com:2181,indata-192-168-44-130.indata.com:2181',
  'zookeeper.znode.parent' = '/hbase-secure',
  'properties.hbase.security.authentication' = 'kerberos',
  --'properties.hbase.master.kerberos.principal' = 'hbase/[email protected]',
  'properties.hbase.regionserver.kerberos.principal' = 'hbase/[email protected]'
);

我们在sql中也是添加了上面的两个配置,sql中的配置优先级最高,所以可以有效解决该问题。
另外我不清楚hbase.master.kerberos.principal在什么时候会用到,所以先备注在这里
还有一个问题就是我的方法2和方法3中的配置中都没有配置keytab: hbase.regionserver.keytab.file = /etc/security/keytabs/hbase.service.keytab,但是它可以正常读写,所以我目前还不清楚它是怎么知道读取哪个keytab文件的还是说连接hbase regionserver 时不需要这个配置项。

不启用kerberos的配置:

CREATE TABLE dkl(
id int,
cf ROW<name string,price double,ts bigint, dt string>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'flink_hbase_table',
'zookeeper.quorum' = 'indata-192-168-44-128.indata.com:2181,indata-10-110-105-163.indata.com:2181,indata-10-110-105-164.indata.com:2181',
'zookeeper.znode.parent' = '/hbase-unsecure',
'properties.hbase.security.authentication' = 'simple'
);

解决方法4

配置环境变量

HBASE_CONF_DIR

HBASE_HOME

中的其中一个即可:

# 配置环境变量指向正确的hbase配置路径
export HBASE_CONF_DIR = /etc/hbase/conf
export HBASE_HOME = /etc/hbase/conf

因为环境变量优先级要比classpath高,所以就不会受hudi包中的hbase-site.xml影响了。(hudi包属于classpath)

解决方法5

通过修改源码,添加参数支持通过参数配置:

'hbase.conf.dir'='/opt/dkl/hbase/conf'

有个疑问:就是既然有了方法3和方法4为啥还要改源码呢。有两个原因一个是因为有的flink客户端环境通过设置环境变量的形式不好实现,比如在pod里(对于做平台来说)。第二个原因是我开始并不清楚是哪几个参数影响的,方法3中通过配置用户自定义参数解决了这个问题,这两个参数是后来才知道的。而且对于不同的环境可能影响的参数也不一样,所以不如直接配置整个文件夹的形式简单有效。而且对于修改的这部分源码比较简单打包也很快,所以一开始尝试的这个方法,并且成功了。不过代码不是很完善(因为没有正式使用),这里先记录一下,后面再进行完善。

打包:

## 修改代码,需要先检查代码格式
mvn spotless:apply  -pl flink-connectors/flink-connector-hbase-2.2
## 先编译 hbase-base ,因为 hbase-2.2 依赖 hbase-base中的代码
mvn clean install -DskipTests -pl flink-connectors/flink-connector-hbase-base
mvn clean install -DskipTests -pl flink-connectors/flink-connector-hbase-2.2
## 最后将 flink-sql-connector-hbase-2.2 打包,最后打出来的包名为 flink-sql-connector-hbase-2.2-1.15.4.jar
mvn clean package -DskipTests -pl flink-connectors/flink-sql-connector-hbase-2.2

总结

本文总结了Flink SQL 读写 HBase 的参数配置,解决了在kerberos环境下因 hudi 包 hbase-site.xml 配置冲突引起的异常,学习总结了 Flink SQL 读写 HBase 时加载 HBase 配置的优先级,但是没有详细的分析源码中的逻辑,可能会在后面的文章中补充相关的源码分析~

标签: flink hbase 大数据

本文转载自: https://blog.csdn.net/qq_45038038/article/details/135253457
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。

“Flink 内容分享(七):Flink 读写 HBase 总结”的评论:

还没有评论