0


Flink 实时数仓(十一)【ADS 层搭建】

前言

    ADS 层也就是这个实时数仓的最后一层了,意味着今天这个项目就要结束了;那么接下来就是结合着离线和实时数仓,把这两个数仓项目再次复习,反复理解。同时好好看看《阿里巴巴大数据之路》中关于数仓建模的内容;

    今天的秋招估计比去年更加严峻,本科学历也在贬值,不管怎么样,尽自己努力就好。

1、ADS 层搭建

    这里难度就不大了,无非就是写写 SQL (查询 DWS 层),把要展示的数据拼接成一张表,通过接口传给报表;

    DWS 层把轻度聚合的结果保存到 ClickHouse 中,主要的目的就是提供即时的数据查询、统计、分析服务。这些统计服务一般会以两种形式呈现,一种是面向专业数据分析人员准备的 BI 工具,一种是面向非专业人员的更加直观的数据大屏。

    这里用 Sugar 来作为 BI 可视化工具进行展示,做离线数仓的时候,我们是使用 Superset 主动读取 MySQL 数据库来实现数据可视化的,这里,我们要保证实时性不可能再去实时把 clickhouse 的数据写入到 MySQL,而是直接在 clickhouse 基础上使用 SpringBoot 进行 jdbc 查询,查询到的数据封装成 json 格式,让 Sugar 来周期性地读取这些 json 格式的数据;

    这个 json 格式的 k-v 当然是由 Sugar 来决定的,所以我们首先需要在 Sugar 上确定图表,然后再去根据 Sugar 要求的 json 格式去封装;

    此外,外网(Sugar)想要访问我们本地的数据需要做内网穿透(借助花生壳),这里不多废话,直接跳过;下面只讲核心的 ADS 层代码:

1.1、流量主题

1.1.1、各渠道流量统计

需求如下:

统计周期

统计粒度

指标

说明

当日

渠道

独立访客数

统计访问人数

当日

渠道

会话总数

统计会话总数

当日

渠道

会话平均浏览页面数

统计每个会话平均浏览页面数

当日

渠道

会话平均停留时长

统计每个会话平均停留时长

当日

渠道

跳出率

只有一个页面的会话的比例

我们在 DWS 层的时候,已经创建过 dws_traffic_vc_ch_ar_is_new_page_view_window 这张表了,所以这里我们直接查询 ck 即可;当时我们创建的这张表的建表语句如下:

create table if not exists dws_traffic_vc_ch_ar_is_new_page_view_window
(
    stt     DateTime, -- 窗口开始时间(年月日)
    edt     DateTime, -- 窗口结束时间(年月日)
    vc      String, -- 版本号
    ch      String, -- 渠道
    ar      String, -- 地区
    is_new  String, -- 新用户
    uv_ct   UInt64, -- 独立访客数
    sv_ct   UInt64, -- 会话数
    pv_ct   UInt64, -- 页面浏览数
    dur_sum UInt64, -- 浏览时长
    uj_ct   UInt64, -- 跳出会话数
    ts      UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, vc, ch, ar, is_new);

所以这里我们只需要根据需求写查询语句即可,我们将来会把不同渠道的不同指标做成柱状图,比如各渠道独立访客数

此外,还有各渠道会话总数、会话平均浏览页面数、会话平均停留时长、跳出率等,意味着我们需要写 5 条 SQL 去 clickhouse 中查询;

各渠道独立访客数

SELECT
    ch,
    sum(uv_ct) uv_ct
FROM
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date} 
GROUP BY 
    toYYYYMMDD(stt), ch
ORDER BY
    uv_ct desc;

各渠道总会话数

SELECT
    ch,
    sum(sv_ct) sv_ct
FROM
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date} 
GROUP BY 
    toYYYYMMDD(stt), ch
ORDER BY
    sv_ct desc;

各渠道会话平均页面浏览数

平均浏览页面数(平均每个会话浏览的页面数) = 页面浏览总数 / 会话总数

SELECT
    ch,
    sum(pv_ct) / sum(sv_ct) pv_per_session
FROM
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date} 
GROUP BY 
    toYYYYMMDD(stt), ch
