Flink Sql去重方案
1、状态去重
将数据保存到状态中,进行累计
select
window_start,
window_end,
count(distinct devId) as cnt
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
group by window_start,window_end;
2、利用HyperLogLog进行去重
select
window_start,
window_end,
hllDistinct(distinct devId) as cnt
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口
group by window_start,window_end;
3、Deduplication方式
当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。
Deduplicate Keep FirstRow
保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
FROM T
)
WHERE rowNum = 1
Deduplicate Keep LastRow
保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
FROM T
)
WHERE rowNum = 1
Flink 程序去重方案
package com.yyds.flink_distinct;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
*/
public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {
// 定义第一个状态MapState
MapState<String,Integer> deviceIdState ;
// 定义第二个状态ValueState
ValueState<Long> countState ;
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
/*
MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
*/
deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);
ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
/*
ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
*/
countState = getRuntimeContext().getState(countStateDescriptor);
}
@Override
public void processElement(_01_AdvertiseMentData data, Context context, Collector<Void> collector) throws Exception {
// 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
long currw = context.timerService().currentWatermark();
if(context.getCurrentKey().getTime() + 1 <= currw){
System.out.println("迟到的数据:" + data);
return;
}
String devId = data.getDevId();
Integer i = deviceIdState.get(devId);
if(i == null){
i = 0;
}
if( i == 1 ){
// 表示已经存在
}else {
// 表示不存在,放入到状态中
deviceIdState.put(devId,1);
// 将统计的数据 + 1
Long count = countState.value();
if(count == null){
count = 0L;
}
count ++;
countState.update(count);
// 注册一个定时器,定期清理状态中的数据
context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
}
System.out.println("countState.value() = " + countState.value());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
System.out.println(timestamp + " exec clean~~~");
System.out.println("countState.value() = " + countState.value());
// 清除状态
deviceIdState.clear();
countState.clear();
}
}
版权归原作者 大大大大肉包 所有, 如有侵权,请联系我们删除。