0


Flink 输出至 Elasticsearch

【1】引入

pom.xml

依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version></dependency>

【2】

ES6 Scala

代码,自动导入的

scala

包需要修改为

scala._

否则会出现错误。

packagecom.zzx.flinkimportjava.utilimportorg.apache.flink.api.common.functions.RuntimeContextimportorg.apache.flink.streaming.api.scala._
importorg.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction,RequestIndexer}importorg.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkimportorg.apache.http.HttpHostimportorg.elasticsearch.client.Requests

object EsSinkTest{
  def main(args:Array[String]):Unit={// 创建一个流处理执行环境
    val env =StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile:DataStream[String]= env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换
    val dataStream:DataStream[SensorReading]= inputStreamFromFile
      .map( data =>{var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHosts
    val httpHost =newutil.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201
    httpHost.add(newHttpHost("192.168.1.12",9200,"http"))
    httpHost.add(newHttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的function
    val esSinkFunc =newElasticsearchSinkFunction[SensorReading]{//element 每一条数据 通过 index 发送
      override def process(element:SensorReading, runtimeContext:RuntimeContext, index:RequestIndexer):Unit={//包装写入 es 的数据
        val dataSource =newutil.HashMap[String,String]()
        dataSource.put("sensor_id",element.id)
        dataSource.put("temp",element.temperature.toString)
        dataSource.put("ts",element.timestamp.toString)//index
        val indexRequest =Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)
        index.add(indexRequest)println("saved successfully "+ element.toString)}}//输出值 es
    dataStream.addSink(newElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
    env.execute("es")}}

【3】

ES6

输出展示

​ [点击并拖拽以移动] ​​


本文转载自: https://blog.csdn.net/zhengzhaoyang122/article/details/135299055
版权归原作者 程序猿进阶 所有, 如有侵权,请联系我们删除。

“Flink 输出至 Elasticsearch”的评论:

还没有评论