0


Flink查询关联Hbase输出

1、前言

大家在开发Flink的时候,很多时候会把某些固定的维度信息存储在Hbase或者Redis等这些第三方库里,已方便来跟流发生关联查询输出。本文将从如何本地搭建一个Hbase环境开始讲起,到如何构建一个Hbase公共调用类,到如何构建一个异步调用Hbase的公共代码,再到最后实际调用代码后的输出。

2、本地利用Docker搭建HBase环境

本地如何搭建Docker环境,之前一篇博客中已经详细描述过,大家如果想学习如何搭建,可以去看下,地址如下:Docker入门-Windows 10&Mac系统安装_一个数据小开发的博客-CSDN博客一、何为DockerDocker 是一个开源的应用容器引擎,基于Go语言并遵从 Apache2.0 协议开源。Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任何接口(类似 iPhone 的 app),更重要的是容器性能开销极低。runc 是一个 Linux 命令行工具,用于根据 OCI容器运行时规范 创建和运行容器。containerd 是一个守护程序https://blog.csdn.net/Aaron_ch/article/details/115559512?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164568960216780271525279%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164568960216780271525279&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~first_rank_ecpm_v1~rank_v31_ecpm-1-115559512.nonecase&utm_term=docker&spm=1018.2226.3001.4450

2.1、下载Hbase镜像

  1. docker pull harisekhon/hbase

2.2、启动Hbase镜像:

  1. docker run -d -h myhbase -p 2181:2181 -p 8080:8080 -p 8085:8085 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16020:16020 -p 16201:16201 -p 16301:16301 --name hbase harisekhon/hbase

Hbase 端口映射图:

2.3、本地添加host

打开/etc/hosts ,如下图所示,添加一行

127.0.0.1 myhbase

此时在浏览器中输入hbase访问的地址,可以看到hbase的页面

http://localhost:16010/master-status

3、访问Hbase

3.1、图形化工具访问Hbase

如下的图形化工具,Mac和Windows都可以访问。

如果需要图形化工具的,评论区留下邮箱,本人看到了会第一时间发送

3.2、Java工具类访问

