本文将通过使用 Flink 框架实现 **实时热门合约 **需求。实际业务过程中,如何判断合约是否属于热门合约,可以从以下几个方面进行分析,比如:
- 交易数量:合约被调用的次数可以作为其热门程度的指标之一。
- 交易金额:合约处理的资金量也是评判热门程度的重要指标。
- 活跃用户数量:调用合约的用户数量可以反映合约的受欢迎程度。
- 交易频率:合约的调用频率可以反映其热门程度和使用情况。
但我们本次目的主要是关于学习 Flink API 的一些使用,以及在生产过程中,我们应该如何一步一步改进,所以本次我们主要以 交易数量 作为热门合约的评判标准。
通过本文你将学到:
- 如何基于 EventTime 处理,如何指定 Watermark
- 如何使用 Flink 灵活的 Window API
- 何时需要用到 State,以及如何使用
- 如何使用 ProcessFunction 实现 TopN 功能
- 如何使用 Flink DataStream API 读取 kafka 数据源
- 如何将计算结果 Sink 到 Kafka 存储
实战案例介绍
要实现一个 实时热门合约 的需求,我们首先拆解成以下思路:
基本需求
- 每隔 5 分钟输出最近一小时交易量最多的前N个合约
- 过滤出属于合约的交易数量
解决思路
- 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
- 在所有交易行为数据中,过滤出合约行为进行统计
- 构建滑动窗口,窗口长度为1小时,滑动距离为 5 分钟
- 将KeyedStream中的元素存储到ListState中,当水位线超过窗口结束时间时,排序输出
- 按每个窗口聚合,输出每个窗口中交易量前N名的合约
数据准备
这里我们采用已经同步好在 kafka 的真实的 链上数据 ,数据结构如下:
{
"hash":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",
"blockHash":"0x7785b54d5e82bab42a0b1a3ef015ab1f0b3dce78fe188f0838993d360e26289a",
"blockNumber":19168715,
"from":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1", //交易发起地址
"to":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1", //交易接收地址
"value":0,
"timestamp":1707216599,
"transactionsType":1 //0:普通账户交易 1:合约账户交易
}
编写程序
首先获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
创建kafka数据源
我们已经将链上数据同步管道开启,在实时同步数据到 kafka。
我们先创建一个
Transactions
的 POJO 类,所有成员变量声明成
public
。
/**
* 交易行为数据结构
*/
public class Transactions {
public String hash;
public String blockHash;
public BigInteger blockNumber;
public String from;
public String to;
public BigInteger value;
public long timestamp;
public Integer transactionsType;
public Transactions(){}
public Transactions(String hash, String blockHash, BigInteger blockNumber, String from, String to, BigInteger value, long timestamp, Integer transactionsType) {
this.hash = hash;
this.blockHash = blockHash;
this.blockNumber = blockNumber;
this.from = from;
this.to = to;
this.value = value;
this.timestamp = timestamp;
this.transactionsType = transactionsType;
}
public String getHash() {
return hash;
}
public void setHash(String hash) {
this.hash = hash;
}
public String getBlockHash() {
return blockHash;
版权归原作者 打不倒的小怪兽R.L 所有, 如有侵权,请联系我们删除。