0


Flink 输出至 Redis

【1】引入第三方

Bahir

提供的

Flink-redis

相关依赖包

<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>

【2】

Flink

连接

Redis

并输出

Sink

处理结果

packagecom.zzx.flinkimportorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.redis.RedisSinkimportorg.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfigimportorg.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand,RedisCommandDescription,RedisMapper}

object RedisSinkTest{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换  SensorReading为用户自定义的类,是从文件转换而来的
    val dataStream:DataStream[SensorReading]= inputStreamFromFile
      .map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 redis 的配置类 继承了FlinkJedisConfigBase 正是 SensorReading需要传入的参数,底层将有些数据保存成了状态数据。
    val conf =newFlinkJedisPoolConfig.Builder().setHost("192.168.52.131").setPort(6379).setPassword("zzx").build()//定义 RedisMapper 数据保存的类型
    val myMapper =newRedisMapper[SensorReading]{//定义保存数据到 redis的命令,hset table key value
      override def getCommandDescription:RedisCommandDescription={// hset tablesnamenewRedisCommandDescription(RedisCommand.HSET,"sensor_temp")}//设置key
      override def getKeyFromData(data:SensorReading):String= data.id
      //设置value
      override def getValueFromData(data:SensorReading):String= data.temperature.toString
    }
    dataStream.addSink(newRedisSink[SensorReading](conf,myMapper))

    env.execute("Redis Sink test")}}

查看源码可知

RedisSink

是继承自

RichSinkFunction<IN>

publicclassRedisSink<IN>extendsRichSinkFunction<IN>{

【3】查看

Redis

输出信息
[点击并拖拽以移动]

标签: flink redis 大数据

本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135211509
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink 输出至 Redis”的评论:

还没有评论