直接上代码,可以直接使用

  1. package com.horse.utils.hbase;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.google.common.base.CaseFormat;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.commons.beanutils.BeanUtils;
  6. import org.apache.flink.util.StringUtils;
  7. import org.apache.hadoop.hbase.*;
  8. import org.apache.hadoop.hbase.client.*;
  9. import org.apache.hadoop.conf.Configuration;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import java.io.IOException;
  12. import java.util.List;
  13. /**
  14. * @author :aaronChen
  15. * @date :Created in 2022-02-22 16:51
  16. * @description :
  17. * @modifiedBy :
  18. */
  19. @Slf4j
  20. public class HBaseUtils {
  21. public static Connection getHBaseConnection() {
  22. try {
  23. // 1.获取配置文件信息
  24. Configuration configuration = HBaseConfiguration.create();
  25. configuration.set("hbase.zookeeper.quorum", "localhost");
  26. // configuration.set("hbase.zookeeper.property.clientPort", "2181");//可写可不写,默认为2181
  27. // 2.创建连接对象
  28. return ConnectionFactory.createConnection(configuration);
  29. } catch (IOException e) {
  30. log.error("获取Hbase相关基础信息失");
  31. e.printStackTrace();
  32. }
  33. return null;
  34. }
  35. // static {
  36. // try {
  37. // // 1.获取配置文件信息
  38. // Configuration configuration = HBaseConfiguration.create();
  39. // configuration.set("hbase.zookeeper.quorum", "localhost");
  40. // configuration.set("hbase.zookeeper.property.clientPort", "2181");//可写可不写,默认为2181
  41. //
  42. // // 2.创建连接对象
  43. // connection = ConnectionFactory.createConnection(configuration);
  44. //
  45. // // 3.创建Admin对象
  46. // admin = connection.getAdmin();
  47. //
  48. // } catch (IOException e) {
  49. // log.error("获取Hbase相关基础信息失");
  50. // e.printStackTrace();
  51. // }
  52. // }
  53. /**
  54. * @param tableName
  55. * @return true or false
  56. * @throws IOException
  57. * @author aaronChen
  58. * @description 判断表是否存在
  59. */
  60. public static boolean isTableExist(String tableName, Admin admin) throws IOException {
  61. // 1.判断表是否存在
  62. boolean exists = admin.tableExists(TableName.valueOf(tableName));
  63. // 2.返回结果
  64. return exists;
  65. }
  66. /**
  67. * @param tableName 需要创建的hbase表名,格式必须要是 命名空间:tablename
  68. * @param cfs 可以同时传入多个列簇
  69. * @param createNameSpaceIfExists 如果命名空间不存在,是否创建
  70. * @throws IOException
  71. * @author aaronChen
  72. * @description 创建表
  73. */
  74. public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs) throws IOException {
  75. //1、判断传入的表是否正确
  76. if (tableName.split(":").length != 2) {
  77. log.error("{},传入的表名有问题,需要传入 nameSpace:tableName 这种格式", tableName);
  78. return false;
  79. }
  80. String nameSpace = tableName.split(":")[0];
  81. //2、判断命名空间相关信息
  82. if (!isNameSpaceExist(admin, nameSpace)) {
  83. if (createNameSpaceIfExists) {
  84. log.info("{},该命名空间不存在,开始创建该命名空间", nameSpace);
  85. createNameSpace(admin, nameSpace);
  86. } else {
  87. log.error("{},该命名空间不存在", nameSpace);
  88. return false;
  89. }
  90. }
  91. // 3、判断表是否存在
  92. if (isTableExist(tableName, admin)) {
  93. log.error("{},表已存在", tableName);
  94. return false;
  95. }
  96. // 4、判断是否存在列族信息
  97. if (cfs.length <= 0) {
  98. log.info("请设置列族信息!");
  99. return false;
  100. }
  101. // 5、创建表描述器
  102. try {
  103. if (admin.tableExists(TableName.valueOf(tableName))) {
  104. return false;
  105. }
  106. //定义表描述对象
  107. TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
  108. //遍历列族数组
  109. for (String cf : cfs) {
  110. //定义列族描述对象
  111. ColumnFamilyDescriptor columnFamily = ColumnFamilyDescriptorBuilder.of(cf);
  112. //给表添加列族信息
  113. tableDescriptorBuilder.setColumnFamily(columnFamily);
  114. }
  115. //创建表
  116. admin.createTable(tableDescriptorBuilder.build());
  117. } catch (Exception e) {
  118. e.printStackTrace();
  119. return false;
  120. }
  121. log.info("{},表新建成功", tableName);
  122. return true;
  123. }
  124. /**
  125. * @param tableName
  126. * @throws IOException
  127. * @author aaronChen
  128. * @description 删除表
  129. */
  130. public static void dropTable(Admin admin, String tableName) throws IOException {
  131. // 1.判断表是否存在
  132. if (!isTableExist(tableName, admin)) {
  133. log.error("{},表不存在", tableName);
  134. return;
  135. }
  136. // 2.使表下线
  137. admin.disableTable(TableName.valueOf(tableName));
  138. // 3.删除表
  139. admin.deleteTable(TableName.valueOf(tableName));
  140. }
  141. /**
  142. * @param nameSpace
  143. * @return true or false
  144. * @throws IOException
  145. * @author aaronChen
  146. * @description 判断一个命名空间是否存在
  147. */
  148. public static boolean isNameSpaceExist(Admin admin, String nameSpace) throws IOException {
  149. NamespaceDescriptor[] namespaceDescriptors = admin.listNamespaceDescriptors();
  150. for (NamespaceDescriptor namespaceDescriptor : namespaceDescriptors) {
  151. if (namespaceDescriptor.getName().equals(nameSpace)) {
  152. return true;
  153. }
  154. }
  155. return false;
  156. }
  157. /**
  158. * @param namespace
  159. * @author aaronChen
  160. * @description 创建命名空间
  161. */
  162. public static void createNameSpace(Admin admin, String namespace) {
  163. // 1.创建命名空间描述器
  164. NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
  165. // 2.创建命名空间
  166. try {
  167. admin.createNamespace(namespaceDescriptor);
  168. } catch (NamespaceExistException e) {
  169. log.error("{},命名空间已存在", namespace);
  170. } catch (IOException e) {
  171. e.printStackTrace();
  172. }
  173. }
  174. /**
  175. * @param tableName
  176. * @param rowKey
  177. * @param columnFamily
  178. * @param columnName
  179. * @param value
  180. * @throws IOException
  181. * @author aaronChen
  182. * @description 向表中插入数据
  183. */
  184. public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {
  185. // 1.获取表对象
  186. Table table = connection.getTable(TableName.valueOf(tableName));
  187. // 2.创建put对象
  188. Put put = new Put(Bytes.toBytes(rowKey));
  189. // 3.给Put对象赋值
  190. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));
  191. // 4.插入数据
  192. table.put(put);
  193. // 5.关闭表连接
  194. table.close();
  195. }
  196. /**
  197. * @param tableName
  198. * @param clz
  199. * @param <T>
  200. * @return
  201. * @throws Exception
  202. */
  203. public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz) throws Exception {
  204. //1、判断传入的参数是否为空或空字符串
  205. if (StringUtils.isNullOrWhitespaceOnly(tableName)) {
  206. log.error("传入的tableName为空:{}", tableName);
  207. return null;
  208. }
  209. if (StringUtils.isNullOrWhitespaceOnly(rowKey)) {
  210. log.error("传入的rowKey为空:{}", rowKey);
  211. return null;
  212. }
  213. if (StringUtils.isNullOrWhitespaceOnly(columnFamily)) {
  214. log.error("传入的columnFamily为空:{}", columnFamily);
  215. return null;
  216. }
  217. Get get = new Get(Bytes.toBytes(rowKey));
  218. if (StringUtils.isNullOrWhitespaceOnly(columnName)) {
  219. get.addFamily(Bytes.toBytes(columnFamily));
  220. // log.info("传入的columnName为空:{},将根据传入的其他信息查询全量的列信息", columnName);
  221. } else {
  222. //传入的不为空的话,就查询传入的值
  223. get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));
  224. // log.info("传入的columnName为:{},将根据传入的值查询信息", columnName);
  225. }
  226. //4、获取表对象
  227. Table table = connection.getTable(TableName.valueOf(tableName));
  228. //5、设置获取数据的版本数
  229. //get.setMaxVersions(5);
  230. //5、获取数据
  231. Result resultData = table.get(get);
  232. //6、关闭表连接
  233. table.close();
  234. //创建 泛型对象
  235. T t = clz.newInstance();
  236. //给泛型对象赋值
  237. List<Cell> cells = resultData.listCells();
  238. for (Cell cell : cells) {
  239. String resultColumnName = Bytes.toString(CellUtil.cloneQualifier(cell));
  240. String resultValue = Bytes.toString(CellUtil.cloneValue(cell));
  241. //判断是否需要转换为驼峰命名
  242. if (underScoreToCamel) {
  243. if (resultColumnName.contains("_")) {
  244. resultColumnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
  245. } else if (resultColumnName.contains("-")) {
  246. resultColumnName = CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, resultColumnName.toLowerCase());
  247. } else {
  248. resultColumnName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_CAMEL, resultColumnName);
  249. }
  250. }
  251. BeanUtils.setProperty(t, resultColumnName, resultValue);
  252. }
  253. return t;
  254. }
  255. /**
  256. * @param tableName
  257. * @return 返回一个 JSONArray
  258. * <p>
  259. * [
  260. * {
  261. * "rowKey":"11111",
  262. * "value":[
  263. * {
  264. * "columnFamily":"first",
  265. * "columnValue":{
  266. * "name":"aaron",
  267. * "age":18
  268. * }
  269. * },
  270. * {
  271. * "columnFamily":"second",
  272. * "columnValue":{
  273. * "name":"aaron",
  274. * "age":18
  275. * }
  276. * }
  277. * ]
  278. * },
  279. * {
  280. * "rowKey":"22222",
  281. * "value":[
  282. * {
  283. * "columnFamily":"first",
  284. * "columnValue":{
  285. * "name":"wwww",
  286. * "age":19
  287. * }
  288. * },
  289. * {
  290. * "columnFamily":"second",
  291. * "columnValue":{
  292. * "name":"cccc",
  293. * "age":18,
  294. * "address","NanJ"
  295. * }
  296. * }
  297. * ]
  298. * }
  299. * ]
  300. * @throws IOException
  301. * @author aaronChen
  302. * @description 扫描全表数据
  303. * @deprecated
  304. */
  305. private static void scanTable(Connection connection, String tableName, String leftScanRowKey, String rightScanRowKey) throws IOException {
  306. // 1.获取表对象
  307. Table table = connection.getTable(TableName.valueOf(tableName));
  308. // 2.构建Scan对象 // 左闭右开
  309. Scan scan = new Scan(Bytes.toBytes(leftScanRowKey), Bytes.toBytes(rightScanRowKey));
  310. // 3.扫描表
  311. ResultScanner resultScanner = table.getScanner(scan);
  312. // 4.解析resultScanner
  313. for (Result result : resultScanner) {
  314. // 5.解析result并打印
  315. for (Cell cell : result.rawCells()) {
  316. // 6.打印数据
  317. System.out.println("RowKey:" + Bytes.toString(CellUtil.cloneRow(cell)) +
  318. ",CF:" + Bytes.toString(CellUtil.cloneFamily(cell)) +
  319. ",CN:" + Bytes.toString(CellUtil.cloneQualifier(cell)) +
  320. ",Value:" + Bytes.toString(CellUtil.cloneValue(cell)));
  321. }
  322. }
  323. // 7.关闭表连接
  324. table.close();
  325. }
  326. /**
  327. * @param tableName 要删除的表名
  328. * @param rowKey 要删除的rowKey
  329. * @param cf 要删除的列簇
  330. * @param cn 要删除的列名
  331. * @throws IOException
  332. * @description Delete标记: 删除特定列列指定的版本
  333. * DeleteFamily标记: 删除特定列族所有列
  334. * DeleteColumn标记: 删除特定列的所有版本
  335. * 指定rowkey: 使用DeleteFamily标记
  336. * ---->不加时间戳表示删除[指定rowkey]的所有数据,加时间戳表示删除[指定rowkey]中[时间戳版本小于或等于指定时间戳]的所有数据
  337. * 指定rowkey+columnFamily: 使用DeleteFamily标记
  338. * ---->不加时间戳表示删除[指定列族]的所有数据,加了时间戳就表示删除[指定列族]下[时间戳版本小于或等于指定时间戳]的所有数据
  339. * 指定rowkey+columnFamily+column(addColumns): 使用DeleteColumn标记
  340. * ---->不加时间戳表示删除[指定列]所有版本的数据,加时间戳表示删除[指定列]中[时间戳版本小于或等于指定时间戳]的所有数据。
  341. * 指定rowkey+columnFamily+column(addColumn): 使用Delete标记 (只删除单个版本数据,生产环境尽量别用)
  342. * ---->不加时间戳表示删除[指定列]中[最新版本]的数据,加时间戳表示删除[指定列]中[指定时间戳版本]的数据。
  343. * ---->不推荐的原因是:操作不同(如flush前后操作产生的结果会不一样)结果可能不同
  344. * 如:在flush前如果有多个版本的数据,此时进行addColumn(不加时间戳)操作,会将最新版本的数据删除,然后老版本的数据会出现
  345. * 在flush后进行addColumn(不加时间戳)操作,会将最新版本的数据删除,而此时flush已将老版本的数据进行了删除,所有此时老版本的数据就不会出现了
  346. * <p>
  347. * 删除列最好使用addColumns
  348. * addColumns:不加时间戳表示删除指定列所有版本的数据(推荐)
  349. * addColumns:加时间戳表示删除时间戳小于或等于指定时间戳的指定列的所有版本。
  350. * addColumn:不加时间戳表示删除最新版本的数据,操作不同(如flush前后操作产生的结果会不一样)结果可能不同
  351. * addColumn:加时间戳表示删除指定时间戳的指定列版本的数据。
  352. */
  353. public static void deleteData(Connection connection, String tableName, String rowKey, String cf, String cn) throws IOException {
  354. // 1.获取表对象
  355. Table table = connection.getTable(TableName.valueOf(tableName));
  356. // 2.创建删除对象
  357. Delete delete = new Delete(Bytes.toBytes(rowKey));
  358. //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn));
  359. //delete.addColumns(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
  360. //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn));
  361. //delete.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cn),1574158036021L);
  362. // 2.2 删除指定的列族
  363. // addFamily:删除指定列族的所有列的所有版本数据。
  364. delete.addFamily(Bytes.toBytes(cf));
  365. // 3.执行删除操作
  366. table.delete(delete);
  367. // 4.关闭连接
  368. table.close();
  369. }
  370. /**
  371. * @author aaronChen
  372. * @description 关闭资源
  373. */
  374. public static void close(Admin admin, Connection connection) {
  375. if (admin != null) {
  376. try {
  377. admin.close();
  378. } catch (IOException e) {
  379. e.printStackTrace();
  380. }
  381. }
  382. if (connection != null) {
  383. try {
  384. connection.close();
  385. } catch (IOException e) {
  386. e.printStackTrace();
  387. }
  388. }
  389. }
  390. public static JSONObject getJSONObject(String rowKey, List<Cell> cells) {
  391. /**
  392. * @return JSONObject {
  393. * "rowKey":"11111",
  394. * "value":{
  395. * "name":"aaron",
  396. * "age":18
  397. * }
  398. * }
  399. */
  400. JSONObject jsonObject = new JSONObject();
  401. JSONObject valueJsonObject = new JSONObject();
  402. for (Cell cell : cells) {
  403. valueJsonObject.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
  404. }
  405. jsonObject.put("rowKey", rowKey);
  406. jsonObject.put("value", valueJsonObject);
  407. return jsonObject;
  408. }
  409. }

