添加依赖
<dependencies>
    <!-- Elasticsearch core and the high-level REST client; keep both on the same version. -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.8.0</version>
    </dependency>

    <!-- JSON handling. -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.9</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <!-- Logging. NOTE(review): 2.8.2 predates the Log4j 2.17.x security releases; consider upgrading. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <!-- Test and code-generation tooling. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.6</version>
    </dependency>
</dependencies>
代码 (没自测过!!!)
package com.lh.es.common.util;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import sun.misc.BASE64Encoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/**
* ES 工具类
**/
public class EsUtil {
private static final Map<String, RestHighLevelClient> clientMap = new HashMap<String, RestHighLevelClient>();
private static final Map<String, RestClientBuilder> restBuilderMap = new HashMap<String, RestClientBuilder>();
/**
* 创建不带权限的客户端
* @param ip
* @param port
* @return
*/
public static RestHighLevelClient getClient(String ip, int port) {
String clientKey = ip + port;
try {
RestHighLevelClient clientSingle = clientMap.get(clientKey);
if (clientSingle == null) {
RestClientBuilder builder;
if (restBuilderMap.get(clientKey) == null) {
builder = RestClient.builder(getHttpHosts(ip, port));
restBuilderMap.put(clientKey, builder);
} else {
builder = restBuilderMap.get(clientKey);
}
RestHighLevelClient client = new RestHighLevelClient(builder);
clientMap.put(clientKey, client);
return client;
} else {
return clientSingle;
}
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 创建带权限的客户端
* @param ip
* @param port
* @param user
* @param key
* @return
*/
public static RestHighLevelClient getAuthClient(String ip, int port, String user, String key) {
String clientKey = ip + port + user;
try {
RestHighLevelClient clientSingle = clientMap.get(clientKey);
if (clientSingle == null) {
RestClientBuilder builder;
if (restBuilderMap.get(clientKey) == null) {
builder = RestClient.builder(getHttpHosts(ip, port));
restBuilderMap.put(clientKey, builder);
} else {
builder = restBuilderMap.get(clientKey);
}
String auth = new BASE64Encoder().encode((user + ":" + key).getBytes());
builder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic" + auth)});
RestHighLevelClient client = new RestHighLevelClient(builder);
clientMap.put(clientKey, client);
return client;
} else {
return clientSingle;
}
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static HttpHost[] getHttpHosts(String host, int port) {
String[] split = host.split(",");
HttpHost[] httpHosts = new HttpHost[split.length];
for (int i = 0; i < split.length; i++) {
httpHosts[i] = new HttpHost(split[i], port, "http");
}
return httpHosts;
}
/**
* 分页查询
*
* @param client
* @param indexName
* @param docType
* @param queryBuilders 条件构造器
* @param from 页数
* @param size 每页大小
* @return
*/
public static List<Map<String, Object>> queryPage(RestHighLevelClient client, String indexName, String docType, QueryBuilder queryBuilders, int from, int size) {
List<Map<String, Object>> resList = new ArrayList<>();
//根据页数计算偏移量
from = (from - 1) * size;
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilders);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexName);
searchRequest.types(docType); //9.0版本后没有docType概念
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
if (!CollectionUtils.isEmpty(hits)) {
for (SearchHit hit : hits) {
resList.add(hit.getSourceAsMap());
}
}
} catch (Exception e) {
e.printStackTrace();
}
return resList;
}
/**
* @param client
* @param indexName
* @param docType
* @param queryBuilders
* @param exFields
* @param fields
* @param size 每次滚动返回的条数
* @param timeout 滚动时间间隔 时间间隔内当前条数没返回完会往下滚动
* @return
*/
public static List<Map<String, Object>> findWithScroll(RestHighLevelClient client,
String indexName,
String docType,
QueryBuilder queryBuilders,
String exFields,
String fields,
int size,
int timeout) {
List<Map<String, Object>> resList = new ArrayList<>();
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(size);
searchSourceBuilder.query(queryBuilders);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
searchRequest.types(docType);
if (StringUtils.isNullOrEmpty(fields) && !StringUtils.isNullOrEmpty(exFields)) {
searchSourceBuilder.fetchSource(fields.split(","), exFields.split(","));
} else if (StringUtils.isNullOrEmpty(fields)) {
searchSourceBuilder.fetchSource(null, exFields.split(","));
} else if (StringUtils.isNullOrEmpty(exFields)) {
searchSourceBuilder.fetchSource(fields.split(","), null);
}
//初始化scroll
Scroll scroll = new Scroll(TimeValue.timeValueMinutes(timeout));
searchRequest.scroll(scroll);
//第一次游标返回的结果中拿到当前scrollId
SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
String firstScrollId = search.getScrollId();
SearchHit[] hits = search.getHits().getHits();
if (CollectionUtils.isEmpty(hits)) {
return new ArrayList<>();
}
//将第一次返回的结果收集
for (SearchHit hit : hits) {
resList.add(hit.getSourceAsMap());
}
//收集之后的结果 注意可以加入长度限制 避免OOM
while (!CollectionUtils.isEmpty(hits)) {
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(firstScrollId);
searchScrollRequest.scroll(scroll);
SearchResponse scroll1 = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
firstScrollId = scroll1.getScrollId();
for (SearchHit hit : hits) {
resList.add(hit.getSourceAsMap());
}
}
} catch (Exception e) {
e.printStackTrace();
}
return resList;
}
/**
* 根据ID查找
* @param client
* @param indexName
* @param type
* @param id
* @return
*/
public static List<Map<String, Object>> queryById(RestHighLevelClient client, String indexName, String type, String id) {
List<Map<String, Object>> resList = new ArrayList<Map<String, Object>>();
try {
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexName);
searchRequest.types(type);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(id.split(",")));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
for (SearchHit hit : searchResponse.getHits().getHits()) {
resList.add(hit.getSourceAsMap());
}
} catch (Exception e) {
e.printStackTrace();
}
return resList;
}
/**
* 批量插入
* @param client
* @param indexName
* @param type
* @param list
* @param timeOut
* @return
*/
public static boolean bulkInsertMap(RestHighLevelClient client, String indexName, String type, List<Map<String, Object>> list, int timeOut) {
boolean res = true;
try {
BulkProcessor bulkProcessor = getBulkProcessor(client);
for (Map<String, Object> msg : list) {
//9.0之后去去掉type就好
bulkProcessor.add(new IndexRequest(indexName, type).source(msg, XContentType.JSON).id(String.valueOf(msg.get("_id"))));
}
//
res = bulkProcessor.awaitClose(timeOut, TimeUnit.SECONDS);
// 立即退出会导致剩余的文档不会插入
// 比如 10100 在 1000条完成插入后 此时BulkProcessor的awaitClose会被标记为已完成 这时候时候close的话 剩下的文档不会被插入。
// bulkProcessor.close();
} catch (Exception e) {
e.printStackTrace();
res = false;
}
return res;
}
/**
* 插入单条数据
*
* @param client
* @param indexName
* @param indexType
* @param record
* @return
*/
public static boolean insertOne(RestHighLevelClient client, String indexName, String indexType, Map<String, Object> record) {
boolean res = true;
IndexRequest indexRequest;
try {
Object id = record.get("_id");
if (id != null) {
record.remove("_id");
indexRequest = new IndexRequest(indexName, indexType, id.toString()).source(record);
} else {
indexRequest = new IndexRequest(indexName, indexType).source(record);
}
client.index(indexRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
res = false;
}
return res;
}
/**
* 删除数据
*
* @param client
* @param indexName
* @param indexType
* @param queryBuilder 条件构造器
* @return
*/
public static boolean delete(RestHighLevelClient client, String indexName, QueryBuilder queryBuilder, String indexType) {
boolean res = true;
try {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
deleteByQueryRequest.indices(indexName);
deleteByQueryRequest.types(indexType);
deleteByQueryRequest.setQuery(queryBuilder);
client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
res = false;
}
return res;
}
/**
* 根据id删除数据
*
* @param client
* @param indexName
* @param indexType
* @param id
* @return
*/
public static boolean deleteById(RestHighLevelClient client, String indexName, String id, String indexType) {
boolean res = true;
DeleteRequest deleteRequest = new DeleteRequest();
try {
deleteRequest.index(indexName);
deleteRequest.type(indexType);
deleteRequest.id(id);
client.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
res = false;
}
return res;
}
public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
}
@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
};
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
return BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(500)
.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB))
.setConcurrentRequests(10)
.setFlushInterval(TimeValue.timeValueSeconds(100L))
.build();
}
}
标签:
java
elasticsearch
本文转载自: https://blog.csdn.net/qq_40374643/article/details/123992788
版权归原作者 鸿鸿鸿鸿 所有, 如有侵权,请联系我们删除。