ORDER BY
    pv_per_session desc;

各渠道会话平均页面停留时长

SELECT
    ch,
    sum(dur_sum) / sum(sv_ct) dur_per_session
FROM
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date} 
GROUP BY 
    toYYYYMMDD(stt), ch
ORDER BY
    dur_per_session desc;

各渠道会话跳出率

SELECT
    ch,
    sum(uj_ct) / sum(sv_ct) uj_rate
FROM
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date} 
GROUP BY 
    toYYYYMMDD(stt), ch
ORDER BY
    uj_rate desc;
代码实现
1、编写实体类

创建上面 5 个 SQL 各自返回的表的实体类,很简单,毕竟基本上都返回两个字段(维度 + 度量值):

@Data
@AllArgsConstructor
public class TrafficUvCt {
    String ch; // 渠道
    Integer uvCt; // 独立访客数
}
@Data
@AllArgsConstructor
public class TrafficSvCt {
    // 渠道
    String ch;
    // 会话数
    Integer svCt;
}
@Data
@AllArgsConstructor
public class TrafficUjRate {
    // 渠道
    String ch;
    // 跳出率
    Double ujRate;
}
@Data
@AllArgsConstructor
public class TrafficDurPerSession {
    // 渠道
    String ch;
    // 各会话页面访问时长
    Double durPerSession;
}
@Data
@AllArgsConstructor
public class TrafficPvPerSession {
    // 渠道
    String ch;
    // 各会话页面浏览数
    Double pvPerSession;
}
2、编写Mapper 接口

通过 Mybatis 自动实现接口来获取返回结果:

@Mapper
public interface TrafficChannelStatusMapper {

    // 1. 各渠道独立访客数(uv)
    @Select("select ch,sum(uv_ct) uv_ct from" +
            " dws_traffic_vc_ch_ar_is_new_page_view_window" +
            " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
            " order by uv_ct desc;")
    List<TrafficUvCt> selectUvCt(@Param("date")Integer date);

    // 2. 各渠道会话数(sv)
    @Select("select ch,sum(sv_ct) sv_ct from" +
            " dws_traffic_vc_ch_ar_is_new_page_view_window" +
            " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
            " order by sv_ct desc;")
    List<TrafficSvCt> selectSvCt(@Param("date")Integer date);

    // 3. 各渠道会话平均页面浏览数
    @Select("select ch,sum(pv_ct) / sum(sv_ct) pv_per_session from" +
            " dws_traffic_vc_ch_ar_is_new_page_view_window" +
            " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
            " order by pv_per_session desc;")
    List<TrafficPvPerSession> selectPvPerSession(@Param("date")Integer date);

    // 4. 各渠道会话平均页面访问时长
    @Select("select ch,sum(dur_sum) / sum(sv_ct) dur_per_session from" +
            " dws_traffic_vc_ch_ar_is_new_page_view_window" +
            " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
            " order by dur_per_session desc;")
    List<TrafficDurPerSession> selectDurPerSession(@Param("date")Integer date);

    // 5. 各渠道跳出率(跳出会话数/会话总数)
    @Select("select ch,sum(uj_ct)/sum(sv_ct) uj_rate from" +
            " dws_traffic_vc_ch_ar_is_new_page_view_window" +
            " where toYYYYMMDD(stt) = #{date} group by toYYYYMMDD(stt),ch" +
            " order by uj_rate desc;")
    List<TrafficUjRate> selectUjRate(@Param("date")Integer date);

}
3、编写 Service 接口
public interface TrafficChannelStatsService {
    // 1. 获取各渠道独立访客数
    List<TrafficUvCt> getUvCt(Integer date);

    // 2. 获取各渠道会话数
    List<TrafficSvCt> getSvCt(Integer date);

    // 3. 获取各渠道会话平均页面浏览数
    List<TrafficPvPerSession> getPvPerSession(Integer date);

    // 4. 获取各渠道会话平均页面访问时长
    List<TrafficDurPerSession> getDurPerSession(Integer date);

