Flink之DataStream数据源、数据转换、数据输出(scala)
0.前言–数据源
在进行数据转换之前,需要进行数据读取。
数据读取分为4大部分:
(1)内置数据源;
又分为文件数据源;
socket数据源;
集合数据源三类
(2)Kafka数据源
第二个参数用到的SimpleStringSchema对象是一个内置的DeserializationSchema对象,可以把字节数据反序列化程一个String对象。
另外,FlinkKafkaConsumer开始读取Kafka消息时,可以配置他的 读 起始位置,有如下四种。
importjava.util.Properties
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.streaming.api.windowing.time.Time
object KafkaWordCount {def main(args: Array[String]):Unit={val kafkaProps =new Properties()//Kafka的一些属性
kafkaProps.setProperty("bootstrap.servers","localhost:9092")//所在的消费组
kafkaProps.setProperty("group.id","group1")//获取当前的执行环境val evn = StreamExecutionEnvironment.getExecutionEnvironment
//创建Kafka的消费者,wordsendertest是要消费的Topicval kafkaSource =new FlinkKafkaConsumer[String]("wordsendertest",new SimpleStringSchema,kafkaProps)//设置从最新的offset开始消费
kafkaSource.setStartFromLatest()//自动提交offset
kafkaSource.setCommitOffsetsOnCheckpoints(true)//绑定数据源val stream = evn.addSource(kafkaSource)//设置转换操作逻辑val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter{ _.nonEmpty}}.map{(_,1)}.keyBy(0).timeWindow(Time.seconds(5)).sum(1)//打印输出
text.print()//程序触发执行
evn.execute("Kafka Word Count")}}
(3)HDFS数据源
(4)自定义数据源
一个例子:
importjava.util.Calendar
importorg.apache.flink.streaming.api.functions.source.RichSourceFunction
importorg.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importscala.util.Random
caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object StockPriceStreaming {def main(args: Array[String]){//设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)//股票价格数据流val stockPriceStream: DataStream[StockPrice]= env
//该数据流由StockPriceSource类随机生成.addSource(new StockPriceSource)//打印结果
stockPriceStream.print()//程序触发执行
env.execute("stock price streaming")}class StockPriceSource extends RichSourceFunction[StockPrice]{var isRunning:Boolean=trueval rand =new Random()//初始化股票价格var priceList: List[Double]= List(10.0d,20.0d,30.0d,40.0d,50.0d)var stockId =0var curPrice =0.0doverridedef run(srcCtx: SourceContext[StockPrice]):Unit={while(isRunning){//每次从列表中随机选择一只股票
stockId = rand.nextInt(priceList.size)val curPrice = priceList(stockId)+ rand.nextGaussian()*0.05
priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis
//将数据源收集写入SourceContext
srcCtx.collect(StockPrice("stock_"+ stockId.toString, curTime, curPrice))
Thread.sleep(rand.nextInt(10))}}overridedef cancel():Unit={
isRunning =false}}}
1.数据转换之map操作
1.数据转换算子的四种类型
基于单条记录:fliter、map
基于窗口:window
合并多条数据流:union,join,connect
拆分多条数据流:split
2.map(func)操作将一个DataStream中的每个元素传递到函数func中,并将结果返回为一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同
理解:一 一对应的关系,一个x得到一个y
val dataStream = env.fromElements(1,2,3,4,5)val mapStream = dataStream.map(x=>x+10)
3.演示代码
importorg.apache.flink.api.common.functions.RichMapFunction
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object MapFunctionTest {def main(args: Array[String]):Unit={//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度
env.setParallelism(1)//创建数据源val dataStream: DataStream[Int]= env.fromElements(1,2,3,4,5,6,7)//设置转换操作逻辑val richFunctionDataStream = dataStream.map {new MyMapFunction()}//打印输出
richFunctionDataStream.print()//程序触发执行
env.execute("MapFunctionTest")}//自定义函数,继承RichMapFunctionclass MyMapFunction extends RichMapFunction[Int,String]{overridedef map(input:Int):String=("Input : "+ input.toString +", Output : "+(input *3).toString)}}
2.数据转换之flatMap操作
1.flatMap和map相似,每个输入元素都可以映射到0或多个输出结果。
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")val flatMapStream = dataStream.flatMap(line => line.split(" "))
可以理解为flatMap比map多了flat操作。如图。map是将输入数据映射成数组,flat是将数据拍扁,成为一个个元素。把元素映射成了多个。
2.代码演示
importorg.apache.flink.api.common.functions.FlatMapFunction
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.util.Collector
caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object FlatMapFunctionTest {def main(args: Array[String]):Unit={//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设定程序并行度
env.setParallelism(1)//设置数据源val dataStream: DataStream[String]=
env.fromElements("Hello Spark", "Flink is excellent“)//针对数据集的转换操作逻辑val result = dataStream.flatMap(new WordSplitFlatMap(15))//打印输出
result.print()//程序触发执行
env.execute("FlatMapFunctionTest")}//使用FlatMapFunction实现过滤逻辑,只对字符串长度大于threshold的内容进行切词class WordSplitFlatMap(threshold:Int)extends FlatMapFunction[String,String]{overridedef flatMap(value:String, out: Collector[String]):Unit={if(value.size > threshold){
value.split(" ").foreach(out.collect)}}}}
预计输出:
Flink
is
excellent
这里只对字符长度超过15的做切割。threshold是阈值,少于15的不做切割。
3.数据转换之filter和keyBy操作
1.filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集
2.代码举例
val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")val filterStream = dataStream.filter(line => line.contains("Flink"))
如图所示
3.keyBy(注意方法里k小写B大写):将相同Key的数据放置在相同的分区中。
keyBy算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理
比如在词频统计时:
hello flink
hello hadoop
hello zhangsan
这里 词频(hello,1),(hello,1),(hello,1)统计出来之后,通过keyBy,就可以聚合,放在了相同的分区里进行统一计算。
通过聚合函数后又可以吧KeyedStream转换成DataStream。
4.在使用keyBy算子时,需要向keyBy算子传递一个参数, 可使用数字位置来指定Key
比如刚才词频统计时,keyBy(0)就是hello这个单词。
val dataStream: DataStream[(Int,Double)]=
env.fromElements((1,2.0),(2,1.7),(1,4.9),(3,8.5),(3,11.2))//使用数字位置定义Key 按照第一个字段进行分组val keyedStream = dataStream.keyBy(0)
这里keyby 是第一个字段1或者2或者3分组(分类)。
5.keyBy代码举例:
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object KeyByTest{def main(args: Array[String]):Unit={//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)//创建数据源val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId“)//打印输出
keyedStream.print()//程序触发执行
env.execute("KeyByTest")}}
这里看起来没什么变换 ,因为没进行聚合操作,所以什么变化都没有,原样输出。
我加上聚合函数,看起来就有变化了。
//简写上面的代码 加上聚合函数val keyedStream = dataStream.keyBy("stockId")val aggre = keyedStream.sum(2)//这里相加的是价格price(第三个字段)// keyedStream.print()
aggre.print()//聚合后打印
结果
对比上面哪里变化了呢?
stcok_id顺序,4-1-0-3-2-0(这里之前也有0,就会加上之前的0,变为16.299,后面的4也在累加前面的price了
4.数据转换之reduce操作和聚合操作
1.reduce:reduce算子将输入的KeyedStream通过传入的用户自定义函数滚动地进行数据聚合处理,处理以后得到一个新的DataStream,如下实例
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object ReduceTest{def main(args: Array[String]):Unit={//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)//创建数据源val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream
.reduce((t1,t2)=>StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price))//打印输出
reduceStream.print()//程序触发执行
env.execute("ReduceTest")}}
reduce结果和上面的一样,就是累加
2.flink也支持自定义的reduce函数
importorg.apache.flink.api.common.functions.ReduceFunction
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID,交易时间,交易价格caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object MyReduceFunctionTest{def main(args: Array[String]):Unit={//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)//创建数据源val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream.reduce(new MyReduceFunction)//打印输出
reduceStream.print()//程序触发执行
env.execute("MyReduceFunctionTest")}class MyReduceFunction extends ReduceFunction[StockPrice]{overridedef reduce(t1: StockPrice,t2:StockPrice):StockPrice ={
StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price)}}}
主要不同的就是创建了MyReduceFunction ().
3.聚合算子
和excel一样。
代码举例:
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格caseclass StockPrice(stockId:String,timeStamp:Long,price:Double)object AggregationTest{def main(args: Array[String]):Unit={//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置程序并行度
env.setParallelism(1)//创建数据源val stockList = List(
StockPrice("stock_4",1602031562148L,43.4D),
StockPrice("stock_1",1602031562148L,22.9D),
StockPrice("stock_0",1602031562153L,8.2D),
StockPrice("stock_3",1602031562153L,42.1D),
StockPrice("stock_2",1602031562153L,29.2D),
StockPrice("stock_0",1602031562159L,8.1D),
StockPrice("stock_4",1602031562159L,43.7D),
StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val aggregationStream = keyedStream.sum(2)//区别在这里 sum聚合 2表示第三个字段//打印输出
aggregationStream.print()//执行操作
env.execute(" AggregationTest")}}
运行结果
5.数据输出
1.基本数据输出包括:文件输出,客户端输出,socket网络端口输出。
文件输出具体代码
val dataStream = env.fromElements("hadoop","spark","flink")//文件输出
dataStream.writeAsText("file:///home/hadoop/output.txt")//hdfs输出//把数据写入HDFS
dataStream.writeAsText("hdfs://localhost:9000/output.txt“)//通过writeToSocket方法将DataStream数据集输出到指定socket端口
dataStream.writeToSocket(outputHost,outputPort,new SimpleStringSchema())
2.输出到kafka
代码举例:
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
object SinkKafkaTest{def main(args: Array[String]):Unit={//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment
//加载或创建数据源val dataStream = env.fromElements("hadoop","spark","flink")//把数据输出到Kafka
dataStream.addSink(new FlinkKafkaProducer [String]("localhost:9092","sinkKafka",new SimpleStringSchema()))//程序触发执行
env.execute()}}
版权归原作者 今天炒饭有点咸 所有, 如有侵权,请联系我们删除。