0


Flink中常用的去重方案

Flink Sql去重方案

1、状态去重

将数据保存到状态中,进行累计

select
    window_start, 
    window_end, 
    count(distinct devId) as cnt 
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口  
group by window_start,window_end;

2、利用HyperLogLog进行去重

select
    window_start, 
    window_end, 
    hllDistinct(distinct devId) as cnt 
from table (tumble(table source_table,descriptor(rt),interval '60' minute )) --滚动窗口  
group by window_start,window_end;

3、Deduplication方式

当rownum<=1时,flink采用的是Deduplication方式进行去重。该方式有两种去重方案:有保留第一条(Deduplicate Keep FirstRow)和保留最后一条(Deduplicate Keep LastRow)2种。

Deduplicate Keep FirstRow

保留首行的去重策略:保留KEY下第一条出现的数据,之后出现该KEY下的数据会被丢弃掉。因为STATE中只存储了KEY数据,所以性能较优。

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

Deduplicate Keep LastRow

保留末行的去重策略:保留KEY下最后一条出现的数据。因此过程中会产生变更的记录,会下下游发送变更的消息。因此,sink表需要支持update操作。

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1

Flink 程序去重方案

package com.yyds.flink_distinct;

import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后输出
 */
public class _01_DistinctProcessFunction extends KeyedProcessFunction<_01_AdKey,_01_AdvertiseMentData,Void> {

    // 定义第一个状态MapState
    MapState<String,Integer> deviceIdState ;
    // 定义第二个状态ValueState
    ValueState<Long> countState ;

    @Override
    public void open(Configuration parameters) throws Exception {
        MapStateDescriptor<String, Integer> deviceIdStateDescriptor = new MapStateDescriptor<>("deviceIdState", String.class, Integer.class);
        /*
          MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,
            如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,
            mapstate中value作为rocksdb中的value, rocksdb中value大小是有上限的,这种方式可以减少rocksdb value的大小;
         */
        deviceIdState = getRuntimeContext().getMapState(deviceIdStateDescriptor);

        ValueStateDescriptor<Long> countStateDescriptor = new ValueStateDescriptor<>("countState", Long.class);
        /*
          ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。
         */
        countState = getRuntimeContext().getState(countStateDescriptor);

    }

    @Override
    public void processElement(_01_AdvertiseMentData data, Context context, Collector<Void> collector) throws Exception {
        // 主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果
        long currw = context.timerService().currentWatermark();
        if(context.getCurrentKey().getTime() + 1 <= currw){
            System.out.println("迟到的数据:" + data);
            return;
        }

        String devId = data.getDevId();
        Integer i = deviceIdState.get(devId);
        if(i == null){
            i = 0;
        }

        if(  i == 1  ){
            // 表示已经存在
        }else {
            // 表示不存在,放入到状态中
            deviceIdState.put(devId,1);
            // 将统计的数据 + 1
            Long count = countState.value();

            if(count == null){
                count = 0L;
            }
            count ++;
            countState.update(count);
            // 注册一个定时器,定期清理状态中的数据
            context.timerService().registerEventTimeTimer(context.getCurrentKey().getTime() + 1);
        }

        System.out.println("countState.value() = " + countState.value());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Void> out) throws Exception {
        System.out.println(timestamp + " exec clean~~~");
        System.out.println("countState.value() = " + countState.value());
        // 清除状态
        deviceIdState.clear();
        countState.clear();
    }
}
标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_42456324/article/details/127821866
版权归原作者 大大大大肉包 所有, 如有侵权,请联系我们删除。

“Flink中常用的去重方案”的评论:

还没有评论