

How to Write to Elasticsearch from Flink



Preface

Simple examples of a Flink sink that writes streaming data to Elasticsearch 5 and Elasticsearch 7.


1. Writing to Elasticsearch 5

  • Maven dependency (pom.xml)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
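  • Code (example):
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Es5SinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample rows: name, number, creation time
        Row row = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        // User config for the sink; the optional settings are left commented out
        Map<String, String> config = new HashMap<>();
        // config.put("cluster.name", "my-cluster-name");
        // config.put("bulk.flush.max.actions", "1");

        // Transport addresses of the Elasticsearch 5 cluster (port 9300)
        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.68.8.60"), 9300));

        // Attach the sink
        DataStreamSink<Row> rowDataStreamSink = source.addSink(
                new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<Row>() {

                    // Builds the index request for one row; the second field is used as the document id
                    public IndexRequest createIndexRequest(Row element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("name22", element.getField(0).toString());
                        json.put("no22", element.getField(1));
                        json.put("age", 34);
                        json.put("create_time", element.getField(2));
                        return Requests.indexRequest()
                                .index("cc")
                                .type("mtype")
                                .id(element.getField(1).toString())
                                .source(json);
                    }

                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Hand the request to the RequestIndexer, which sends it to Elasticsearch
                        indexer.add(createIndexRequest(element));
                    }
                }));

        env.execute("es demo");
    }

    // Parses a "yyyy-MM-dd HH:mm:ss" string, e.g. "2016-10-24 21:59:06", into a java.sql.Timestamp
    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());
        return s;
    }
}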
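
The two commented-out `config.put` lines in the example show the kind of settings that go into the sink's user config map. As a minimal sketch using only the JDK, the helper below assembles such a map. The class name `Es5SinkConfig` and its `build` method are hypothetical and not part of Flink; only the two keys, `cluster.name` and `bulk.flush.max.actions`, are taken from the example, and the values are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flink) that builds the user config map for the ES5 sink. */
final class Es5SinkConfig {

    private Es5SinkConfig() {
    }

    /**
     * @param clusterName       name of the target Elasticsearch cluster (placeholder value in main)
     * @param maxActionsPerBulk number of buffered actions after which a bulk request is flushed
     */
    static Map<String, String> build(String clusterName, int maxActionsPerBulk) {
        Map<String, String> config = new HashMap<>();
        // Both keys appear (commented out) in the example above
        config.put("cluster.name", clusterName);
        config.put("bulk.flush.max.actions", String.valueOf(maxActionsPerBulk));
        return config;
    }

    public static void main(String[] args) {
        // A value of 1 would flush after every element instead of buffering
        Map<String, String> config = build("my-cluster-name", 1);
        System.out.println(config);
    }
}
```

The resulting map could then be passed as the first constructor argument of the ES5 `ElasticsearchSink`, in place of the empty `config` map used in the example.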

2. Writing to Elasticsearch 7

  • Maven dependency (pom.xml)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
  • Code (example):
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.types.Row;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EsSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample rows: name, number, creation time
        Row row = Row.of("张三", "001", getTimestamp("2016-10-24 21:59:06"));
        Row row2 = Row.of("张三", "002", getTimestamp("2016-10-24 21:50:06"));
        Row row3 = Row.of("张三", "002", getTimestamp("2016-10-24 21:51:06"));
        Row row4 = Row.of("李四", "003", getTimestamp("2016-10-24 21:50:56"));
        Row row5 = Row.of("李四", "004", getTimestamp("2016-10-24 00:48:36"));
        Row row6 = Row.of("王五", "005", getTimestamp("2016-10-24 00:48:36"));
        DataStreamSource<Row> source = env.fromElements(row, row2, row3, row4, row5, row6);

        // Carried over from the ES5 example; this map is not passed to the ES7 builder below
        Map<String, String> config = new HashMap<>();
        // config.put("cluster.name", "my-cluster-name");
        // This instructs the sink to emit after every element, otherwise they would be buffered
        // config.put("bulk.flush.max.actions", "1");

        // HTTP hosts of the Elasticsearch 7 cluster (port 9200)
        List<HttpHost> hosts = new ArrayList<>();
        hosts.add(new HttpHost("10.68.8.69", 9200, "http"));

        ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<Row>(hosts,
                new ElasticsearchSinkFunction<Row>() {

                    // Builds the index request for one row; no type is set for ES7
                    public IndexRequest createIndexRequest(Row element) {
                        Map<String, Object> json = new HashMap<>();
                        json.put("name22", element.getField(0).toString());
                        json.put("no22", element.getField(1));
                        json.put("age", 34);
                        // json.put("create_time", element.getField(2));
                        return Requests.indexRequest()
                                .index("cc")
                                .id(element.getField(1).toString())
                                .source(json);
                    }

                    @Override
                    public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                        // Hand the request to the RequestIndexer, which sends it to Elasticsearch
                        indexer.add(createIndexRequest(element));
                    }
                });

        // Flush a bulk request once 100 actions have been buffered
        esSinkBuilder.setBulkFlushMaxActions(100);

        // Attach the sink
        source.addSink(esSinkBuilder.build());

        env.execute("es demo");
    }

    // Parses a "yyyy-MM-dd HH:mm:ss" string, e.g. "2016-10-24 21:59:06", into a java.sql.Timestamp
    private static java.sql.Timestamp getTimestamp(String str) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        java.util.Date date = sdf.parse(str);
        java.sql.Timestamp s = new java.sql.Timestamp(date.getTime());
        return s;
    }
}
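
Both examples use the second field of each row as the document id, and two of the six sample rows share the id "002". An index request for an id that already exists replaces the stored document, so the index should end up with five documents rather than six. The sketch below simulates that with a plain map and no Elasticsearch involved. The class `IdOverwriteDemo` is hypothetical, names are omitted from the sample data for brevity, and it assumes the rows arrive in the order listed, which a job running with parallelism greater than 1 does not guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical simulation of indexing by document id; not Flink or Elasticsearch code. */
final class IdOverwriteDemo {

    /** The sample rows from the examples, reduced to {document id, creation time}. */
    static final String[][] SAMPLE_ROWS = {
        {"001", "2016-10-24 21:59:06"},
        {"002", "2016-10-24 21:50:06"},
        {"002", "2016-10-24 21:51:06"},
        {"003", "2016-10-24 21:50:56"},
        {"004", "2016-10-24 00:48:36"},
        {"005", "2016-10-24 00:48:36"},
    };

    private IdOverwriteDemo() {
    }

    /** Applies the rows in order; a later row with the same id replaces the earlier one. */
    static Map<String, String> index(String[][] rows) {
        Map<String, String> createTimeById = new LinkedHashMap<>();
        for (String[] row : rows) {
            String id = row[0];
            String createTime = row[1];
            createTimeById.put(id, createTime);
        }
        return createTimeById;
    }

    public static void main(String[] args) {
        Map<String, String> docs = index(SAMPLE_ROWS);
        System.out.println("documents: " + docs.size());
        docs.forEach((id, createTime) -> System.out.println(id + " -> " + createTime));
    }
}
```

If every row needs to be kept as its own document, the id would have to be built from a value that is unique per row.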

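Summary

Writing to ES5 and ES7 from Flink differs mainly in which flink-connector-elasticsearch dependency is pulled in. ES7 no longer has the concept of a type, so the index request does not set one. In the examples above, the ES5 sink is also constructed from a config map and transport addresses (port 9300), while the ES7 sink is built from a list of HTTP hosts (port 9200).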
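
The two differences named in the summary can be written down as a small lookup. This is only an illustrative sketch: the class `EsConnectorChoice` and its methods are hypothetical, the artifact ids are the two used in this article (Scala 2.11 builds), and other Elasticsearch versions are deliberately rejected because the article does not cover them.

```java
/** Hypothetical lookup of the connector artifact and type requirement per Elasticsearch major version. */
final class EsConnectorChoice {

    private EsConnectorChoice() {
    }

    /** Returns the Maven artifactId used in this article for the given Elasticsearch major version. */
    static String artifactId(int esMajorVersion) {
        switch (esMajorVersion) {
            case 5:
                return "flink-connector-elasticsearch5_2.11";
            case 7:
                return "flink-connector-elasticsearch7_2.11";
            default:
                throw new IllegalArgumentException(
                        "Only Elasticsearch 5 and 7 are covered here, got: " + esMajorVersion);
        }
    }

    /** Whether the index request in this article sets a type for the given version. */
    static boolean setsType(int esMajorVersion) {
        artifactId(esMajorVersion); // reuse the version check above
        return esMajorVersion == 5;
    }

    public static void main(String[] args) {
        for (int version : new int[] {5, 7}) {
            System.out.println("ES" + version + ": " + artifactId(version)
                    + ", sets type: " + setsType(version));
        }
    }
}
```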


Reposted from: https://blog.csdn.net/gwc791224/article/details/135355317
Copyright belongs to the original author, shandongwill. If there is any infringement, please contact us for removal.
