【1】在 pom.xml 中引入依赖
<!-- Flink connector for Elasticsearch 6.x (Scala 2.12 build) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
【2】ES6 Scala 代码。注意:IDE 自动导入的 scala 包需要修改为 scala._(即 import org.apache.flink.streaming.api.scala._),否则会出现错误。
package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

/**
 * Example job: reads comma-separated sensor readings from a text file and
 * indexes each reading into Elasticsearch 6 through Flink's ElasticsearchSink.
 *
 * NOTE(review): `SensorReading` is not defined in this snippet. It is assumed
 * to be declared elsewhere in this package with the fields `id`, `timestamp`
 * and `temperature` (constructed in that order) -- confirm.
 */
object EsSinkTest {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read the raw lines from the input file.
    val inputStreamFromFile: DataStream[String] =
      env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")

    // Parse each "id,timestamp,temperature" line into a SensorReading.
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map { data =>
        // Trim every field so that "a, 1, 2.0" parses just like "a,1,2.0";
        // without it toLong/toDouble throw on the leading blank.
        val dataArray = data.split(",").map(_.trim)
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      }

    // Elasticsearch HTTP endpoints. 9200 is the port used here; change it if
    // your cluster listens on a different one (e.g. 9201).
    val httpHost = new util.ArrayList[HttpHost]()
    httpHost.add(new HttpHost("192.168.1.12", 9200, "http"))
    httpHost.add(new HttpHost("127.0.0.1", 9200, "http"))

    // Sink function that turns every stream element into an index request.
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      // Called once per element; the request is handed to `index` for sending.
      override def process(element: SensorReading,
                           runtimeContext: RuntimeContext,
                           index: RequestIndexer): Unit = {
        // Wrap the reading as the document source to be written to ES.
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("sensor_id", element.id)
        dataSource.put("temp", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Build the index request targeting index "sensor_temp", type "readingdata".
        val indexRequest = Requests.indexRequest()
          .index("sensor_temp")
          .`type`("readingdata")
          .source(dataSource)

        index.add(indexRequest)
        println("saved successfully " + element.toString)
      }
    }

    // Attach the Elasticsearch sink and run the job.
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost, esSinkFunc).build())
    env.execute("es")
  }
}
【3】ES6 输出展示
本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135299055
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。