0


使用Flink进行股票计算

使用Flink进行股票计算

文章目录

股票平均价格计算

代码详解

这四个类一起实现了对股票平均价格的计算处理,并展示了Apache Flink在实时流处理中的应用。
类名功能Stock表示股票交易数据的实体类StockStream创建流处理环境,读取股票交易数据,并计算每分钟股票平均价格的类ResultWindowFunction汇总每分钟内股票平均价格的计算结果,并输出为字符串的类AvgStockAggregateFunction计算每个股票代码在每分钟内的平均交易价格的类

Stock

股票价格的实体类

根据数据来看:

US2.AAPL,20200108,093003,297.260000000,100

可以分析为
股票名字交易日期交易时间股票价格股票数量US2.AAPL20200108093003297.260000000100
那么定义一个股票实体类

packagecom.lfl.bigwork;/**
 * @author 叶星痕
 * @data 2024/6/14 上午8:56
 * 文件名 : Stock
 * 描述 : 股票实体类
 */publicclassStock{privateString stockCode;// 股票的唯一代码或符号privateString tradeDate;// 交易发生的日期privateString tradeTime;// 交易发生的具体时间privatedouble price;// 交易时的股票价格privateint volume;// 交易的股票数量publicStock(){}publicStock(String stockCode,String tradeDate,String tradeTime,double price,int volume){this.stockCode = stockCode;this.tradeDate = tradeDate;this.tradeTime = tradeTime;this.price = price;this.volume = volume;}publicStringgetStockCode(){return stockCode;}publicvoidsetStockCode(String stockCode){this.stockCode = stockCode;}publicStringgetTradeDate(){return tradeDate;}publicvoidsetTradeDate(String tradeDate){this.tradeDate = tradeDate;}publicStringgetTradeTime(){return tradeTime;}publicvoidsetTradeTime(String tradeTime){this.tradeTime = tradeTime;}publicdoublegetPrice(){return price;}publicvoidsetPrice(double price){this.price = price;}publicintgetVolume(){return volume;}publicvoidsetVolume(int volume){this.volume = volume;}@OverridepublicStringtoString(){return"Stock{"+"stockCode='"+ stockCode +'\''+", tradeDate='"+ tradeDate +'\''+", tradeTime='"+ tradeTime +'\''+", price="+ price +", volume="+ volume +'}';}}

StockStream

首先,定义了一个名为

StockStream

的类,并在该类中定义了一个

main

方法作为程序的入口点。

main

方法中,首先获取了一个

StreamExecutionEnvironment

对象,这是 Flink 流处理的上下文环境。

然后,使用

readTextFile

方法从一个名为 “input/stock.txt” 的文本文件中读取数据,返回一个

DataStreamSource<String>

对象。

接着,使用

map

方法将每一行字符串转换为

Stock

对象,并使用

assignTimestampsAndWatermarks

方法为每个事件分配时间戳和水印。

然后,使用

keyBy

方法按照股票代码进行分组,返回一个

KeyedStream<Stock, String>

对象。

接着,使用

window

方法定义了一个滚动窗口,窗口大小为 60 秒,然后使用

aggregate

方法聚合窗口内的数据,这里传入了

AvgStockAggregateFunction

ResultWindowFunction

两个函数,前者用于计算平均价格,后者用于格式化输出。

最后,使用

print

方法将结果打印到控制台,然后调用

StreamExecutionEnvironment

execute

方法启动流处理。

parseTimeToSeconds

是一个辅助方法,用于将形式为 “HHmmss” 的时间字符串解析为一天中的秒数。

这个程序的主要目的是读取股票数据,按照股票代码进行分组,然后在每个 60 秒的窗口内计算每种股票的平均价格,并打印到控制台。

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:获取 Flink 流处理的环境。
  2. env.setParallelism(1);:这是设置并行度为 1,设置在一个线程中或者后续输出到一个同一个文件中。
  3. DataStreamSource<String> source = env.readTextFile("input/stock.txt");:这是从一个名为 “input/stock.txt” 的文本文件中读取数据,返回一个 DataStreamSource<String> 对象。每一行字符串就是一个数据项。
  4. DataStream<Stock> stockStream = source.map(...):这是将每一行字符串转换为 Stock 对象。转换的逻辑是先将字符串按照逗号分割,然后取出各个字段的值,创建 Stock 对象。
  5. .assignTimestampsAndWatermarks(WatermarkStrategy.<Stock>forMonotonousTimestamps()...):这是为每个事件分配时间戳和水印。时间戳是事件的发生时间,水印是用于处理事件时间乱序的机制。
  6. KeyedStream<Stock, String> keyedStream = stockStream.keyBy((KeySelector<Stock, String>) Stock::getStockCode);:这是按照股票代码进行分组,返回一个 KeyedStream<Stock, String> 对象。每个股票代码对应的所有 Stock 对象会被分到同一个组。
  7. SingleOutputStreamOperator<String> result = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(60)))...:这是定义了一个滚动窗口,窗口大小为 60 秒,然后聚合窗口内的数据。这里传入了 AvgStockAggregateFunctionResultWindowFunction 两个函数,前者用于计算平均价格,后者用于格式化输出。
  8. result.print();:将结果打印到控制台。
  9. result.writeAsText("output/avgStock.txt"):将平均价格写入到avgStock.txt文件内。
  10. private static long parseTimeToSeconds(String time) {...}:这是一个辅助方法,用于将形式为 “HHmmss” 的时间字符串解析为一天中的秒数。这个方法被assignTimestampsAndWatermarks 方法调用,用于分配时间戳。