比较核心的一个几个方法做出详细说明下:

3.2.1、新建表函数

  1. public static boolean createTable(Admin admin, String tableName, boolean createNameSpaceIfExists, String... cfs)

会根据传入的相关信息进行表的创建,例如:

表名是"aaron1:test1",当"aaron1"命名空间不存在的时候,会根据createNameSpaceIfExists参数来选择是否在不存在的情况下创建,如果需要创建,就会先创建该命名空间后,再去新建表。

  1. public static void main(String[] args) throws Exception{
  2. createTable(getHBaseConnection().getAdmin(),"aaron1:test1",false,"name","age","id");
  3. }

当传入的createNameSpaceIfExists是true的时候

  1. public static void main(String[] args) throws Exception{
  2. createTable(getHBaseConnection().getAdmin(),"aaron1:test1",true,"name","age","id");
  3. }

在web页面可以查看到已经创建成功

3.2.2、插入数据

根据传入的参数来插入具体的数据。

  1. public static void putData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, String value)
  1. public static void main(String[] args) throws Exception{
  2. putData(getHBaseConnection(),"aaron1:test1","aaaa","name","schoolName","Nanj");
  3. }

通过图形化工具可以看到数据已经插入成功

3.2.3、查询数据

本函数可以通过特定的rowKey查询出结果,并把结果转成JavaBean输出

  1. public static <T> T getData(Connection connection, String tableName, String rowKey, String columnFamily, String columnName, boolean underScoreToCamel, Class<T> clz)
  1. public static void main(String[] args) throws Exception {
  2. PassengerInfor passengerInfor = getData(getHBaseConnection(), "aaron:test1", "11111", "first", "", true, PassengerInfor.class);
  3. log.info("查询出来的结果为:{},", passengerInfor);
  4. }

