0


ES基本操作工具类-JAVA-API 版本:7.8.0

添加依赖

<dependencies><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.9.9</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.6</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.72</version></dependency></dependencies>

代码 (没自测过!!!)

package com.lh.es.common.util;

import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import sun.misc.BASE64Encoder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * ES 工具类
 **/
public class EsUtil {

    private static final Map<String, RestHighLevelClient> clientMap = new HashMap<String, RestHighLevelClient>();
    private static final Map<String, RestClientBuilder> restBuilderMap = new HashMap<String, RestClientBuilder>();
    
    /**
     * 创建不带权限的客户端
     * @param ip
     * @param port
     * @return
     */
    public static RestHighLevelClient getClient(String ip, int port) {
        String clientKey = ip + port;
        try {
            RestHighLevelClient clientSingle = clientMap.get(clientKey);

            if (clientSingle == null) {
                RestClientBuilder builder;
                if (restBuilderMap.get(clientKey) == null) {
                    builder = RestClient.builder(getHttpHosts(ip, port));
                    restBuilderMap.put(clientKey, builder);
                } else {
                    builder = restBuilderMap.get(clientKey);
                }

                RestHighLevelClient client = new RestHighLevelClient(builder);
                clientMap.put(clientKey, client);
                return client;
            } else {
                return clientSingle;
            }

        } catch (Exception e) {
            e.printStackTrace();
            return null;

        }
    }
    
    /**
     * 创建带权限的客户端
     * @param ip
     * @param port
     * @param user
     * @param key
     * @return
     */
    public static RestHighLevelClient getAuthClient(String ip, int port, String user, String key) {
        String clientKey = ip + port + user;
        try {
            RestHighLevelClient clientSingle = clientMap.get(clientKey);
            if (clientSingle == null) {
                RestClientBuilder builder;
                if (restBuilderMap.get(clientKey) == null) {
                    builder = RestClient.builder(getHttpHosts(ip, port));
                    restBuilderMap.put(clientKey, builder);
                } else {
                    builder = restBuilderMap.get(clientKey);
                }
                String auth = new BASE64Encoder().encode((user + ":" + key).getBytes());
                builder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic" + auth)});
                RestHighLevelClient client = new RestHighLevelClient(builder);
                clientMap.put(clientKey, client);
                return client;
            } else {
                return clientSingle;
            }

        } catch (Exception e) {
            e.printStackTrace();
            return null;

        }
    }

    public static HttpHost[] getHttpHosts(String host, int port) {

        String[] split = host.split(",");

        HttpHost[] httpHosts = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {

            httpHosts[i] = new HttpHost(split[i], port, "http");

        }
        return httpHosts;
    }

    /**
     * 分页查询
     *
     * @param client
     * @param indexName
     * @param docType
     * @param queryBuilders 条件构造器
     * @param from          页数
     * @param size          每页大小
     * @return
     */