    // 5. 获取各渠道跳出率
    List<TrafficUjRate> getUjRate(Integer date);
}
4、编写 Service 实现类
@Service
public class TrafficChannelStatsServiceImpl implements TrafficChannelStatsService {

    // 自动装载 Mapper 接口实现类
    @Autowired
    TrafficChannelStatusMapper TrafficChannelStatusMapper;

    // 1. 获取各渠道独立访客数
    @Override
    public List<TrafficUvCt> getUvCt(Integer date) {
        return TrafficChannelStatusMapper.selectUvCt(date);
}

    // 2. 获取各渠道会话数
    @Override
    public List<TrafficSvCt> getSvCt(Integer date) {
        return TrafficChannelStatusMapper.selectSvCt(date);
    }

    // 3. 获取各渠道会话平均页面浏览数
    @Override
    public List<TrafficPvPerSession> getPvPerSession(Integer date) {
        return TrafficChannelStatusMapper.selectPvPerSession(date);
    }

    // 4. 获取各渠道会话平均页面访问时长
    @Override
    public List<TrafficDurPerSession> getDurPerSession(Integer date) {
        return TrafficChannelStatusMapper.selectDurPerSession(date);
    }

    // 5. 获取各渠道跳出率
    @Override
    public List<TrafficUjRate> getUjRate(Integer date) {
        return TrafficChannelStatusMapper.selectUjRate(date);
    }
}
5、编写 Controller 类
@RestController
@RequestMapping("/gmall/realtime/traffic")
public class TrafficController {
    // 自动装载渠道流量统计服务实现类
    @Autowired
    private TrafficChannelStatsService trafficChannelStatsService;

    // 1. 独立访客请求拦截方法
    @RequestMapping("/uvCt")
    public String getUvCt(
            @RequestParam(value = "date", defaultValue = "1") Integer date) {
        if (date == 1) {
            date = DateUtil.now();
        }
        List<TrafficUvCt> trafficUvCtList = trafficChannelStatsService.getUvCt(date);
        if (trafficUvCtList == null) {
            return "";
        }
        StringBuilder categories = new StringBuilder("[");
        StringBuilder uvCtValues = new StringBuilder("[");

        for (int i = 0; i < trafficUvCtList.size(); i++) {
            TrafficUvCt trafficUvCt = trafficUvCtList.get(i);
            String ch = trafficUvCt.getCh();
            Integer uvCt = trafficUvCt.getUvCt();

            categories.append("\"").append(ch).append("\"");
            uvCtValues.append("\"").append(uvCt).append("\"");

            if (i < trafficUvCtList.size() - 1) {
                categories.append(",");
                uvCtValues.append(",");
            } else {
                categories.append("]");
                uvCtValues.append("]");
            }
        }

        return "{\n" +
                "  \"status\": 0,\n" +
                "  \"msg\": \"\",\n" +
                "  \"data\": {\n" +
                "    \"categories\":" + categories + ",\n" +
                "    \"series\": [\n" +
                "      {\n" +
                "        \"name\": \"独立访客数\",\n" +
                "        \"data\": " + uvCtValues + "\n" +
                "      }\n" +
                "    ]\n" +
                "  }\n" +
                "}";
    }

    // 2. 会话数请求拦截方法
    @RequestMapping("/svCt")
    public String getPvCt(
            @RequestParam(value = "date", defaultValue = "1") Integer date) {
        if (date == 1) {
            date = DateUtil.now();
        }
        List<TrafficSvCt> trafficSvCtList = trafficChannelStatsService.getSvCt(date);
        if (trafficSvCtList == null) {
            return "";
        }
        StringBuilder categories = new StringBuilder("[");
        StringBuilder svCtValues = new StringBuilder("[");

        for (int i = 0; i < trafficSvCtList.size(); i++) {
            TrafficSvCt trafficSvCt = trafficSvCtList.get(i);
            String ch = trafficSvCt.getCh();
            Integer svCt = trafficSvCt.getSvCt();

            categories.append("\"").append(ch).append("\"");
            svCtValues.append("\"").append(svCt).append("\"");

            if (i < trafficSvCtList.size() - 1) {
                categories.append(",");
                svCtValues.append(",");
            } else {
                categories.append("]");
                svCtValues.append("]");
            }
        }

        return "{\n" +
                "  \"status\": 0,\n" +
                "  \"msg\": \"\",\n" +
                "  \"data\": {\n" +
                "    \"categories\":" + categories + ",\n" +
                "    \"series\": [\n" +
                "      {\n" +
                "        \"name\": \"会话数\",\n" +
                "        \"data\": " + svCtValues + "\n" +
                "      }\n" +
                "    ]\n" +
                "  }\n" +
                "}";
    }