4、Flink异步IO访问Hbase

因为异步IO都是通过函数继承RichAsyncFunction这个抽象类,所以为了更大化的使用,先编写一个工具类。

4.1、异步IO访问Hbase工具类

  1. /**
  2. * @param <IN> 流输入需要查询的内容
  3. * @param <HBase> 从HBase查询返回的结果集
  4. * @param <OUT> 最终输入流查询完HBase后返回的结果集
  5. * @author :aaronChen
  6. * @date :Created in 2022-02-22 16:48
  7. * @description: 该类是用来根据传入的一个rowKey来查询HBase某个表里的数据公共类
  8. */

源码如下,其中使用的时候,使用到了如上的一些Hbase工具类:

  1. package com.horse.utils.hbase;
  2. import com.horse.utils.ThreadPoolUtil;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.flink.configuration.Configuration;
  5. import org.apache.flink.streaming.api.functions.async.ResultFuture;
  6. import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
  7. import org.apache.hadoop.hbase.TableName;
  8. import org.apache.hadoop.hbase.client.*;
  9. import java.io.IOException;
  10. import java.util.Collections;
  11. import java.util.concurrent.ThreadPoolExecutor;
  12. /**
  13. * @param <IN> 流输入需要查询的内容
  14. * @param <HBase> 从HBase查询返回的结果集
  15. * @param <OUT> 最终输入流查询完HBase后返回的结果集
  16. * @author :aaronChen
  17. * @date :Created in 2022-02-22 16:48
  18. * @description: 该类是用来根据传入的一个rowKey来查询HBase某个表里的数据公共类
  19. */
  20. @Slf4j
  21. public abstract class HBaseAsyncFunctionUtil<IN, HBase, OUT> extends RichAsyncFunction<IN, OUT> {
  22. private Connection connection;
  23. private ThreadPoolExecutor threadPoolExecutor;
  24. private Admin hBaseAdmin;
  25. private HBase queryHBaseBean;
  26. private Class<HBase> clz;
  27. /**
  28. * hBase表名
  29. */
  30. private Table hBaseTable;
  31. /**
  32. * 列簇名
  33. */
  34. private String columnFamily;
  35. /**
  36. * 列名
  37. */
  38. private String columnName;
  39. /**
  40. * 命名空间:tableName
  41. */
  42. private String tableName;
  43. public HBaseAsyncFunctionUtil(String tableName, String columnFamily, String columnName, Class<HBase> clz) {
  44. this(tableName, columnFamily, clz);
  45. this.columnName = columnName;
  46. }
  47. public HBaseAsyncFunctionUtil(String tableName, String columnFamily, Class<HBase> clz) {
  48. this.clz = clz;
  49. this.tableName = tableName;
  50. this.columnFamily = columnFamily;
  51. }
  52. public abstract String setRowKey(IN input);
  53. public abstract OUT getList(String rowKey, IN input, HBase hBaseResult);
  54. private void setQueryHBaseBean() throws Exception {
  55. queryHBaseBean = clz.newInstance();
  56. }
  57. @Override
  58. public void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception {
  59. threadPoolExecutor.submit(new Runnable() {
  60. @Override
  61. public void run() {
  62. resultFuture.complete(Collections.singleton(getResult(input)));
  63. }
  64. });
  65. }
  66. @Override
  67. public void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
  68. close();
  69. connc();
  70. log.info("Connection to HBase TimeOut,Now is reConnect!");
  71. threadPoolExecutor.submit(new Runnable() {
  72. @Override
  73. public void run() {
  74. resultFuture.complete(Collections.singleton(getResult(input)));
  75. }
  76. });
  77. }
  78. private OUT getResult(IN input) {
  79. String rowKey = setRowKey(input);
  80. try {
  81. setQueryHBaseBean();
  82. HBase hBaseResult = (HBase) HBaseUtils.getData(connection, tableName, rowKey, columnFamily, columnName, true, queryHBaseBean.getClass());
  83. return getList(rowKey, input, hBaseResult);
  84. } catch (Exception e) {
  85. e.printStackTrace();
  86. }
  87. return null;
  88. }
  89. @Override
  90. public void open(Configuration parameters) throws Exception {
  91. super.open(parameters);
  92. //赋值connection
  93. connc();
  94. }
  95. @Override
  96. public void close() throws Exception {
  97. super.close();
  98. if (hBaseTable != null) {
  99. hBaseTable.close();
  100. }
  101. if (hBaseAdmin != null) {
  102. hBaseAdmin.close();
  103. }
  104. if (connection != null) {
  105. connection.close();
  106. }
  107. }
  108. private void connc() throws IOException {
  109. connection = HBaseUtils.getHBaseConnection();
  110. hBaseAdmin = connection.getAdmin();
  111. if (!hBaseAdmin.tableExists(TableName.valueOf(tableName))) {
  112. throw new IOException("Query Hbase Table is not exists!");
  113. }
  114. hBaseTable = connection.getTable(TableName.valueOf(tableName));
  115. //初始化
  116. threadPoolExecutor = ThreadPoolUtil.getThreadPool();
  117. }
  118. }

