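Flink article series
I. Flink column
The Flink column introduces individual topics systematically, each illustrated with concrete examples.
- 1. Flink deployment series: covers the basics of deploying and configuring Flink.
- 2. Flink fundamentals series: covers the basics of Flink, such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
- 3. Flink Table API and SQL fundamentals series: covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, and catalogs.
- 4. Flink Table API and SQL advanced and applied series: covers applied use of the Table API and SQL, which is closer to real production work and includes material that is harder to develop.
- 5. Flink monitoring series: covers topics related to day-to-day operations and monitoring.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-use examples. This column is not divided into sections; the link titles indicate what each article covers.
Entry point for all articles in both columns: Flink article series index (Flink 系列文章汇总索引)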
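This article shows how to use Flink's asynchronous I/O to access external data, with an example that reads asynchronously from Redis as the data source.
For more background, see the author's Flink column, which covers these topics more systematically.
Apart from the Maven dependencies, this article has no other dependencies. Note that the listings use Lombok annotations (@Data and others), so Lombok must also be on the classpath; it is not part of the dependency list below.
Running the example requires a Redis environment.
I. Example: reading user information asynchronously
This example simulates looking up a user's personal information in Redis by the user name carried in the incoming data.
The incoming data is a Flink collection source, and the Redis data is stored in a hash; the verification section below shows the exact contents.
1. Maven dependencies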
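To make the data layout concrete, here is a minimal JDK-only sketch of how an input element ("id,name") and a Redis hash value ("id,name,age,balance,email") split into fields. The class name UserRecordSketch and the sample email address are hypothetical and not part of the article's code; the field order is assumed from the sample records used in the verification section.

```java
// Hypothetical helper, not part of the article's code: shows how the
// comma-separated input element and Redis hash value break down into fields.
class UserRecordSketch {
    final int id;
    final String name;
    final int age;
    final double balance;

    // Parses a hash value of the assumed form "id,name,age,balance,email".
    UserRecordSketch(String value) {
        String[] fields = value.split(",");
        if (fields.length < 4) {
            throw new IllegalArgumentException("expected at least 4 fields: " + value);
        }
        this.id = Integer.parseInt(fields[0].trim());
        this.name = fields[1].trim();
        this.age = Integer.parseInt(fields[2].trim());
        this.balance = Double.parseDouble(fields[3].trim());
    }

    // Extracts the lookup key (the user name) from an input element "id,name".
    static String lookupKey(String input) {
        String[] fields = input.split(",");
        if (fields.length < 2) {
            throw new IllegalArgumentException("expected \"id,name\": " + input);
        }
        return fields[1].trim();
    }

    public static void main(String[] args) {
        String key = lookupKey("1,alan");
        UserRecordSketch user = new UserRecordSketch("1,alan,18,20,alan@example.com");
        System.out.println(key + " -> id=" + user.id + ", age=" + user.age
                + ", balance=" + user.balance);
    }
}
```

Validating the field count up front gives a clear error for malformed records instead of an ArrayIndexOutOfBoundsException deep inside an asynchronous callback.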
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-runtime_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-core</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-java</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
2. Asynchronous Redis access implementation
1) Returning the Redis data as a String
package org.datastreamapi.source.custom.redis;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author alanchan
 *
 */
public class CustomRedisSource extends RichAsyncFunction<String, String> {
    private static final String ADDR = "192.168.10.41";
    private static final int PORT = 6379;
    // Connection and socket timeout passed to JedisPool, in milliseconds
    private static final int TIMEOUT = 10000;
    // Created in open(), so it is not serialized together with the function
    private transient JedisPool jedisPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedisPool = new JedisPool(new JedisPoolConfig(), ADDR, PORT, TIMEOUT);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        // Input element in the form "id,name"
        System.out.println("输入参数input----:" + input);
        // Run the blocking Redis lookup on another thread
        CompletableFuture.supplyAsync(() -> {
            String name = input.split(",")[1];
            // Jedis instances are not thread-safe, so borrow one per request
            try (Jedis jedis = jedisPool.getResource()) {
                String value = jedis.hget("AsyncReadUser_Redis", name);
                System.out.println("查询结果output----:" + value);
                return value;
            }
        }).whenComplete((value, error) -> {
            // Always complete the ResultFuture, also on failure or a missing key
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else if (value == null) {
                resultFuture.complete(Collections.<String>emptyList());
            } else {
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    // Called when a request exceeds the timeout configured in AsyncDataStream
    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        System.out.println("redis connect timeout!");
        // Complete with no result; otherwise this element would stay pending
        resultFuture.complete(Collections.<String>emptyList());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedisPool != null) {
            jedisPool.close();
        }
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class User {
        private int id;
        private String name;
        private int age;
        private double balance;

        // Parses a record of the form "id,name,age,balance,..."
        User(String value) {
            String[] str = value.split(",");
            this.setId(Integer.parseInt(str[0]));
            this.setName(str[1]);
            this.setAge(Integer.parseInt(str[2]));
            this.setBalance(Double.parseDouble(str[3]));
        }
    }
}
2) Returning the Redis data as a POJO
package org.datastreamapi.source.custom.redis;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author alanchan
 *
 */
public class CustomRedisSource2 extends RichAsyncFunction<String, User> {
    private static final String ADDR = "192.168.10.41";
    private static final int PORT = 6379;
    // Connection and socket timeout passed to JedisPool, in milliseconds
    private static final int TIMEOUT = 10000;
    // Created in open(), so it is not serialized together with the function
    private transient JedisPool jedisPool;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedisPool = new JedisPool(new JedisPoolConfig(), ADDR, PORT, TIMEOUT);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<User> resultFuture) throws Exception {
        // Input element in the form "id,name"
        System.out.println("输入查询条件:" + input);
        // Run the blocking Redis lookup on another thread
        CompletableFuture.supplyAsync(() -> {
            String name = input.split(",")[1];
            // Jedis instances are not thread-safe, so borrow one per request
            try (Jedis jedis = jedisPool.getResource()) {
                String value = jedis.hget("AsyncReadUser_Redis", name);
                System.out.println("查询redis结果:" + value);
                // A missing key yields null, which is reported as "no result" below
                return value == null ? null : new User(value);
            }
        }).whenComplete((user, error) -> {
            // Always complete the ResultFuture, also on failure or a missing key
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else if (user == null) {
                resultFuture.complete(Collections.<User>emptyList());
            } else {
                resultFuture.complete(Collections.singleton(user));
            }
        });
    }

    // Called when a request exceeds the timeout configured in AsyncDataStream
    @Override
    public void timeout(String input, ResultFuture<User> resultFuture) throws Exception {
        System.out.println("redis connect timeout!");
        // Complete with no result; otherwise this element would stay pending
        resultFuture.complete(Collections.<User>emptyList());
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedisPool != null) {
            jedisPool.close();
        }
    }
}
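The core of both functions is the same pattern: run the blocking lookup on another thread, then report exactly one outcome (a value, no value, or a failure). The sketch below isolates that pattern using only the JDK so it can be run without Flink or Redis. It is an illustration under stated assumptions, not the article's implementation: the class AsyncLookupSketch is hypothetical, a ConcurrentHashMap stands in for the Redis hash, and the returned CompletableFuture stands in for Flink's ResultFuture.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, JDK-only stand-in: a Map plays the role of the Redis hash.
class AsyncLookupSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    AsyncLookupSketch() {
        store.put("alan", "1,alan,18,20");
        store.put("alanchan", "2,alanchan,19,25");
    }

    // Looks up the name taken from "id,name" on another thread.
    // A hit yields one element, a miss yields an empty collection, and a
    // malformed input completes the future exceptionally.
    CompletableFuture<Collection<String>> lookup(String input) {
        return CompletableFuture
                .supplyAsync(() -> store.get(input.split(",")[1]))
                .thenApply(value -> value == null
                        ? Collections.<String>emptyList()
                        : Collections.singletonList(value));
    }

    public static void main(String[] args) {
        AsyncLookupSketch sketch = new AsyncLookupSketch();
        sketch.lookup("1,alan").whenComplete((result, error) -> {
            // In a Flink AsyncFunction these two branches would call
            // resultFuture.completeExceptionally(error) and
            // resultFuture.complete(result) respectively.
            if (error != null) {
                System.out.println("failed: " + error);
            } else {
                System.out.println("result: " + result);
            }
        }).join();
    }
}
```

Handling the failure branch explicitly matters because a callback that only covers the success case leaves the request pending until the operator's timeout fires.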
3. Usage example
package org.datastreamapi.source.custom.redis;

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.source.custom.redis.CustomRedisSource.User;

/**
 * @author alanchan
 *
 */
public class TestCustomRedisSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input elements in the form "id,name"
        DataStreamSource<String> lines = env.fromElements(
                "1,alan", "2,alanchan", "3,alanchanchn", "4,alan_chan", "5,alan_chan_chn");

        // Arguments: input stream, async function, timeout, time unit, capacity (max in-flight requests)
        SingleOutputStreamOperator<String> result =
                AsyncDataStream.orderedWait(lines, new CustomRedisSource(), 10, TimeUnit.SECONDS, 1);
        SingleOutputStreamOperator<User> result2 =
                AsyncDataStream.orderedWait(lines, new CustomRedisSource2(), 10, TimeUnit.SECONDS, 1);

        result.print("result-->").setParallelism(1);
        result2.print("result2-->").setParallelism(1);

        env.execute();
    }
}
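AsyncDataStream.orderedWait emits results in the order of the input elements even when individual lookups finish out of order (within one parallel subtask). The JDK-only sketch below illustrates that behaviour with plain CompletableFuture objects. It is not how Flink implements orderedWait; the class OrderedWaitSketch, the "user:" prefix, and the artificial delays are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of "ordered wait" semantics using only the JDK.
class OrderedWaitSketch {

    // Simulates a lookup whose latency differs per element.
    static CompletableFuture<String> slowLookup(String input, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return "user:" + input;
        });
    }

    // Starts all lookups first, then collects the results in input order.
    static List<String> lookupInOrder(List<String> inputs) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        long delay = 20L * inputs.size();
        for (String input : inputs) {
            // Earlier inputs get a longer delay than later ones
            pending.add(slowLookup(input, delay));
            delay -= 20L;
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            // Wait for the oldest pending request before moving to the next
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(lookupInOrder(Arrays.asList("alan", "alanchan", "alanchanchn")));
    }
}
```

When result order does not matter, Flink also provides AsyncDataStream.unorderedWait with the same parameters, which emits each result as soon as its request completes.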
4. Verification
1) Prepare the Redis data
hset AsyncReadUser_Redis alan '1,alan,18,20,[email protected]'
hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,[email protected]'
hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,[email protected]'
hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,[email protected]'
hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,[email protected]'

127.0.0.1:6379> hset AsyncReadUser_Redis alan '1,alan,18,20,[email protected]'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchan '2,alanchan,19,25,[email protected]'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alanchanchn '3,alanchanchn,20,30,[email protected]'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan '4,alan_chan,27,20,[email protected]'
(integer) 1
127.0.0.1:6379> hset AsyncReadUser_Redis alan_chan_chn '5,alan_chan_chn,36,10,[email protected]'
(integer) 1
127.0.0.1:6379> hgetall AsyncReadUser_Redis
 1) "alan"
 2) "1,alan,18,20,[email protected]"
 3) "alanchan"
 4) "2,alanchan,19,25,[email protected]"
 5) "alanchanchn"
 6) "3,alanchanchn,20,30,[email protected]"
 7) "alan_chan"
 8) "4,alan_chan,27,20,[email protected]"
 9) "alan_chan_chn"
10) "5,alan_chan_chn,36,10,[email protected]"
2) Start the application and observe the console output
输入查询条件:5,alan_chan_chn
输入参数input----:2,alanchan
输入参数input----:5,alan_chan_chn
输入查询条件:3,alanchanchn
输入查询条件:1,alan
输入参数input----:1,alan
输入查询条件:2,alanchan
输入查询条件:4,alan_chan
输入参数input----:4,alan_chan
输入参数input----:3,alanchanchn
查询结果output----:3,alanchanchn,20,30,[email protected]
查询redis结果:1,alan,18,20,[email protected]
查询结果output----:1,alan,18,20,[email protected]
查询redis结果:4,alan_chan,27,20,[email protected]
查询redis结果:2,alanchan,19,25,[email protected]
查询结果output----:2,alanchan,19,25,[email protected]
查询redis结果:3,alanchanchn,20,30,[email protected]
查询结果output----:4,alan_chan,27,20,[email protected]
查询结果output----:5,alan_chan_chn,36,10,[email protected]
查询redis结果:5,alan_chan_chn,36,10,[email protected]
result-->>4,alan_chan,27,20,[email protected]
result-->>5,alan_chan_chn,36,10,[email protected]
result-->>3,alanchanchn,20,30,[email protected]
result-->>2,alanchan,19,25,[email protected]
result-->>1,alan,18,20,[email protected]
result2-->> CustomRedisSource.User(id=4, name=alan_chan, age=27, balance=4.0)
result2-->> CustomRedisSource.User(id=1, name=alan, age=18, balance=1.0)
result2-->> CustomRedisSource.User(id=3, name=alanchanchn, age=20, balance=3.0)
result2-->> CustomRedisSource.User(id=5, name=alan_chan_chn, age=36, balance=5.0)
result2-->> CustomRedisSource.User(id=2, name=alanchan, age=19, balance=2.0)
Note: in the result2 lines above, balance equals id (for example balance=4.0 for id=4), whereas the stored records carry 20, 25, 30, 20 and 10 in the fourth field, which is the field the listed User(String) constructor parses as the balance. The captured output therefore does not match the listing as shown.
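In summary, this article showed how to use Flink's asynchronous I/O to access external data, with an example of asynchronous reads from Redis as the data source.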
Copyright belongs to the original author, 一瓢一瓢的饮 alanchanchn. If there is any infringement, please contact us for removal.