0


Flink的简单使用

文章目录

概要

Flink的适用场景以及如何使用

什么是FLink

一句话总结,Flink就是一个分布式,高可用,高性能的流处理框架。

主要构造

  • checkpoint:基于chandy-lamport算法实现分布式计算任务的一致性语义;
  • state:flink中的状态机制,flink天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;
  • time:flink中支持基于事件时间和处理时间进行计算,spark streaming只能按照process time进行处理;基于事件时间的计算我们可以解决数据迟到和乱序等问题。
  • window:flink提供了更多丰富的window,基于时间,基于数量,session window,同样支持滚动和滑动窗口的计算。这个看起来没那么浅显易懂,用个很容易理解的栗子来说,Flink类似于一个多线程环境,每个的处理都是独立的,在每个处理里面他包含有自己的一系列机制,可以存储一些执行状态,定时任务,窗口计算等功能。 理论结合实际来打个比方,你现在有一个水池,连接了很多根水管,当水从水池流向水管的时候,水管里面的水就是相对独立的一个执行环境了,至于是这个水是被使用,还是被浪费,对于其他的水管没有任何影响。每个水管都有自己独立的开关,你可以控制他什么时候开什么时候关。但是需要注意的一点是,每根管子的构造是一模一样的,所以他们的流程是一样的,只是会因为状态的不一样而处理方式不同

何时应用,如何应用

当你的数据量很大,比如有些数据推送,每几秒都会给你推送,然后还分了很多个渠道给你推送,你对这些消息中的某些值比较关心,有些还需要做延迟处理的,使用flink那真是赶上巧了。flink的高性能处理,可以很好的满足你数据量大的问题,其中的state,timer等也会对你的每一条消息负责。

springboot使用flink还算比较简单
引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.13.2</version></dependency>

添加配置

@ComponentpublicclassSignalInit{privatestaticString topicNameSource;privatestaticString topicGroupSource;privatestaticString mqAddressSource;@Value("${custom-rocketmq-source.attribute-id-topic-name}")publicvoidsetHeatTopicSource(String heatTopicName){
        billetTopicNameSource = heatTopicName;}@Value("${rocketmq.name-server}")publicvoidsetMqAddressSource(String mqAddressSourceAndPort){
        mqAddressSource = mqAddressSourceAndPort;}@Value("${custom-rocketmq-source.attribute-ids-topic-group}")publicvoidsetHeatTopicGroupSource(String heatTopicGroup){
        heatTopicGroupSource = heatTopicGroup;}publicstaticvoidinit(){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();Properties consumerProps =getConsumerProps();//获取配置信息SimpleTupleDeserializationSchema schema =newSimpleTupleDeserializationSchema();DataStream<Tuple2<String,String>> source =
                env.addSource(newRocketMQSource<>(schema, consumerProps));//获取mq发过来的数据转成IotEvent对象DataStream<IotEvent> attributeEventStreamSource =
                source.flatMap(newFilterCommonTuple2ToIotEventFlatMap());// 分组处理
        attributeEventStreamSource
                .keyBy(IotEvent::getPushId).process(newDealSignalKeyedProcess());try{
            env.execute("SignalInit");}catch(Exception e){
            e.printStackTrace();}}privatestaticPropertiesgetConsumerProps(){Properties consumerProps =newProperties();
        consumerProps.setProperty(RocketMQConstant.NAME_SERVER_ADDR,
                mqAddressSource);
        consumerProps.setProperty(RocketMQConstant.CONSUMER_GROUP, heatTopicGroupSource);
        consumerProps.setProperty(RocketMQConstant.CONSUMER_TOPIC, billetTopicNameSource);return consumerProps;}

上面代码是Flink配合RokectMq对mq发送过来的数据做的一个处理,通过获取mq发送的消息转换成一个Event对象(这个Event对象自己根据mq发送过来的消息实体新建),最后生成一个Flink的流处理(new DealSignalKeyedProcess())开始执行,在DealSignalKeyedProcess会对你发过来的信号进行具体的处理。
如下是DealSignalKeyedProcess方法中大概需要实现的东西

@Slf4jpublicclassDealSignalKeyedProcessextendsKeyedProcessFunction<String,IotEvent,String>{ApplicationContext context;//这里就使用到了Flink的状态机制,里面存放是数据可以是基本数据类型,也可以是一个对象,其大致的使用方式为 f赋值:      timeTsState.update(1.0),获取值timeTsState。value()ValueState<Long> timeTsState;ValueState<IotEvent> iotEventStatus;publicDealSignalKeyedProcess(){}/**
 * open中将会对前面定义的属性进行一个简单的初始化
 */@Overridepublicvoidopen(Configuration parameters)throwsException{
        context =SpringUtil.getApplicationContext();
        timeTsState =getRuntimeContext().getState(newValueStateDescriptor<>("times-ts",Long.class));
        iotEventStatus =getRuntimeContext().getState(newValueStateDescriptor<>("iotEvent",IotEvent.clas
    }/**
 * processElement为业务的主要执行方法,在这个方法中对接收到的Event来进行处理
 * 其中可以声明一个定时器,在多少时间后执行,然后销毁定时器,下方将有一个简单的举例
 */@OverridepublicvoidprocessElement(IotEvent value,Context ctx,Collector<String> out){
        iotEventStatus.update(value);//....if(timeTsState.value()==null){Long timestamp = sourceMsgTime +(timesUnitCode ==1? durationTime : durationTime *60)*1000L;//创建持续时长后的定时器
            ctx.timerService().registerProcessingTimeTimer(timestamp);
            timeTsState.update(timestamp);}}@OverridepublicvoidonTimer(long timestamp,KeyedProcessFunction<String,IotEvent,String>.OnTimerContext ctx,Collector<String> out)throwsException{//这里为达到该定时时间时的具体业务实现...}}

小结

很多细节的东西网上都有讲,比如flink的架构体系,流程图等,很多东西其实就在于一个了解,知道在什么时候用什么方法能够提升效率,有一个大概的技术框架感觉就能很轻松的干活了。纯个人想法,不喜勿喷。


本文转载自: https://blog.csdn.net/qq_41961134/article/details/135213130
版权归原作者 小树勿招风 所有, 如有侵权,请联系我们删除。

“Flink的简单使用”的评论:

还没有评论