    // 3. 各会话浏览页面数请求拦截方法
    @RequestMapping("/pvPerSession")
    public String getPvPerSession(
            @RequestParam(value = "date", defaultValue = "1") Integer date) {
        if (date == 1) {
            date = DateUtil.now();
        }
        List<TrafficPvPerSession> trafficPvPerSessionList = trafficChannelStatsService.getPvPerSession(date);
        if (trafficPvPerSessionList == null) {
            return "";
        }
        StringBuilder categories = new StringBuilder("[");
        StringBuilder pvPerSessionValues = new StringBuilder("[");

        for (int i = 0; i < trafficPvPerSessionList.size(); i++) {
            TrafficPvPerSession trafficPvPerSession = trafficPvPerSessionList.get(i);
            String ch = trafficPvPerSession.getCh();
            Double pvPerSession = trafficPvPerSession.getPvPerSession();

            categories.append("\"").append(ch).append("\"");
            pvPerSessionValues.append("\"").append(pvPerSession).append("\"");

            if (i < trafficPvPerSessionList.size() - 1) {
                categories.append(",");
                pvPerSessionValues.append(",");
            } else {
                categories.append("]");
                pvPerSessionValues.append("]");
            }
        }

        return "{\n" +
                "  \"status\": 0,\n" +
                "  \"msg\": \"\",\n" +
                "  \"data\": {\n" +
                "    \"categories\":" + categories + ",\n" +
                "    \"series\": [\n" +
                "      {\n" +
                "        \"name\": \"会话平均页面浏览数\",\n" +
                "        \"data\": " + pvPerSessionValues + "\n" +
                "      }\n" +
                "    ]\n" +
                "  }\n" +
                "}";
    }

    // 4. 各会话累计访问时长请求拦截方法
    @RequestMapping("/durPerSession")
    public String getDurPerSession(
            @RequestParam(value = "date", defaultValue = "1") Integer date) {
        if (date == 1) {
            date = DateUtil.now();
        }
        List<TrafficDurPerSession> trafficDurPerSessionList = trafficChannelStatsService.getDurPerSession(date);
        if (trafficDurPerSessionList == null) {
            return "";
        }
        StringBuilder categories = new StringBuilder("[");
        StringBuilder durPerSessionValues = new StringBuilder("[");

        for (int i = 0; i < trafficDurPerSessionList.size(); i++) {
            TrafficDurPerSession trafficDurPerSession = trafficDurPerSessionList.get(i);
            String ch = trafficDurPerSession.getCh();
            Double durPerSession = trafficDurPerSession.getDurPerSession();

            categories.append("\"").append(ch).append("\"");
            durPerSessionValues.append("\"").append(durPerSession).append("\"");

            if (i < trafficDurPerSessionList.size() - 1) {
                categories.append(",");
                durPerSessionValues.append(",");
            } else {
                categories.append("]");
                durPerSessionValues.append("]");
            }
        }

        return "{\n" +
                "  \"status\": 0,\n" +
                "  \"msg\": \"\",\n" +
                "  \"data\": {\n" +
                "    \"categories\":" + categories + ",\n" +
                "    \"series\": [\n" +
                "      {\n" +
                "        \"name\": \"会话平均页面访问时长\",\n" +
                "        \"data\": " + durPerSessionValues + "\n" +
                "      }\n" +
                "    ]\n" +
                "  }\n" +
                "}";
    }

