0


Flink 实战:如何计算实时热门合约

本文将通过使用 Flink 框架实现 **实时热门合约 **需求。实际业务过程中,如何判断合约是否属于热门合约,可以从以下几个方面进行分析,比如:

  1. 交易数量:合约被调用的次数可以作为其热门程度的指标之一。
  2. 交易金额:合约处理的资金量也是评判热门程度的重要指标。
  3. 活跃用户数量:调用合约的用户数量可以反映合约的受欢迎程度。
  4. 交易频率:合约的调用频率可以反映其热门程度和使用情况。

但我们本次目的主要是关于学习 Flink API 的一些使用,以及在生产过程中,我们应该如何一步一步改进,所以本次我们主要以 交易数量 作为热门合约的评判标准。

通过本文你将学到:

  1. 如何基于 EventTime 处理,如何指定 Watermark
  2. 如何使用 Flink 灵活的 Window API
  3. 何时需要用到 State,以及如何使用
  4. 如何使用 ProcessFunction 实现 TopN 功能
  5. 如何使用 Flink DataStream API 读取 kafka 数据源
  6. 如何将计算结果 Sink 到 Kafka 存储

实战案例介绍

要实现一个 实时热门合约 的需求,我们首先拆解成以下思路:

基本需求

  • 每隔 5 分钟输出最近一小时交易量最多的前N个合约
  • 过滤出属于合约的交易数量

解决思路

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 在所有交易行为数据中,过滤出合约行为进行统计
  • 构建滑动窗口,窗口长度为1小时,滑动距离为 5 分钟
  • 将KeyedStream中的元素存储到ListState中,当水位线超过窗口结束时间时,排序输出
  • 按每个窗口聚合,输出每个窗口中交易量前N名的合约

数据准备

这里我们采用已经同步好在 kafka 的真实的 链上数据 ,数据结构如下:

{
    "hash":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",
    "blockHash":"0x7785b54d5e82bab42a0b1a3ef015ab1f0b3dce78fe188f0838993d360e26289a",
    "blockNumber":19168715,
    "from":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1", //交易发起地址
    "to":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1", //交易接收地址
    "value":0,
    "timestamp":1707216599,
    "transactionsType":1  //0:普通账户交易 1:合约账户交易
}

编写程序

首先获取环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
创建kafka数据源

我们已经将链上数据同步管道开启,在实时同步数据到 kafka。

我们先创建一个

Transactions

的 POJO 类,所有成员变量声明成

public

/**
 * 交易行为数据结构
 */
public class Transactions {
    public String hash;
    public String blockHash;
    public BigInteger blockNumber;
    public String from;
    public String to;
    public BigInteger value;
    public long timestamp;
    public Integer transactionsType;
​
    public Transactions(){}
​
    public Transactions(String hash, String blockHash, BigInteger blockNumber, String from, String to, BigInteger value, long timestamp, Integer transactionsType) {
        this.hash = hash;
        this.blockHash = blockHash;
        this.blockNumber = blockNumber;
        this.from = from;
        this.to = to;
        this.value = value;
        this.timestamp = timestamp;
        this.transactionsType = transactionsType;
    }
​
    public String getHash() {
        return hash;
    }
​
    public void setHash(String hash) {
        this.hash = hash;
    }
​
    public String getBlockHash() {
        return blockHash;
  
标签: flink 大数据 web3

本文转载自: https://blog.csdn.net/qq_32825915/article/details/136133667
版权归原作者 打不倒的小怪兽R.L 所有, 如有侵权,请联系我们删除。

“Flink 实战:如何计算实时热门合约”的评论:

还没有评论