    public static List<Map<String, Object>> queryPage(RestHighLevelClient client, String indexName, String docType, QueryBuilder queryBuilders, int from, int size) {

        List<Map<String, Object>> resList = new ArrayList<>();

        //根据页数计算偏移量
        from = (from - 1) * size;
        try {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(queryBuilders);
            searchSourceBuilder.from(from);
            searchSourceBuilder.size(size);

            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexName);
            searchRequest.types(docType); //9.0版本后没有docType概念
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

            SearchHit[] hits = searchResponse.getHits().getHits();
            if (!CollectionUtils.isEmpty(hits)) {
                for (SearchHit hit : hits) {
                    resList.add(hit.getSourceAsMap());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;
    }

    /**
     * @param client
     * @param indexName
     * @param docType
     * @param queryBuilders
     * @param exFields
     * @param fields
     * @param size          每次滚动返回的条数
     * @param timeout       滚动时间间隔 时间间隔内当前条数没返回完会往下滚动
     * @return
     */

    public static List<Map<String, Object>> findWithScroll(RestHighLevelClient client,
                                                           String indexName,
                                                           String docType,
                                                           QueryBuilder queryBuilders,
                                                           String exFields,
                                                           String fields,
                                                           int size,
                                                           int timeout) {

        List<Map<String, Object>> resList = new ArrayList<>();

        try {

            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

            searchSourceBuilder.size(size);
            searchSourceBuilder.query(queryBuilders);

            SearchRequest searchRequest = new SearchRequest(indexName);
            searchRequest.source(searchSourceBuilder);
            searchRequest.types(docType);
            if (StringUtils.isNullOrEmpty(fields) && !StringUtils.isNullOrEmpty(exFields)) {
                searchSourceBuilder.fetchSource(fields.split(","), exFields.split(","));
            } else if (StringUtils.isNullOrEmpty(fields)) {
                searchSourceBuilder.fetchSource(null, exFields.split(","));

            } else if (StringUtils.isNullOrEmpty(exFields)) {
                searchSourceBuilder.fetchSource(fields.split(","), null);

            }
            //初始化scroll
            Scroll scroll = new Scroll(TimeValue.timeValueMinutes(timeout));
            searchRequest.scroll(scroll);

            //第一次游标返回的结果中拿到当前scrollId
            SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
            String firstScrollId = search.getScrollId();
            SearchHit[] hits = search.getHits().getHits();
            if (CollectionUtils.isEmpty(hits)) {
                return new ArrayList<>();
            }
            //将第一次返回的结果收集
            for (SearchHit hit : hits) {
                resList.add(hit.getSourceAsMap());
            }

            //收集之后的结果 注意可以加入长度限制 避免OOM
            while (!CollectionUtils.isEmpty(hits)) {
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(firstScrollId);
                searchScrollRequest.scroll(scroll);
                SearchResponse scroll1 = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                firstScrollId =   scroll1.getScrollId();
                for (SearchHit hit : hits) {
                    resList.add(hit.getSourceAsMap());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;
    }

    /**
     * 根据ID查找
     * @param client
     * @param indexName
     * @param type
     * @param id
     * @return
     */
    public static List<Map<String, Object>> queryById(RestHighLevelClient client, String indexName, String type, String id) {
        List<Map<String, Object>> resList = new ArrayList<Map<String, Object>>();

        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices(indexName);
            searchRequest.types(type);
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(id.split(",")));

            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                resList.add(hit.getSourceAsMap());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return resList;

    }

    /**
     * 批量插入
     * @param client
     * @param indexName
     * @param type
     * @param list
     * @param timeOut
     * @return
     */
    public static boolean bulkInsertMap(RestHighLevelClient client, String indexName, String type, List<Map<String, Object>> list, int timeOut) {
        boolean res = true;

        try {

            BulkProcessor bulkProcessor = getBulkProcessor(client);
            for (Map<String, Object> msg : list) {
                //9.0之后去去掉type就好
                bulkProcessor.add(new IndexRequest(indexName, type).source(msg, XContentType.JSON).id(String.valueOf(msg.get("_id"))));

            }
            //
            res = bulkProcessor.awaitClose(timeOut, TimeUnit.SECONDS);
            // 立即退出会导致剩余的文档不会插入
            // 比如 10100 在 1000条完成插入后 此时BulkProcessor的awaitClose会被标记为已完成 这时候时候close的话 剩下的文档不会被插入。
            //  bulkProcessor.close();

        } catch (Exception e) {
            e.printStackTrace();
            res = false;
        }

        return res;
    }

    /**
     * 插入单条数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param record
     * @return
     */
    public static boolean insertOne(RestHighLevelClient client, String indexName, String indexType, Map<String, Object> record) {

        boolean res = true;
        IndexRequest indexRequest;

        try {
            Object id = record.get("_id");

            if (id != null) {
                record.remove("_id");
                indexRequest = new IndexRequest(indexName, indexType, id.toString()).source(record);

            } else {
                indexRequest = new IndexRequest(indexName, indexType).source(record);

            }

            client.index(indexRequest, RequestOptions.DEFAULT);

        } catch (Exception e) {

            res = false;

        }
        return res;
    }

    /**
     * 删除数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param queryBuilder 条件构造器
     * @return
     */
    public static boolean delete(RestHighLevelClient client, String indexName, QueryBuilder queryBuilder, String indexType) {

        boolean res = true;

        try {

            DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
            deleteByQueryRequest.indices(indexName);
            deleteByQueryRequest.types(indexType);
            deleteByQueryRequest.setQuery(queryBuilder);

            client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);

        } catch (Exception e) {

            res = false;

        }
        return res;
    }

    /**
     * 根据id删除数据
     *
     * @param client
     * @param indexName
     * @param indexType
     * @param id
     * @return
     */
    public static boolean deleteById(RestHighLevelClient client, String indexName, String id, String indexType) {

        boolean res = true;
        DeleteRequest deleteRequest = new DeleteRequest();

        try {
            deleteRequest.index(indexName);
            deleteRequest.type(indexType);
            deleteRequest.id(id);
            client.delete(deleteRequest, RequestOptions.DEFAULT);

        } catch (Exception e) {

            res = false;

        }
        return res;
    }

    public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long l, BulkRequest bulkRequest) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {

            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        };

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        return BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(500)
                .setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB))
                .setConcurrentRequests(10)
                .setFlushInterval(TimeValue.timeValueSeconds(100L))
                .build();

    }

}
标签: java elasticsearch

本文转载自: https://blog.csdn.net/qq_40374643/article/details/123992788
版权归原作者 鸿鸿鸿鸿 所有, 如有侵权,请联系我们删除。

“ES基本操作工具类-JAVA-API 版本:7.8.0”的评论:

还没有评论