文章目录
概要
Flink的适用场景以及如何使用
什么是FLink
一句话总结,Flink就是一个分布式,高可用,高性能的流处理框架。
主要构造
- checkpoint:基于chandy-lamport算法实现分布式计算任务的一致性语义;
- state:flink中的状态机制,flink天生支持state,state可以认为程序的中间计算结果或者是历史计算结果;
- time:flink中支持基于事件时间和处理时间进行计算,spark streaming只能按照process time进行处理;基于事件时间的计算我们可以解决数据迟到和乱序等问题。
- window:flink提供了更多丰富的window,基于时间,基于数量,session window,同样支持滚动和滑动窗口的计算。这个看起来没那么浅显易懂,用个很容易理解的栗子来说,Flink类似于一个多线程环境,每个的处理都是独立的,在每个处理里面他包含有自己的一系列机制,可以存储一些执行状态,定时任务,窗口计算等功能。 理论结合实际来打个比方,你现在有一个水池,连接了很多根水管,当水从水池流向水管的时候,水管里面的水就是相对独立的一个执行环境了,至于是这个水是被使用,还是被浪费,对于其他的水管没有任何影响。每个水管都有自己独立的开关,你可以控制他什么时候开什么时候关。但是需要注意的一点是,每根管子的构造是一模一样的,所以他们的流程是一样的,只是会因为状态的不一样而处理方式不同
何时应用,如何应用
当你的数据量很大,比如有些数据推送,每几秒都会给你推送,然后还分了很多个渠道给你推送,你对这些消息中的某些值比较关心,有些还需要做延迟处理的,使用flink那真是赶上巧了。flink的高性能处理,可以很好的满足你数据量大的问题,其中的state,timer等也会对你的每一条消息负责。
springboot使用flink还算比较简单
引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.13.2</version></dependency>
添加配置
@ComponentpublicclassSignalInit{privatestaticString topicNameSource;privatestaticString topicGroupSource;privatestaticString mqAddressSource;@Value("${custom-rocketmq-source.attribute-id-topic-name}")publicvoidsetHeatTopicSource(String heatTopicName){
billetTopicNameSource = heatTopicName;}@Value("${rocketmq.name-server}")publicvoidsetMqAddressSource(String mqAddressSourceAndPort){
mqAddressSource = mqAddressSourceAndPort;}@Value("${custom-rocketmq-source.attribute-ids-topic-group}")publicvoidsetHeatTopicGroupSource(String heatTopicGroup){
heatTopicGroupSource = heatTopicGroup;}publicstaticvoidinit(){StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();Properties consumerProps =getConsumerProps();//获取配置信息SimpleTupleDeserializationSchema schema =newSimpleTupleDeserializationSchema();DataStream<Tuple2<String,String>> source =
env.addSource(newRocketMQSource<>(schema, consumerProps));//获取mq发过来的数据转成IotEvent对象DataStream<IotEvent> attributeEventStreamSource =
source.flatMap(newFilterCommonTuple2ToIotEventFlatMap());// 分组处理
attributeEventStreamSource
.keyBy(IotEvent::getPushId).process(newDealSignalKeyedProcess());try{
env.execute("SignalInit");}catch(Exception e){
e.printStackTrace();}}privatestaticPropertiesgetConsumerProps(){Properties consumerProps =newProperties();
consumerProps.setProperty(RocketMQConstant.NAME_SERVER_ADDR,
mqAddressSource);
consumerProps.setProperty(RocketMQConstant.CONSUMER_GROUP, heatTopicGroupSource);
consumerProps.setProperty(RocketMQConstant.CONSUMER_TOPIC, billetTopicNameSource);return consumerProps;}
上面代码是Flink配合RokectMq对mq发送过来的数据做的一个处理,通过获取mq发送的消息转换成一个Event对象(这个Event对象自己根据mq发送过来的消息实体新建),最后生成一个Flink的流处理(new DealSignalKeyedProcess())开始执行,在DealSignalKeyedProcess会对你发过来的信号进行具体的处理。
如下是DealSignalKeyedProcess方法中大概需要实现的东西
@Slf4jpublicclassDealSignalKeyedProcessextendsKeyedProcessFunction<String,IotEvent,String>{ApplicationContext context;//这里就使用到了Flink的状态机制,里面存放是数据可以是基本数据类型,也可以是一个对象,其大致的使用方式为 f赋值: timeTsState.update(1.0),获取值timeTsState。value()ValueState<Long> timeTsState;ValueState<IotEvent> iotEventStatus;publicDealSignalKeyedProcess(){}/**
* open中将会对前面定义的属性进行一个简单的初始化
*/@Overridepublicvoidopen(Configuration parameters)throwsException{
context =SpringUtil.getApplicationContext();
timeTsState =getRuntimeContext().getState(newValueStateDescriptor<>("times-ts",Long.class));
iotEventStatus =getRuntimeContext().getState(newValueStateDescriptor<>("iotEvent",IotEvent.clas
}/**
* processElement为业务的主要执行方法,在这个方法中对接收到的Event来进行处理
* 其中可以声明一个定时器,在多少时间后执行,然后销毁定时器,下方将有一个简单的举例
*/@OverridepublicvoidprocessElement(IotEvent value,Context ctx,Collector<String> out){
iotEventStatus.update(value);//....if(timeTsState.value()==null){Long timestamp = sourceMsgTime +(timesUnitCode ==1? durationTime : durationTime *60)*1000L;//创建持续时长后的定时器
ctx.timerService().registerProcessingTimeTimer(timestamp);
timeTsState.update(timestamp);}}@OverridepublicvoidonTimer(long timestamp,KeyedProcessFunction<String,IotEvent,String>.OnTimerContext ctx,Collector<String> out)throwsException{//这里为达到该定时时间时的具体业务实现...}}
小结
很多细节的东西网上都有讲,比如flink的架构体系,流程图等,很多东西其实就在于一个了解,知道在什么时候用什么方法能够提升效率,有一个大概的技术框架感觉就能很轻松的干活了。纯个人想法,不喜勿喷。
版权归原作者 小树勿招风 所有, 如有侵权,请联系我们删除。