packagecom.lfl.bigwork;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.java.functions.KeySelector;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.KeyedStream;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;/**
 * @author 叶星痕
 * @data 2024/6/14 下午12:21
 * 文件名 : ResultWindowFunction
 * 描述 : 主要的执行程序
 */publicclassStockStream{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);DataStreamSource<String> source = env.readTextFile("input/stock.txt");DataStream<Stock> stockStream = source.map((MapFunction<String,Stock>) value ->{String[] fields = value.split(",");if(fields.length <5){thrownewRuntimeException("Invalid input data: "+ value);}returnnewStock(fields[0], fields[1], fields[2],Double.parseDouble(fields[3]),Integer.parseInt(fields[4]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Stock>forMonotonousTimestamps().withTimestampAssigner((event, timestamp)->parseTimeToSeconds(event.getTradeTime())));KeyedStream<Stock,String> keyedStream = stockStream.keyBy((KeySelector<Stock,String>)Stock::getStockCode);SingleOutputStreamOperator<String> result = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(60))).aggregate(newAvgStockAggregateFunction(),newResultWindowFunction());

        result.print();
        result.writeAsText("output/avgStock.txt");

        env.execute();}//将形式为“HHmmss”的时间字符串解析为一天中的秒privatestaticlongparseTimeToSeconds(String time){int hours =Integer.parseInt(time.substring(0,2));int minutes =Integer.parseInt(time.substring(2,4));int seconds =Integer.parseInt(time.substring(4,6));return(hours *3600L+ minutes *60L+ seconds)*1000L;}}

ResultWindowFunction

定义了一个名为

ResultWindowFunction

的类,这个类实现了

WindowFunction

接口。

接口的四个类型参数分别代表:
输入类型输出类型键类型窗口类型DoubleStringStringTimeWindow
在这个类中,实现了 apply 方法,这是窗口函数的核心方法,它会在每个窗口结束时被调用。方法参数包括:
键窗口输入数据输出收集器swindowinput (Double 类型迭代器)out

apply

方法中,首先获取了输入数据的第一个元素作为平均价格,然后通过输出收集器

out

发出一个格式化的字符串,这个字符串包含了股票代码和平均价格。

packagecom.lfl.bigwork;importorg.apache.flink.streaming.api.functions.windowing.WindowFunction;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importorg.apache.flink.util.Collector;/**
 * @author 叶星痕
 * @data 2024/6/14 下午12:20
 * 文件名 : ResultWindowFunction
 * 描述 : 更好的输出
 */publicclassResultWindowFunctionimplementsWindowFunction<Double,String,String,TimeWindow>{@Overridepublicvoidapply(String s,TimeWindow window,Iterable<Double> input,Collector<String> out){Double averagePrice = input.iterator().next();
        out.collect("股票代码:"+ s +",平均价格:"+ averagePrice);}}

AvgStockAggregateFunction

这个 代码定义了一个名为

AvgStockAggregateFunction

的聚合函数,该函数用于计算股票的平均价格。

首先,这个类实现了 AggregateFunction 接口,这个接口有三个类型参数:
输入类型累加器类型输出类型StockTuple2<Double, Integer>Double
在这个类中,实现了