    // 5. 跳出率请求拦截方法
    @RequestMapping("/ujRate")
    public String getUjRate(
            @RequestParam(value = "date", defaultValue = "1") Integer date) {
        if (date == 1) {
            date = DateUtil.now();
        }
        List<TrafficUjRate> trafficUjRateList = trafficChannelStatsService.getUjRate(date);
        if (trafficUjRateList == null) {
            return "";
        }
        StringBuilder categories = new StringBuilder("[");
        StringBuilder ujRateValues = new StringBuilder("[");

        for (int i = 0; i < trafficUjRateList.size(); i++) {
            TrafficUjRate trafficUjRate = trafficUjRateList.get(i);
            String ch = trafficUjRate.getCh();
            Double ujRate = trafficUjRate.getUjRate();

            categories.append("\"").append(ch).append("\"");
            ujRateValues.append("\"").append(ujRate).append("\"");

            if (i < trafficUjRateList.size() - 1) {
                categories.append(",");
                ujRateValues.append(",");
            } else {
                categories.append("]");
                ujRateValues.append("]");
            }
        }

        return "{\n" +
                "  \"status\": 0,\n" +
                "  \"msg\": \"\",\n" +
                "  \"data\": {\n" +
                "    \"categories\":" + categories + ",\n" +
                "    \"series\": [\n" +
                "      {\n" +
                "        \"name\": \"跳出率\",\n" +
                "        \"data\": " + ujRateValues + "\n" +
                "      }\n" +
                "    ]\n" +
                "  }\n" +
                "}";
    }

}
6、用到的工具类

主要是返回当前日期的 Int 值