4.2、具体使用

同样还是利用nc模拟输入流数据

主类AsyncIOQueryHBase代码如下:

  1. package com.horse.asyncio;
  2. import com.horse.bean.PassengerInfor;
  3. import com.horse.bean.UserLoginLog;
  4. import com.horse.cep.function.MyFlatMapFunction;
  5. import org.apache.flink.streaming.api.datastream.AsyncDataStream;
  6. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import com.horse.utils.hbase.HBaseAsyncFunctionUtil;
  10. import java.util.concurrent.TimeUnit;
  11. /**
  12. * @author :aaronChen
  13. * @date :Created in 2022-02-24 14:08
  14. * @description :
  15. * @modifiedBy :
  16. */
  17. public class AsyncIOQueryHBase {
  18. public static void main(String[] args) {
  19. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  20. env.setParallelism(1);
  21. DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);
  22. SingleOutputStreamOperator<UserLoginLog> dataStream = socketTextStream.flatMap(new MyFlatMapFunction());
  23. SingleOutputStreamOperator<PassengerInfor> outputStreamOperator = AsyncDataStream.unorderedWait(dataStream,
  24. new HBaseAsyncFunctionUtil<UserLoginLog, PassengerInfor, PassengerInfor>("aaron:test1", "first", "", PassengerInfor.class) {
  25. @Override
  26. public String setRowKey(UserLoginLog input) {
  27. return input.getUserName();
  28. }
  29. @Override
  30. public PassengerInfor getList(String rowKey, UserLoginLog input, PassengerInfor hBaseResult) {
  31. return hBaseResult;
  32. }
  33. }, 1000, TimeUnit.MINUTES, 100);
  34. outputStreamOperator.print("异步IO查询Hbase结果输出");
  35. try {
  36. env.execute();
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. }

在nc中输入如下数据:

{"loginId":11111,"loginTime":1645177352000,"loginStatus":1,"userName":"11111"}

查看日志:

可以看到如上图中所示,已经能够从hbase中查询出结果。

如果觉得写的不错的,可以适当表示下哈~本人表示感谢。如果有写的不到位的也可以提问。

标签: hbase flink big data

本文转载自: https://blog.csdn.net/Aaron_ch/article/details/123113871
版权归原作者 一个肉团子 所有, 如有侵权,请联系我们删除。

“Flink查询关联Hbase输出”的评论:

还没有评论