

Querying more than 10,000 documents in ES: scroll queries (scroll)


Background

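As everyone knows, a regular ES query returns at most the first 10,000 documents (from + size is capped by index.max_result_window, which defaults to 10,000), and that hurts. To fetch everything you need in big-data scenarios, Elasticsearch offers two approaches: deep pagination and scroll queries. This post uses scroll queries, because we need to load the entire data set in batches.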

Introduction

Deep pagination

Querying with from and size is straightforward, for example:

{"query":{"match_all":{}},"from":9990,"size":10}
{"query":{"match_all":{}},"from":9999,"size":10}

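To return the 10 documents that start at offset 9990 (or 9999), ES cannot jump straight to that position. Every shard first has to collect its own top from + size hits, so the coordinating node receives roughly (from + size) × number of shards documents, sorts all of them, and then keeps just 10. (With the default limit of 10,000, the second request above, where from + size = 10009, is rejected outright.) The deeper you page, the worse the performance: the query burns memory and CPU.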
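To make the cost concrete, here is a minimal sketch that models the merge step in plain Java. It does not talk to Elasticsearch; the class name DeepPagingSketch, its methods, and the 5-shard index are all made up for illustration, and documents are reduced to integer sort keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of from/size paging across shards; no Elasticsearch involved. */
final class DeepPagingSketch {

    /** Each shard hands its own top (from + size) hits to the coordinating node. */
    static long docsCollected(int from, int size, int shardCount) {
        return (long) (from + size) * shardCount;
    }

    /** Merge the per-shard top lists, sort globally, and keep only the requested slice. */
    static List<Integer> page(List<List<Integer>> shards, int from, int size) {
        List<Integer> merged = new ArrayList<>();
        for (List<Integer> shard : shards) {
            List<Integer> sorted = new ArrayList<>(shard);
            Collections.sort(sorted);
            // A shard cannot know which of its hits end up on the global page,
            // so it returns everything up to from + size.
            merged.addAll(sorted.subList(0, Math.min(from + size, sorted.size())));
        }
        Collections.sort(merged);
        int start = Math.min(from, merged.size());
        int end = Math.min(from + size, merged.size());
        return new ArrayList<>(merged.subList(start, end));
    }

    public static void main(String[] args) {
        // from=9990, size=10 on a hypothetical 5-shard index
        System.out.println(docsCollected(9990, 10, 5)); // 50000
    }
}
```

In this model, 50,000 candidates are gathered and sorted to produce a page of 10, which is the waste the paragraph above describes.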

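In practice we should avoid deep pagination by capping the number of pages, for example by showing at most 100 pages and nothing from page 101 onward. Users rarely go that deep anyway: when we search on Taobao or Baidu, we usually look at ten pages or so at most.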

Taobao search, for example, limits pagination to at most 100 pages.
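One way to enforce such a cap is sketched below, assuming 1-based page numbers. PageLimiter, MAX_PAGES, and toFrom are hypothetical names; only the 10,000 default of index.max_result_window comes from Elasticsearch itself.

```java
/** Guards from/size paging against deep pages. The limits are illustrative assumptions. */
final class PageLimiter {
    static final int MAX_PAGES = 100;            // UI cap, like the 100-page example above
    static final int MAX_RESULT_WINDOW = 10_000; // Elasticsearch default for index.max_result_window

    /** Returns the "from" offset for a 1-based page number, or throws if the page is too deep. */
    static int toFrom(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        if (page > MAX_PAGES) {
            throw new IllegalArgumentException("page " + page + " exceeds the cap of " + MAX_PAGES);
        }
        long from = (long) (page - 1) * pageSize;
        if (from + pageSize > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + MAX_RESULT_WINDOW);
        }
        return (int) from;
    }
}
```

Rejecting the request before it reaches the cluster keeps the expensive merge from ever being triggered by a curious user or a crawler.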

Scroll queries

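As the above shows, from/size is a poor fit for offline big-data workloads, so we use the other mechanism ES provides for reading large volumes of data: the scroll query, also called a cursor query.
The JSON requests look like this: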

# First request:
GET /sms/_search?scroll=5m
{"size":20,"query":{"bool":{"must":[{"match":{"userId":"9d995c0b90fe4128896a1a84eca213bf"}}]}}}
Response:
{"_scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==","took":6,......}
Take the _scroll_id from the previous response and send it as follows to get the next batch:
GET /_search/scroll
{"scroll":"1m","scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw=="}
Repeat this until all the data has been read.
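The same two calls can be expressed with the JDK's own HTTP types (Java 11+). This is only a sketch: ScrollRequests and its methods are hypothetical helpers, the base URL is whatever your cluster uses, and the requests are built with POST, which Elasticsearch accepts on both endpoints alongside GET. Nothing is sent here; the point is the shape of each request.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

/** Builds the two scroll calls with the JDK HTTP client types. Nothing is sent. */
final class ScrollRequests {

    /** First call: a normal search plus ?scroll=<keepAlive>, which opens the cursor. */
    static HttpRequest first(String baseUrl, String index, String keepAlive, String queryJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/" + index + "/_search?scroll=" + keepAlive))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(queryJson))
                .build();
    }

    /** Body of every follow-up call: keep-alive plus the latest _scroll_id, no index and no query. */
    static String nextBody(String keepAlive, String scrollId) {
        // Assumption: the scroll id contains no characters that need JSON escaping.
        return "{\"scroll\":\"" + keepAlive + "\",\"scroll_id\":\"" + scrollId + "\"}";
    }

    /** Follow-up call: always goes to /_search/scroll, never to the index. */
    static HttpRequest next(String baseUrl, String keepAlive, String scrollId) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/_search/scroll"))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(nextBody(keepAlive, scrollId)))
                .build();
    }
}
```

Note that only the first request names the index and carries the query; every later request identifies its position solely through the scroll id.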

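The JSON requests are only there to build intuition. In real code we drive ES from a programming language, and RestHighLevelClient is a very practical ES client. Below we use it from Java. The pom dependency is as follows; its version must match your ES version: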

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.0.0</version>
</dependency>

Here is a utility class that wraps the API calls.
The method that returns the query results uses a scroll query and is made up of the following parts:

1. Build the SearchRequest

Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
// Build the SearchRequest
SearchRequest request = new SearchRequest(indices);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (includes != null) {
    // Restrict the returned fields
    sourceBuilder.fetchSource(includes, null);
}
// Add the query
sourceBuilder.query(query);
// Number of hits per scroll batch
sourceBuilder.size(SIZE);
// Add the sort field
if (orderField != null && !"".equals(orderField.trim())) {
    sourceBuilder.sort(orderField, order);
}
// Attach the scroll and the source builder
request.scroll(scroll);
request.source(sourceBuilder);

2. Get the first batch of results

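// Scroll ids to clear once the query is finished
List<String> scrollIdList = new ArrayList<>();
SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
// First scroll id (the cursor)
String scrollId = searchResponse.getScrollId();
// Hits of the first batch
SearchHit[] hits = searchResponse.getHits().getHits();
// List that collects the converted results
List<T> result = new ArrayList<>();
scrollIdList.add(scrollId);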

3. Scroll in a loop and collect the results

// Keep scrolling and convert each SearchHit into the result list
while (ArrayUtils.isNotEmpty(hits)) {
    for (SearchHit hit : hits) {
        // Function<SearchHit, T>: takes a SearchHit and returns a T
        result.add(fun.apply(hit));
    }
    // A short batch means the scroll is exhausted, so we can return
    if (hits.length < SIZE) {
        break;
    }
    // Continue from the previous cursor
    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
    searchScrollRequest.scroll(scroll);
    // Fetch the next batch
    SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
    // Advance the cursor
    scrollId = searchScrollResponse.getScrollId();
    hits = searchScrollResponse.getHits().getHits();
    scrollIdList.add(scrollId);
}
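The stopping rule of this loop can be checked without a cluster. The sketch below keeps the same control flow but replaces Elasticsearch with an in-memory stand-in; ScrollLoopSketch, Batch, drain, and fakeFetch are all hypothetical names, and the fake "scroll id" is simply the next offset as a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** The scroll loop with Elasticsearch replaced by an in-memory stand-in. */
final class ScrollLoopSketch {

    /** One batch: the cursor to use for the next call plus the hits of this call. */
    static final class Batch<T> {
        final String scrollId;
        final List<T> hits;
        Batch(String scrollId, List<T> hits) { this.scrollId = scrollId; this.hits = hits; }
    }

    /** Keeps fetching until a batch is empty or shorter than batchSize. */
    static <T> List<T> drain(Batch<T> first, Function<String, Batch<T>> fetchNext, int batchSize) {
        List<T> result = new ArrayList<>();
        Batch<T> batch = first;
        while (!batch.hits.isEmpty()) {
            result.addAll(batch.hits);
            if (batch.hits.size() < batchSize) {
                break; // short batch: nothing left behind the cursor
            }
            batch = fetchNext.apply(batch.scrollId);
        }
        return result;
    }

    /** Fake backend: the "scroll id" is just the next offset into the data. */
    static Batch<Integer> fakeFetch(List<Integer> data, int offset, int batchSize) {
        int end = Math.min(offset + batchSize, data.size());
        return new Batch<>(String.valueOf(end), new ArrayList<>(data.subList(offset, end)));
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 25; i++) data.add(i);
        int size = 10;
        List<Integer> all = drain(fakeFetch(data, 0, size),
                id -> fakeFetch(data, Integer.parseInt(id), size), size);
        System.out.println(all.size()); // 25
    }
}
```

The short-batch check saves one round trip when the total is not a multiple of the batch size; when it is, the loop ends on the following empty batch instead.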

The full utility class:

package com.yq.demo.Util;

import org.apache.commons.lang3.ArrayUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
// In older 7.x clients such as 7.0.0, TimeValue lives in org.elasticsearch.common.unit;
// org.elasticsearch.core.TimeValue only exists in later releases.
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ESUtil {

    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);
    // Scroll keep-alive in milliseconds (3 minutes)
    private static final long SCROLL_TIMEOUT = 180000;
    // Number of hits per scroll batch
    private static int SIZE = 1000;
    private static int MAX_BUFFER = 209715200;

    /**
     * Runs a scroll query and collects all hits.
     *
     * @param client     restHighLevelClient
     * @param indices    indices to search
     * @param query      queryBuilder
     * @param includes   fields to include
     * @param orderField sort field
     * @param order      sort order
     * @param fun        function that converts each hit
     * @param <T>        result type
     * @return List of results, each converted to T by fun
     * @throws Exception
     */
    public static <T> List<T> searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                             String[] includes, String orderField, SortOrder order,
                                             Function<SearchHit, T> fun) throws Exception {
        // Scroll keep-alive for the scroll query
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(SCROLL_TIMEOUT));
        // Build the SearchRequest
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (includes != null) {
            // Restrict the returned fields
            sourceBuilder.fetchSource(includes, null);
        }
        // Add the query
        sourceBuilder.query(query);
        // Number of hits per scroll batch
        sourceBuilder.size(SIZE);
        // Add the sort field
        if (orderField != null && !"".equals(orderField.trim())) {
            sourceBuilder.sort(orderField, order);
        }
        // Attach the scroll and the source builder
        request.scroll(scroll);
        request.source(sourceBuilder);
        // Scroll ids to clear at the end
        List<String> scrollIdList = new ArrayList<>();
        // First response
        SearchResponse searchResponse = client.search(request, RequestOptions.DEFAULT);
        // First scroll id (the cursor)
        String scrollId = searchResponse.getScrollId();
        // Hits of the first batch
        SearchHit[] hits = searchResponse.getHits().getHits();
        // List that collects the converted results
        List<T> result = new ArrayList<>();
        scrollIdList.add(scrollId);
        try {
            // Keep scrolling and convert each SearchHit into the result list
            while (ArrayUtils.isNotEmpty(hits)) {
                for (SearchHit hit : hits) {
                    // Function<SearchHit, T>: takes a SearchHit and returns a T
                    result.add(fun.apply(hit));
                }
                // A short batch means the scroll is exhausted, so we can return
                if (hits.length < SIZE) {
                    break;
                }
                // Continue from the previous cursor
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(scroll);
                // Fetch the next batch
                SearchResponse searchScrollResponse = client.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                // Advance the cursor
                scrollId = searchScrollResponse.getScrollId();
                hits = searchScrollResponse.getHits().getHits();
                scrollIdList.add(scrollId);
            }
        } finally {
            // Clear the scroll contexts to release server-side resources
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.setScrollIds(scrollIdList);
            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        }
        return result;
    }

    /**
     * SearchResponse for an aggregation query.
     *
     * @param client       restHighLevelClient
     * @param indices      indices to search
     * @param query        QueryBuilder
     * @param aggregations AggregationBuilder
     * @return SearchResponse
     * @throws Exception
     */
    public static SearchResponse searchResponse(RestHighLevelClient client, String[] indices, QueryBuilder query,
                                                AggregationBuilder... aggregations) throws Exception {
        // Build the request
        SearchRequest request = new SearchRequest(indices);
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(query);
        // Add the aggregations
        if (aggregations != null && aggregations.length > 0) {
            for (AggregationBuilder aggregation : aggregations) {
                sourceBuilder.aggregation(aggregation);
            }
        }
        // Only the aggregation results are needed, so return no hits
        sourceBuilder.size(0);
        // Ignore unavailable indices and expand wildcards to open indices only
        request.indicesOptions(IndicesOptions.lenientExpandOpen());
        request.source(sourceBuilder);
        return client.search(request, RequestOptions.DEFAULT);
    }
}

Hope this helps. Thank you for watching!


Reposted from: https://blog.csdn.net/qq_40454136/article/details/124866251
Copyright belongs to the original author, 卡卡东~. If this infringes your rights, please contact us for removal.