AggregateFunction

接口的五个方法:

  1. createAccumulator():创建一个新的累加器,这里是一个包含两个元素的元组,第一个元素是总价格(初始化为 0.0),第二个元素是总数量(初始化为 0)。
  2. add(Stock value, Tuple2<Double, Integer> accumulator):将输入数据添加到累加器,这里是将股票的价格乘以数量加到总价格上,将股票的数量加到总数量上。
  3. getResult(Tuple2<Double, Integer> accumulator):从累加器获取结果,这里是计算平均价格,即总价格除以总数量。
  4. merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b):合并两个累加器,这里是将两个累加器的总价格和总数量分别相加。
packagecom.lfl.bigwork;importorg.apache.flink.api.common.functions.AggregateFunction;importorg.apache.flink.api.java.tuple.Tuple2;/**
 * @author 叶星痕
 * @data 2024/6/14 下午12:20
 * 文件名 : ResultWindowFunction
 * 描述 : AggregateFunction的实现
 */publicclassAvgStockAggregateFunctionimplementsAggregateFunction<Stock,Tuple2<Double,Integer>,Double>{@OverridepublicTuple2<Double,Integer>createAccumulator(){returnTuple2.of(0.0,0);}@OverridepublicTuple2<Double,Integer>add(Stock value,Tuple2<Double,Integer> accumulator){returnTuple2.of(accumulator.f0 + value.getPrice()* value.getVolume(), accumulator.f1 + value.getVolume());}@OverridepublicDoublegetResult(Tuple2<Double,Integer> accumulator){return accumulator.f0 / accumulator.f1;}@OverridepublicTuple2<Double,Integer>merge(Tuple2<Double,Integer> a,Tuple2<Double,Integer> b){returnTuple2.of(a.f0 + b.f0, a.f1 + b.f1);}}

流程文档

本文主要介绍如何使用Apache Flink的AggregateFunction接口实现对股票平均价格的计算。整个流程包括读取股票交易数据,对数据进行处理和聚合,最后输出每个股票代码在每分钟内的平均交易价格。

1. 创建流处理环境

首先,我们需要创建一个StreamExecutionEnvironment对象,这是所有Flink程序的基础。然后,设置并行度为1,表示程序的并行级别。

StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

2. 读取数据

使用env.readTextFile方法从文本文件中读取数据。这里我们假设数据文件名为"input/stock.txt"。

DataStreamSource<String> source = env.readTextFile("input/stock.txt");

3. 数据处理

接着,使用map函数将读取的每行数据转换为Stock对象,并使用assignTimestampsAndWatermarks方法分配时间戳和水印。其中,我们使用了WatermarkStrategy.forMonotonousTimestamps策略,表示事件时间戳是单调递增的。

DataStream<Stock> stockStream = source.map(value ->{String[] fields = value.split(",");if(fields.length <5){thrownewRuntimeException("Invalid input data: "+ value);}returnnewStock(fields[0], fields[1], fields[2],Double.parseDouble(fields[3]),Integer.parseInt(fields[4]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Stock>forMonotonousTimestamps().withTimestampAssigner((event, timestamp)->parseTimeToSeconds(event.getTradeTime())));

4. 数据分区

使用keyBy方法按照股票代码进行分区,这样每个股票代码的数据都会被分到同一个分区进行处理。

KeyedStream<Stock,String> keyedStream = stockStream.keyBy(Stock::getStockCode);

5. 数据聚合

在每个分区上,我们定义一个滑动窗口,窗口大小为60秒,然后使用AggregateFunction进行聚合计算。我们定义了一个AvgStockAggregateFunction,用于计算每个股票代码的平均价格。

SingleOutputStreamOperator<String> result = keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(60))).aggregate(newAvgStockAggregateFunction(),newResultWindowFunction());

6. 结果输出

最后,我们将计算结果打印出来,并执行任务。

result.print();
env.execute();

以上就是使用AggregateFunction计算股票平均价格的整个流程。通过这个流程,我们可以实时计算每个股票代码在每分钟内的平均交易价格,为股票交易提供有价值的信息。

标签: flink 大数据

本文转载自: https://blog.csdn.net/xinglu20/article/details/139721139
版权归原作者 叶星痕 所有, 如有侵权,请联系我们删除。

“使用Flink进行股票计算”的评论:

还没有评论