🚀 作者 :“大数据小禅”
🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X
🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
目录导航
Flink怎么操作Redis
- Flink怎么操作redis?- 方式一:自定义sink- 方式二:使用connector
- Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法- getCommandDescription 选择对应的数据结构和key名称配置- getKeyFromData 获取key- getValueFromData 获取value
- 使用- 添加依赖
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>
- 编码
publicclassMyRedisSinkimplementsRedisMapper<Tuple2<String,Integer>>{@OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");}@OverridepublicStringgetKeyFromData(Tuple2<String,Integer> value){return value.f0;}@OverridepublicStringgetValueFromData(Tuple2<String,Integer> value){return value.f1.toString();}}
Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战
- Redis环境说明 redis6- 使用docker部署redis6.x 看个人主页docker相关文章
docker run -d -p 6379:6379 redis
- 编码实战
数据源
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("小滴课堂面试专题第一季");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
/**
* run 方法调用前 用于初始化连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 产生数据的逻辑
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
ctx.collect(videoOrder);
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
flag = false;
}
}
保存的格式与存取的方法
publicclassVideoOrderCounterSinkimplementsRedisMapper<Tuple2<String,Integer>>{/***
* 选择需要用到的命令,和key名称
* @return
*/@OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");}/**
* 获取对应的key或者filed
*
* @param data
* @return
*/@OverridepublicStringgetKeyFromData(Tuple2<String,Integer> data){System.out.println("getKeyFromData="+ data.f0);return data.f0;}/**
* 获取对应的值
*
* @param data
* @return
*/@OverridepublicStringgetValueFromData(Tuple2<String,Integer> data){System.out.println("getValueFromData="+ data.f1.toString());return data.f1.toString();}}
落地
publicclassFlink07RedisSinkApp{/**
* source
* transformation
* sink
*
* @param args
*/publicstaticvoidmain(String[] args)throwsException{//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);//数据源 source// DataStream<VideoOrder> ds = env.fromElements(// new VideoOrder("21312","java",32,5,new Date()),// new VideoOrder("314","java",32,5,new Date()),// new VideoOrder("542","springboot",32,5,new Date()),// new VideoOrder("42","redis",32,5,new Date()),// new VideoOrder("4252","java",32,5,new Date()),// new VideoOrder("42","springboot",32,5,new Date()),// new VideoOrder("554232","flink",32,5,new Date()),// new VideoOrder("23323","java",32,5,new Date())// );DataStream<VideoOrder> ds = env.addSource(newVideoOrderSource());//transformationDataStream<Tuple2<String,Integer>> mapDS = ds.map(newMapFunction<VideoOrder,Tuple2<String,Integer>>(){@OverridepublicTuple2<String,Integer>map(VideoOrder value)throwsException{returnnewTuple2<>(value.getTitle(),1);}});// DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {// @Override// public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {// out.collect(new Tuple2<>(value.getTitle(),1));// }// });//分组KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(newKeySelector<Tuple2<String,Integer>,String>(){@OverridepublicStringgetKey(Tuple2<String,Integer> value)throwsException{return value.f0;}});//统计每组有多少个DataStream<Tuple2<String,Integer>> sumDS = keyByDS.sum(1);//控制台打印
sumDS.print();//单机redisFlinkJedisPoolConfig conf =newFlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
sumDS.addSink(newRedisSink<>(conf,newVideoOrderCounterSink()));//DataStream需要调用execute,可以取个名称
env.execute("custom redis sink job");}}
本文转载自: https://blog.csdn.net/weixin_45574790/article/details/132857544
版权归原作者 大数据小禅 所有, 如有侵权,请联系我们删除。
版权归原作者 大数据小禅 所有, 如有侵权,请联系我们删除。