【1】引入第三方
Bahir
提供的
Flink-redis
相关依赖包
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>
【2】
Flink
连接
Redis
并输出
Sink
处理结果
packagecom.zzx.flinkimportorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.redis.RedisSinkimportorg.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfigimportorg.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand,RedisCommandDescription,RedisMapper}
object RedisSinkTest{
def main(args:Array[String]):Unit={// 创建一个流处理执行环境
val env =StreamExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据并转换为 类
val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换 SensorReading为用户自定义的类,是从文件转换而来的
val dataStream:DataStream[SensorReading]= inputStreamFromFile
.map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 redis 的配置类 继承了FlinkJedisConfigBase 正是 SensorReading需要传入的参数,底层将有些数据保存成了状态数据。
val conf =newFlinkJedisPoolConfig.Builder().setHost("192.168.52.131").setPort(6379).setPassword("zzx").build()//定义 RedisMapper 数据保存的类型
val myMapper =newRedisMapper[SensorReading]{//定义保存数据到 redis的命令,hset table key value
override def getCommandDescription:RedisCommandDescription={// hset tablesnamenewRedisCommandDescription(RedisCommand.HSET,"sensor_temp")}//设置key
override def getKeyFromData(data:SensorReading):String= data.id
//设置value
override def getValueFromData(data:SensorReading):String= data.temperature.toString
}
dataStream.addSink(newRedisSink[SensorReading](conf,myMapper))
env.execute("Redis Sink test")}}
查看源码可知
RedisSink
是继承自
RichSinkFunction<IN>
类
publicclassRedisSink<IN>extendsRichSinkFunction<IN>{
【3】查看
Redis
输出信息
本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135211509
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。