public class DateUtil {
    public static Integer now(){
        return Integer.parseInt(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")));
    }
}

下面的需求就不浪费时间了,只写 SQL,Java 代码又臭又长我这里就不展示了!


1.1.2、流量分时统计

需求:

统计周期

指标

说明

1 小时

独立访客数

统计当日各小时独立访客数

1 小时

页面浏览数

统计当日各小时页面浏览数

1 小时

新访客数

统计当日各小时新访客数

SELECT
    toHour(stt) hr, -- 小时(0-23)
    sum(uv_ct)  uv_ct, -- 独立访客数
    sum(pv_ct)  pv_ct, -- 页面浏览数
    -- 新访客数统计
    sum(if(is_new = '1', dws_traffic_vc_ch_ar_is_new_page_view_window.uv_ct, 0)) new_uv_ct 
FROM 
    dws_traffic_vc_ch_ar_is_new_page_view_window
WHERE
    toYYYYMMDD(stt) = #{date}\n" +
GROUP BY
    hr

1.1.3、新老访客流量统计

需求:

统计周期

统计粒度

指标

说明

当日

访客类型

访客数

分别统计新老访客数

当日

访客类型

页面浏览数

分别统计新老访客页面浏览数

当日

访客类型

跳出率

分别统计新老访客跳出率

当日

访客类型

平均在线时长

分别统计新老访客平均在线时长

当日

访客类型

平均访问页面数

分别统计新老访客平均访问页面数

其实就是计算新老访客这两大群体的指标:

SELECT 
       is_new, -- 新老访客标记
       sum(uv_ct)   uv_ct, -- 独立访客数
       sum(pv_ct)   pv_ct, -- 页面浏览数
       sum(sv_ct)   sv_ct, -- 会话总数
       sum(uj_ct) / sum(sv_ct)   uj_rate, -- 跳出率
       sum(dur_sum) dur_sum -- 停留时长
from dws_traffic_vc_ch_ar_is_new_page_view_window
where toYYYYMMDD(stt) = #{date}
group by is_new

1.1.4、关键词统计

统计周期

统计粒度

指标

说明

当日

关键词

关键词评分

根据不同来源和频次计算得分

数据源来自我们 DWS 层创建的第一张表:

create table if not exists dws_traffic_source_keyword_page_view_window
(
    stt           DateTime,
    edt           DateTime,
    source        String,
    keyword       String,
    keyword_count UInt64,
    ts            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, source, keyword);

分析 SQL :

这里我们不仅分析了搜索的关键词,还分析了购物车、点击和订单中的关键词,只不过权重不同:

SELECT keyword,
       sum(keyword_count * multiIf(
                   source = 'SEARCH', 10,
                   source = 'ORDER', 5,
                   source = 'CART', 2,
                   source = 'CLICK', 1, 0
           )) keyword_score
FROM 
     dws_traffic_source_keyword_page_view_window
WHERE 
      toYYYYMMDD(stt) = #{date}
GROUP BY 
         toYYYYMMDD(stt), keyword
ORDER BY 
         keyword_score desc;

1.2、用户主题

1.2.1、用户变动统计

统计周期

指标

指标说明
当日回流用户数之前的活跃用户,一段时间未活跃(流失),今日又活跃了,就称为回流用户。此处要求统计回流用户总数。
当日

新增用户数

当日

活跃用户数

1、计算回流用户

用到 DWS 层的用户域用户登陆各窗口汇总表(dws_user_user_login_window)

create table if not exists dws_user_user_login_window
(
    stt     DateTime,
    edt     DateTime,    
    back_ct UInt64,
    uu_ct   UInt64,
    ts      UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

指标分析:

SELECT 
       'backCt'     type,
       sum(back_ct) back_ct
FROM 
    dws_user_user_login_window
WHERE 
    toYYYYMMDD(stt) = #{date}
2、计算新增用户

直接读取 DWS 层的用户域用户注册各窗口汇总表(dws_user_user_register_window)即可:

create table if not exists dws_user_user_register_window
(
    stt         DateTime,
    edt         DateTime,
    register_ct UInt64,
    ts          UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

指标分析:

SELECT
     'newUserCt'      type,
     sum(register_ct) register_ct
FROM
     dws_user_user_register_window
WHERE
     toYYYYMMDD(stt) = #{date}
3、计算活跃用户

还是用户登录表,直接统计今日的 uu_ct即可:

SELECT
     'activeUserCt' type,
     sum(uu_ct)     uu_ct
FROM
     dws_user_user_login_window
4、汇总结果

直接 union all 即可;

1.2.2、用户行为漏斗分析

漏斗分析是一个数据分析模型,它能够科学反映一个业务过程从起点到终点各阶段用户转化情况。由于其能将各阶段环节都展示出来,故哪个阶段存在问题,就能一目了然。

统计周期

指标

说明

当日

首页浏览人数

当日

商品详情页浏览人数

当日

加购人数

当日

下单人数

当日

支付人数

支付成功人数

1、计算首页浏览人数、商品详情页浏览人数

需要查询 DWS 层 用户域页面浏览汇总表

create table if not exists dws_traffic_page_view_window
(
    stt               DateTime,
    edt               DateTime,
    home_uv_ct        UInt64,
    good_detail_uv_ct UInt64,
    ts                UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

指标计算:

select
        'home'          page_id,
        sum(home_uv_ct) uvCt
from 
        dws_traffic_page_view_window
where 
        toYYYYMMDD(stt) = #{date}
union all
select 
        'good_detail'          page_id,
        sum(good_detail_uv_ct) uvCt
from 
        dws_traffic_page_view_window
where 
        toYYYYMMDD(stt) = #{date}
2、计算加购人数

需要查询自 dws_trade_cart_add_uu_window:

create table if not exists dws_trade_cart_add_uu_window
(
    stt            DateTime,
    edt            DateTime,
    cart_add_uu_ct UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

指标计算:

select 
       'cart'              page_id,
       sum(cart_add_uu_ct) uvCt
from 
        dws_trade_cart_add_uu_window
where 
        toYYYYMMDD(stt) = #{date}
3、计算下单人数
create table if not exists dws_trade_order_window
(
    stt                          DateTime,
    edt                          DateTime,
    order_unique_user_count      UInt64,
    order_new_user_count         UInt64,
    order_activity_reduce_amount Decimal(38, 20),
    order_coupon_reduce_amount   Decimal(38, 20),
    order_origin_total_amount    Decimal(38, 20),
    ts                           UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);
select 
       'trade' page_id,
       sum(order_unique_user_count) uvCt
from 
      dws_trade_order_window
where 
      toYYYYMMDD(stt) = #{date}
4、计算支付人数
create table if not exists dws_trade_payment_suc_window
(
    stt                           DateTime,
    edt                           DateTime,
    payment_suc_unique_user_count UInt64,
    payment_new_user_count        UInt64,
    ts                            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);
select 
        'payment' page_id,
        sum(payment_suc_unique_user_count) uvCt
from 
        dws_trade_payment_suc_window
where 
        toYYYYMMDD(stt) = #{date}

1.2.3、新增交易用户统计

统计周期

指标

说明

当日

下单新用户人数

当日

支付成功新用户人数

区别于上一个需求中的统计指标,上面是求的独立用户数,这里求的是新用户数:

select
       'order' trade_type,
       sum(order_new_user_count) order_new_user_count
from
      dws_trade_order_window
where
      toYYYYMMDD(stt) = #{date}
union all
select
       'payment'                   trade_type,
       sum(payment_new_user_count) pay_suc_new_user_count
from
      dws_trade_payment_suc_window
where
      toYYYYMMDD(stt) = #{date};

1.3、商品主题

1.3.1、各品牌商品交易统计

指标:

统计周期

统计粒度

指标

说明

当日

品牌

订单数

当日

品牌

订单人数

当日

品牌

订单金额

当日

品牌

退单数

当日

品牌

退单人数

计算指标:

这里需要对两张表(用户-品牌-品类-SPU-下单表和用户-品牌-品类-SPU-退单表)进行 full outer join:

select trademark_name,
       order_count,
       uu_count,
       sum(order_amount) order_amount,
       refund_count,
       refund_uu_count
from (
         select trademark_id,
                trademark_name,
                sum(order_count)        order_count,
                count(distinct user_id) uu_count,
                sum(order_amount)       order_amount
         from
               dws_trade_trademark_category_user_spu_order_window
         where
               toYYYYMMDD(stt) = #{date}
         group by
                  trademark_id, trademark_name) oct
         full outer join(
             select
                     trademark_id,
                     trademark_name,
                     sum(refund_count)       refund_count,
                     count(distinct user_id) refund_uu_count
              from
                    dws_trade_trademark_category_user_refund_window
              where
                    toYYYYMMDD(stt) = #{date}
              group by
                    trademark_id, trademark_name) rct
on oct.trademark_id = rct.trademark_id;

1.3.2、各品类商品交易统计

统计周期

统计粒度

指标

说明

当日

品类

订单数

当日

品类

订单人数

当日

品类

订单金额

当日

品类

退单数

当日

品类

退单人数

和上面的指标用到的表一样,无非就是 group by 的字段变成了品类:

select category1_name,
       category2_name,
       category3_name,
       order_count,
       uu_count,
       order_amount,
       refund_count,
       refund_uu_count
from (
    select category1_id,
             category1_name,
             category2_id,
             category2_name,
             category3_id,
             category3_name,
             sum(order_count)        order_count,
             count(distinct user_id) uu_count,
             sum(order_amount)       order_amount
      from 
            dws_trade_trademark_category_user_spu_order_window
      where 
            toYYYYMMDD(stt) = #{date}
      group by 
               category1_id,
               category1_name,
               category2_id,
               category2_name,
               category3_id,
               category3_name) oct 
    full outer join(
            select  category1_id,
                    category1_name,
                    category2_id,
                    category2_name,
                    category3_id,
                    category3_name,
                    sum(refund_count)       refund_count,
                    count(distinct user_id) refund_uu_count
            from 
                  dws_trade_trademark_category_user_refund_window
            where 
                  toYYYYMMDD(stt) = #{date}
            group by 
                  category1_id,
                  category1_name,
                  category2_id,
                  category2_name,
                  category3_id,
                  category3_name) rct
on oct.category1_id = rct.category1_id
    and oct.category2_id = rct.category2_id
    and oct.category3_id = rct.category3_id;

1.3.3、各 SPU 商品交易统计

统计周期

统计粒度

指标

说明

当日

SPU

订单数

当日

SPU

订单人数

当日

SPU

订单金额

SPU 没有退单指标:

select spu_name,
       sum(order_count)        order_count,
       count(distinct user_id) uu_count,
       sum(order_amount)       order_amount
from 
      dws_trade_trademark_category_user_spu_order_window
where 
     toYYYYMMDD(stt) = #{date}
group by 
     spu_id, spu_name

1.4、交易主题

1.4.1、交易综合统计

统计周期

指标

说明

当日

订单总额

订单最终金额

当日

订单数

当日

订单人数

当日

退单数

当日

退单人数

1、计算订单总额

直接查询所有省份的订单总额即为总金额:

select 
       sum(order_amount) order_total_amount
from 
      dws_trade_province_order_window
where 
      toYYYYMMDD(stt) = #{date}
group by 
      toYYYYMMDD(stt)
2、计算订单数、订单人数、退单数、退单人数
select '下单数' type,
       sum(order_count)        value
from 
      dws_trade_trademark_category_user_spu_order_window
where 
      toYYYYMMDD(stt) = #{date}
union all
select '下单人数' type,
       count(distinct user_id) value
from 
      dws_trade_trademark_category_user_spu_order_window
where 
      toYYYYMMDD(stt) = #{date}
union all
select '退单数' type,
       sum(refund_count)       value
from  
      dws_trade_trademark_category_user_refund_window
where 
      toYYYYMMDD(stt) = #{date}
union all
select '退单人数' type,
       count(distinct user_id) value
from 
      dws_trade_trademark_category_user_refund_window
where 
      toYYYYMMDD(stt) = #{date}

1.4.2、各省份交易统计

统计周期

统计粒度

指标

当日

省份

订单数

当日

省份

订单金额

1、计算各省份订单数
select province_name,
       sum(order_count)  order_count
from 
      dws_trade_province_order_window
where 
      toYYYYMMDD(stt) = #{date}
group by 
      province_id, province_name
2、计算各省份订单总金额
select province_name,
       sum(order_amount) order_amount
from 
      dws_trade_province_order_window
where 
      toYYYYMMDD(stt) = #{date}
group by
      province_id, province_name

这俩 SQL 完全可以写成一条 SQL ,分开写是因为它俩对应的是 Sugar 上不同的图表;

1.5、优惠券主题

1.5.1、当日优惠券补贴率

统计周期

统计粒度

指标

说明

当日

优惠券

补贴率

用券的订单明细优惠券减免金额总和/原始金额总和

select 
       sum(order_coupon_reduce_amount) coupon_reduce_amount,
       sum(order_origin_total_amount) origin_total_amount,
       round(round(toFloat64(coupon_reduce_amount), 5) /
             round(toFloat64(origin_total_amount), 5), 20) coupon_subsidy_rate
from 
        dws_trade_order_window
where 
        toYYYYMMDD(stt) = #{date}
group by 
        toYYYYMMDD(stt)

1.6、活动主题

1.6.1、当日活动补贴率

统计周期

统计粒度

指标

说明

当日

活动

补贴率

参与促销活动的订单明细活动减免金额总和/原始金额总和

select 
       sum(order_activity_reduce_amount)                   activity_reduce_amount,
       sum(order_origin_total_amount)                      origin_total_amount,
       round(round(toFloat64(activity_reduce_amount), 5) /
             round(toFloat64(origin_total_amount), 5), 20) subsidyRate
from 
      dws_trade_order_window
where 
      toYYYYMMDD(stt) = #{date}
group by 
      toYYYYMMDD(stt)

总结

    至此,ADS 层搭建完毕,相比离线数仓,这里所有指标都是直接从 DWS 层读取过来的,而 clickhouse 最大的优点也正是单表查询超级快!这 ADS 比离线数仓要简单多了,离线数仓可以分析更加深层的指标,毕竟它有大量的历史数据做支撑;而实时数仓则更多的是分析当下近几秒钟、几小时、远至一天的一些指标;
标签: flink 大数据

本文转载自: https://blog.csdn.net/m0_64261982/article/details/141022580
版权归原作者 让线程再跑一会 所有, 如有侵权,请联系我们删除。

“Flink 实时数仓(十一)【ADS 层搭建】”的评论:

还没有评论