

[Flink in Action] Flink Product Sales Count - Using the Bahir Connector to Store Data in Redis 6.x

🚀 Author: 大数据小禅

🚀 Summary: Flink product sales count - using the Bahir connector to store data in Redis 6.x

🚀 Likes 👍, bookmarks ⭐, and comments 💬 are welcome


Table of Contents

How Flink Writes to Redis

  • How does Flink write to Redis? Option 1: write a custom sink yourself; Option 2: use a connector (a minimal custom-sink sketch appears after this list).
  • The core of the Redis sink is the RedisMapper interface. To use it, write your own Redis mapping class that implements its three methods: getCommandDescription (choose the Redis data structure and key name), getKeyFromData (extract the key), and getValueFromData (extract the value).
  • Usage - add the dependency:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
  • Coding example:

public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> value) {
        return value.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> value) {
        return value.f1.toString();
    }
}
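The article only shows code for Option 2 (the Bahir connector). For comparison, here is a minimal sketch of what Option 1, a hand-written sink, could look like, built on RichSinkFunction and the Jedis client. The class name MyJedisSink and the connection details are illustrative assumptions, not part of the original post.

// Sketch only: a hand-written Redis sink without the Bahir connector (assumes the jedis client on the classpath)
public class MyJedisSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        // open one connection per parallel subtask
        jedis = new Jedis("127.0.0.1", 6379);
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // same layout as the connector version: hash key -> field (title) -> count
        jedis.hset("VIDEO_ORDER_COUNTER", value.f0, value.f1.toString());
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }
}

The connector route is usually preferable because the connection pooling, serialization, and failure handling are already taken care of.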

Flink Product Sales Count in Action: Transform, Group, Aggregate, and Store with a Custom Redis Sink

  • Redis environment: Redis 6. Deploy Redis 6.x with Docker (see the Docker articles on my homepage): docker run -d -p 6379:6379 redis
  • Coding walkthrough

Data source

public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {

    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }

    /**
     * Called before run(); used to initialize connections
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * Called when the source shuts down; used for cleanup
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }

    /**
     * Logic that produces the data
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }

    }

    /**
     * Controls task cancellation
     */
    @Override
    public void cancel() {

        flag = false;
    }
}
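The source above emits VideoOrder objects, a POJO that the original post does not show. A minimal sketch that would satisfy the constructor and getTitle() calls used here (field names are inferred from new VideoOrder(id, title, money, userId, new Date()) and are assumptions, not the author's definition):

public class VideoOrder {
    private String tradeNo;    // order id
    private String title;      // course title
    private int money;
    private int userId;
    private Date createTime;

    public VideoOrder() {}

    public VideoOrder(String tradeNo, String title, int money, int userId, Date createTime) {
        this.tradeNo = tradeNo;
        this.title = title;
        this.money = money;
        this.userId = userId;
        this.createTime = createTime;
    }

    public String getTitle() {
        return title;
    }

    // other getters/setters omitted for brevity
}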

Storage format and the key/value mapping

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {

    /**
     * Choose the Redis command to use and the key name
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * Get the key (the hash field in this case)
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * Get the value
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}
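With RedisCommand.HSET and the additional key VIDEO_ORDER_COUNTER, every course title becomes a field of a single Redis hash and its running count the value, so each update simply overwrites the previous count. Once the job is running, the hash can be inspected with redis-cli (HGETALL VIDEO_ORDER_COUNTER) or, for example, a small Jedis snippet like the sketch below (assumes the Jedis client is available):

// Dump the counter hash written by the sink (key name matches getCommandDescription above)
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
    Map<String, String> counters = jedis.hgetAll("VIDEO_ORDER_COUNTER");
    counters.forEach((title, count) -> System.out.println(title + " -> " + count));
}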

Putting it together

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        // Build the execution environment: the entry point for starting the job; holds global parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Data source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());

        // Transformation: map each order to (title, 1)
        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(), 1);
            }
        });

//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });

        // Group by title
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // Count the number of elements in each group
        DataStream<Tuple2<String, Integer>> sumDS = keyByDS.sum(1);

        // Print to the console
        sumDS.print();

        // Single-node Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(6379)
                .build();

        sumDS.addSink(new RedisSink<>(conf, new VideoOrderCounterSink()));

        // A DataStream job must call execute(); a job name can be given
        env.execute("custom redis sink job");
    }
}
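The FlinkJedisPoolConfig above targets a single Redis node. If Redis runs as a cluster, the same mapper and sink can be reused with the connector's cluster config instead; a sketch under that assumption (node address is a placeholder, check the Bahir docs for your version's builder options):

// Cluster variant of the connection config
Set<InetSocketAddress> nodes = new HashSet<>();
nodes.add(new InetSocketAddress("127.0.0.1", 7000));
FlinkJedisClusterConfig clusterConf = new FlinkJedisClusterConfig.Builder()
        .setNodes(nodes)
        .build();

sumDS.addSink(new RedisSink<>(clusterConf, new VideoOrderCounterSink()));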


Tags: flink, big data

This article is reposted from: https://blog.csdn.net/weixin_45574790/article/details/132857544
Copyright belongs to the original author, 大数据小禅. If there is any infringement, please contact us and it will be removed.
