Querying More Than 10,000 Documents in ES - Scroll Queries (scroll)
Background
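As is well known, a normal ES search can only page through the first 10,000 hits, which is a real pain. The limit comes from from + size, which may not exceed the index.max_result_window setting (default 10000). In big-data scenarios where you want to pull out all the data you need, Elasticsearch offers two approaches: deep paging or scroll queries. Today we use the scroll query, because we need to load all of the data we use, batch by batch.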
Introduction
Deep paging
Deep paging uses from and size and is simple to write:
{"query":{"match_all":{}},"from":9990,"size":10}
{"query":{"match_all":{}},"from":9999,"size":10}
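To return hits 9,991-10,000 (or 10,000-10,009), ES first has to collect the top 10,000 (or 10,009) hits and then throw away everything except the last 10. With several shards it is worse. Every shard returns its own top from + size hits, so the coordinating node has to sort (from + size) × number of shards hits just to return 10 documents. The deeper the page, the more memory and CPU this costs. With default settings the second request above is actually rejected, because from + size = 10,009 exceeds index.max_result_window.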
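To put numbers on this, here is a small plain-Java sketch that computes the two quantities above. It involves no ES client, and the class and method names are made up for illustration. The first quantity is how many hits the coordinating node has to sort. The second is whether a from/size request fits in the result window. It assumes the default index.max_result_window of 10000 and, as an example, 5 shards.

```java
// Hypothetical helper (not an ES API) that puts numbers on the cost of from/size paging.
class DeepPagingCost {

    // Assumed default of the index.max_result_window index setting.
    static final int DEFAULT_MAX_RESULT_WINDOW = 10_000;

    // Every shard sends its top (from + size) hits to the coordinating node,
    // which sorts all of them and keeps only `size`.
    static long hitsSortedOnCoordinator(int shards, int from, int size) {
        return (long) shards * ((long) from + size);
    }

    // A from/size request is rejected when from + size exceeds max_result_window.
    static boolean withinResultWindow(int from, int size, int maxResultWindow) {
        return (long) from + size <= maxResultWindow;
    }

    public static void main(String[] args) {
        System.out.println(hitsSortedOnCoordinator(5, 9990, 10)); // 50000
        System.out.println(withinResultWindow(9990, 10, DEFAULT_MAX_RESULT_WINDOW)); // true
        System.out.println(withinResultWindow(9999, 10, DEFAULT_MAX_RESULT_WINDOW)); // false
    }
}
```

Even on a small 5-shard index, showing 10 documents from that depth means sorting 50,000 hits on one node.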
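We should really avoid deep paging altogether by capping the number of pages. For example, show at most 100 pages and nothing from page 101 onward. Users rarely search that deep anyway: on Taobao or Baidu, most of us look at ten pages or so at most.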
Taobao search, for example, limits pagination to 100 pages.
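Capping the page count can be as simple as refusing page numbers past the limit before turning them into a from offset. Below is a minimal sketch, assuming 1-based page numbers and the 100-page cap mentioned above. PageLimiter and fromOffset are hypothetical names, not part of any ES client.

```java
import java.util.OptionalInt;

// Hypothetical helper: caps pagination at MAX_PAGES and converts a
// 1-based page number into the "from" offset of a from/size query.
class PageLimiter {
    static final int MAX_PAGES = 100;

    // Returns the from offset, or empty when the page is out of range.
    static OptionalInt fromOffset(int page, int pageSize) {
        if (page < 1 || page > MAX_PAGES || pageSize < 1) {
            return OptionalInt.empty();
        }
        return OptionalInt.of((page - 1) * pageSize);
    }

    public static void main(String[] args) {
        System.out.println(fromOffset(1, 20));   // OptionalInt[0]
        System.out.println(fromOffset(100, 20)); // OptionalInt[1980]
        System.out.println(fromOffset(101, 20)); // OptionalInt.empty
    }
}
```

With 20 hits per page, the last allowed page ends at hit 2,000, far below the 10,000 window.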
Scroll query
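As shown above, from/size is not suitable for offline big-data jobs. We therefore use the other mechanism ES provides for reading large amounts of data: the scroll query, also called a cursor query.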
In raw JSON it looks like this:
# First request:
GET /sms/_search?scroll=5m
{"size":20,"query":{"bool":{"must":[{"match":{"userId":"9d995c0b90fe4128896a1a84eca213bf"}}]}}}
Response:
{"_scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==","took":6,......}
Take the _scroll_id from the previous response and send it as follows to get the next batch:
GET /_search/scroll
{"scroll":"1m","scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="}
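Repeat this, always with the most recent _scroll_id, until a batch comes back with no hits. The scroll value (5m, 1m) only has to keep the search context alive until the next request, because each scroll request renews it.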
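To make the request sequence concrete, here is a toy in-memory model of the scroll protocol in Java. It is not Elasticsearch and not an ES client. FakeScrollServer, Page and ScrollLoop are hypothetical names. The model only mimics three steps:

- open a scroll with the first search;
- keep sending the latest scroll_id;
- clear the context when done.

As in ES, the batch size is fixed by the first search.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// In-memory model of the scroll request sequence. NOT Elasticsearch.
class FakeScrollServer {
    static final class Page {
        final String scrollId;
        final List<Integer> hits;
        Page(String scrollId, List<Integer> hits) { this.scrollId = scrollId; this.hits = hits; }
    }

    private static final class Cursor {
        int offset;     // position of the next batch
        final int size; // batch size fixed by the initial search
        Cursor(int size) { this.size = size; }
    }

    private final List<Integer> docs;
    private final Map<String, Cursor> contexts = new HashMap<>();

    FakeScrollServer(List<Integer> docs) { this.docs = docs; }

    // Mimics GET /index/_search?scroll=5m with "size": opens a context, returns batch 1.
    Page search(int size) {
        String id = UUID.randomUUID().toString();
        contexts.put(id, new Cursor(size));
        return nextBatch(id);
    }

    // Mimics GET /_search/scroll with a scroll_id: returns the following batch.
    Page scroll(String scrollId) {
        if (!contexts.containsKey(scrollId)) {
            throw new IllegalStateException("unknown or cleared scroll_id");
        }
        return nextBatch(scrollId);
    }

    // Mimics DELETE /_search/scroll: releases the context.
    void clear(String scrollId) { contexts.remove(scrollId); }

    int openContexts() { return contexts.size(); }

    private Page nextBatch(String id) {
        Cursor c = contexts.get(id);
        int to = Math.min(c.offset + c.size, docs.size());
        List<Integer> batch = new ArrayList<>(docs.subList(c.offset, to));
        c.offset = to;
        return new Page(id, batch);
    }
}

class ScrollLoop {
    // Client side: send the latest scroll_id until a batch is empty,
    // then clear the context even if a request failed midway.
    static List<Integer> fetchAll(FakeScrollServer server, int size) {
        List<Integer> all = new ArrayList<>();
        FakeScrollServer.Page page = server.search(size);
        try {
            while (!page.hits.isEmpty()) {
                all.addAll(page.hits);
                page = server.scroll(page.scrollId);
            }
        } finally {
            server.clear(page.scrollId);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> docs = IntStream.range(0, 45).boxed().collect(Collectors.toList());
        FakeScrollServer server = new FakeScrollServer(docs);
        List<Integer> all = fetchAll(server, 20);
        System.out.println(all.size() + " docs, open contexts: " + server.openContexts());
    }
}
```

With 45 documents and size 20, the model returns batches of 20, 20, 5 and then an empty batch. That empty batch is the "until the data runs out" condition above. Clearing in finally mirrors what the Java utility class below does with ClearScrollRequest.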
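Raw JSON is only good for understanding the mechanism; in practice we drive ES from a programming language. RestHighLevelClient is a very handy ES client, so next we use it from Java. The pom dependency is below, and its version must match your ES version: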
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.0.0</version>
</dependency>
Here is a utility class that wraps the API calls.
Its method for fetching search results uses a scroll query, built from the following parts:
1. Build the SearchRequest
// Scroll keep-alive for the search context
Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
// Build the SearchRequest
SearchRequest request = new SearchRequest(indices);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (includes != null) {
    // Only fetch the requested fields
    sourceBuilder.fetchSource(includes, null);
}
// Add the query
sourceBuilder.query(query);
// Number of hits returned per scroll batch
sourceBuilder.size(SIZE);
// Add the sort field, if any
if (orderField != null && !"".equals(orderField.trim())) {
    sourceBuilder.sort(orderField, order);
}
// Attach the scroll and the source builder to the request
request.scroll(scroll);
request.source(sourceBuilder);
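2. Get the response:
// Every scroll id received, so they can all be cleared at the end
List<String> scrollIdList = new ArrayList<>();
SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
// The first scroll id (cursor)
String scrollId = searchResponse.getScrollId();
// Hits of the first batch
SearchHit[] hits = searchResponse.getHits().getHits();
// List that collects the converted results
List<T> result = new ArrayList<>();
scrollIdList.add(scrollId);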
3. Scroll in a loop and collect the results:
// Scroll through the results, converting each SearchHit into the result list
while (ArrayUtils.isNotEmpty(hits)) {
    for (SearchHit hit : hits) {
        // Function<SearchHit, T>: takes a SearchHit and returns a T
        result.add(fun.apply(hit));
    }
    // Fewer hits than SIZE: this was the last batch, so stop
    if (hits.length < SIZE) {
        break;
    }
    // Continue scrolling from the previous cursor
    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
    searchScrollRequest.scroll(scroll);
    // Fetch the next batch
    SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
    // Move the cursor forward
    scrollId = searchScrollResponse.getScrollId();
    hits = searchScrollResponse.getHits().getHits();
    scrollIdList.add(scrollId);
}
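The same loop can be written without any ES types, which makes its design choices easy to test in isolation. This is a hypothetical sketch, not part of the author's class. Batch stands in for SearchResponse, scrollFn for client.scroll(...), and clearFn for client.clearScroll(...). It keeps two choices from the code above:

- the early exit on a batch shorter than size, which assumes (as the code above does) that a short batch is the last one;
- clearing the scroll ids in finally.

Unlike the original, it collects ids in a LinkedHashSet, so a repeated scroll id is only cleared once.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, ES-free version of the scroll loop.
class ScrollCollector {
    // Stand-in for a SearchResponse: a scroll id plus one batch of hits.
    static final class Batch<H> {
        final String scrollId;
        final List<H> hits;
        Batch(String scrollId, List<H> hits) { this.scrollId = scrollId; this.hits = hits; }
    }

    static <H, T> List<T> collect(Batch<H> first,
                                  Function<String, Batch<H>> scrollFn,
                                  Function<H, T> fun,
                                  int size,
                                  Consumer<Set<String>> clearFn) {
        // A Set keeps each scroll id once even if consecutive responses repeat it.
        Set<String> scrollIds = new LinkedHashSet<>();
        List<T> result = new ArrayList<>();
        Batch<H> batch = first;
        scrollIds.add(batch.scrollId);
        try {
            while (!batch.hits.isEmpty()) {
                for (H hit : batch.hits) {
                    result.add(fun.apply(hit)); // same role as fun.apply(hit) above
                }
                if (batch.hits.size() < size) {
                    break; // assumed last batch: skip one extra round trip
                }
                batch = scrollFn.apply(batch.scrollId);
                scrollIds.add(batch.scrollId);
            }
        } finally {
            clearFn.accept(scrollIds); // like ClearScrollRequest.setScrollIds(...)
        }
        return result;
    }
}
```

In a test you can feed it canned batches, for example [1, 2], [3, 4], [5] with size 2. You then get all five mapped results after two scroll calls, and the id set is handed to clearFn exactly once.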
The complete utility class is below. Besides the ES client, it needs commons-lang3 (for ArrayUtils) and SLF4J.
package com.yq.demo.Util;

import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
// With the 7.0.0 client from the pom above, TimeValue lives in this package;
// later 7.x releases moved it to org.elasticsearch.core.TimeValue.
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ESUtil {

    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);

    // Scroll keep-alive: 180000 ms = 3 minutes
    private static final long SCROLL_TIMEOUT = 180000;
    // Hits per scroll batch
    private static final int SIZE = 1000;
    // 200 MB (200 * 1024 * 1024); declared but not used in this class
    private static final int MAX_BUFFER = 209715200;

    /**
     * Scroll query that collects all matching hits.
     *
     * @param client     RestHighLevelClient
     * @param indices    indices to search
     * @param query      QueryBuilder
     * @param includes   fields to include in _source (null = all fields)
     * @param orderField sort field (null or blank = no sort)
     * @param order      sort order
     * @param fun        converts each SearchHit into a T
     * @param <T>        result type
     * @return list of results converted by fun
     * @throws Exception if a request fails
     */
    public static <T> List<T> searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                             String[] includes, String orderField, SortOrder order,
                                             Function<SearchHit, T> fun) throws Exception {
        // Scroll keep-alive for the search context
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
        // Build the SearchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            // Only fetch the requested fields
            sourceBuilder.fetchSource(includes, null);
        }
        // Add the query
        sourceBuilder.query(query);
        // Number of hits returned per scroll batch
        sourceBuilder.size(SIZE);
        // Add the sort field, if any
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        // Attach the scroll and the source builder to the request
        request.scroll(scroll);
        request.source(sourceBuilder);

        // Every scroll id received, so they can all be cleared at the end
        List<String> scrollIdList = new ArrayList<>();
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        // The first scroll id (cursor)
        String scrollId = searchResponse.getScrollId();
        // Hits of the first batch
        SearchHit[] hits = searchResponse.getHits().getHits();
        // List that collects the converted results
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);
        try {
            // Scroll through the results, converting each SearchHit into the result list
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    // Function<SearchHit, T>: takes a SearchHit and returns a T
                    result.add(fun.apply(hit));
                }
                // Fewer hits than SIZE: this was the last batch, so stop
                if (hits.length < SIZE) {
                    break;
                }
                // Continue scrolling from the previous cursor
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                // Fetch the next batch
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                // Move the cursor forward
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);
            }
        } finally {
            // Clear the scroll contexts to free server-side resources
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.setScrollIds(scrollIdList);
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        }
        return result;
    }

    /**
     * SearchResponse for an aggregation query.
     *
     * @param client       RestHighLevelClient
     * @param indices      indices to search
     * @param query        QueryBuilder
     * @param aggregations AggregationBuilder(s)
     * @return SearchResponse
     * @throws Exception if the request fails
     */
    public static SearchResponse searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                                AggregationBuilder... aggregations) throws Exception {
        // Build the request
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        // Add the aggregations
        if (aggregations != null && aggregations.length > 0) {
            for (AggregationBuilder aggregation : aggregations) {
                sourceBuilder.aggregation(aggregation);
            }
        }
        // Only the aggregation results are needed, so return no hits
        sourceBuilder.size(0);
        // Ignore unavailable indices; expand wildcards to open indices only
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}
Hope this helps. Thanks for reading!
Copyright belongs to the original author 卡卡东~. In case of infringement, please contact us and it will be removed.