分组聚合及嵌套查询
聚合查询可以理解为SQL中的求和、求最大值、最小值以及求均值的需求
嵌套可以理解为es存值的某一个字段为对象属性的值做处理.
Elasticsearch Java API分组与聚合结合
- 其中对字段field_one进行分组,分组的别名为fieldOne,取2^31-1组数据.如果不设置size,查询的结果会返回默认size大小.
AggregationBuilder oneAgg =AggregationBuilders.terms(“fieldOne”).field(“field_one”).size(2^31-1);
- 需要对多个字段分组
// 再创建一个AggregationBuilder twoAgg
=AggregationBuilders.terms(“fieldTwo”).field(“field_two”).size(2^31-1);// 将两个分组结合
searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg));
- 需要将分组结果的其他字段再进行统计的sum、min、max、avg聚合
AggregationBuilder threeAgg =AggregationBuilders.sum(“fieldThree”).field(“field_three”);// 将数据通过field_one、field_two字段进行分组,对field_three进行求和聚合
searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg.subAggregation(threeAgg)));
- 只聚合不分组
searchSourceBuilder.aggregation(threeAgg);
简单的分组查询
// 分页设置privateInteger start =0;privateInteger length =20;EsPage esPage =newEsPage(start,length);// 排序设置List<EsSort> esSorts =Lists.newArrayList(newEsSort("userId",SortOrder.ASC),newEsSort(id,SortOrder.DESC));// 查询条件BoolQueryBuilder queryBuilder =newBoolQueryBuilder();
queryBuilder.must(newTermQueryBuilder("userName","xxxx"));// 组装查询对象SearchRequest searchRequest =newSearchRequest("user_list");// 1、添加普通查询条件SearchSourceBuilder searchSourceBuilder =newSearchSourceBuilder().query(queryBuilder)// 聚合无视query筛选出的数据.size(0).from(0).fetchSource(false);// 组装聚合查询的条件TermsAggregationBuilder aggregationBuilder =AggregationBuilders.terms("group_by_userId")// 需要查询的字段.field("userId")// 分页.size(page.getPage()+ page.getSize());// 2、添加聚合查询条件
searchSourceBuilder.aggregation(aggregationBuilder);// 3、添加排序条件
searchSourceBuilder.sort(sort.getSortFiled(), sort.getSortOrder());
searchRequest.source(searchSourceBuilder);try{SearchResponse searchResponse = restClient.search(searchRequest,RequestOptions.DEFAULT);// 获取总数long sumTotal = searchResponse.getHits().getTotalHits().value;if(sumTotal >=MAX_PAGE_DEPTH){thrownewEsDataException("目前暂不支持超过10000条的数据进行视图聚合,请修改聚合任务的数量");}// sumTotal:查询出来的总数// searchResponse.getAggregations():聚合之后查询出来的数据returnPair.of(sumTotal, searchResponse.getAggregations());}catch(IOException e){EsExceptionUtil.dealIOException(e);}catch(ElasticsearchException e1){EsExceptionUtil.dealElasticsearchException(e1);}
解析查聚合结果
// 查询出来的总数int count = pair.getLeft().intValue();// 聚合之后查询出来的数据Aggregations aggregations = pair.getRight();// 根据聚合时的聚合名称,获取数据Terms terms = aggregations.get("group_by_userId");List<Long> userIds =Lists.newArrayList();if(terms !=null&&CollectionUtils.isNotEmpty(terms.getBuckets())){// terms.getBuckets():就是聚合之后的每条数据桶for(Terms.Bucket bucket : terms.getBuckets()){// 遍历得到桶中的key,就是聚合后,查询的字段L:userIdlong key =NumberUtils.toLong(bucket.getKeyAsString(),0);if(key ==0){continue;}
userIds.add(key);}}
上面是最简单的一层聚合查询,其实其他复杂的聚合也都是大同小异,就是一个套娃的操作.
复杂聚合查询
publicMap<String,Map<String,Long>>aggTwoArgs(String indices,QueryBuilder queryBuilder,String args1,String args2,int i)throwsIOException{Map<String,Map<String,Long>> map =newHashMap<>();//设置要查询的索引SearchRequest request =newSearchRequest().indices(indices);//构建搜索SearchSourceBuilder sourceBuilder =newSearchSourceBuilder();//添加搜索长度
sourceBuilder.size(0);//添加搜索条件
sourceBuilder.query(queryBuilder);//设置要聚合的字段以及条数//设置该次聚合的名称 terms(args1)//以及要聚合的字段field(args1 + ".keyword") 添加keyword是对字段进行不分词查询。TermsAggregationBuilder agg1 =AggregationBuilders.terms(args1).field(args1 +".keyword").size(i);//设置子聚合以及条数,设置返回数据中该聚合的名称 terms(args2),以及要聚合的字段field(args2 + ".keyword")TermsAggregationBuilder agg2 =AggregationBuilders.terms(args2).field(args2 +".keyword").size(i);//合并子聚合
agg1.subAggregation(agg2);//添加聚合查询
sourceBuilder.aggregation(agg1);//创建请求
request.source(sourceBuilder);//发送请求SearchResponse response = client.search(request,RequestOptions.DEFAULT);Aggregations aggregations = response.getAggregations();// 取第一个分组Terms terms1 = aggregations.get(args1);for(Terms.Bucket bucket1 : terms1.getBuckets()){// 取分组下的分组Terms terms2 = bucket1.getAggregations().get(args2);Map<String,Long> map1 =newHashMap<>();for(Terms.Bucket bucket2 : terms2.getBuckets()){
map1.put(bucket2.getKeyAsString(), bucket2.getDocCount());}// bucket1.getKeyAsString():第一个分组的值// map1:第二个分组下的map值
map.put(bucket1.getKeyAsString(), map1);}return map;}
ElasticSearch Java API嵌套
publicstatic final String MAPPING=""+"{\n"+" \"properties\": {\n"+" \"itemAttributes\": {\n"+" \"type\": \"nested\",\n"+" \"properties\": {\n"+" \"specId\": {\n"+" \"type\": \"long\"\n"+" },\n"+" \"value\": {\n"+" \"type\": \"text\",\n"+" \"analyzer\": \"ngram_analyzer\",\n"+" \"fields\": {\n"+" \"value_keyword\": {\n"+" \"type\": \"keyword\"\n"+" }\n"+" }\n"+" },\n"+" \"key\": {\n"+" \"type\": \"long\"\n"+" },\n"+" \"ip\": {\n"+" \"type\": \"long\"\n"+" }\n"+" }\n"+" }\n"+" }\n"+"}";
上面ES的mapping中的itemAttributes字段就是一个嵌套式的数据结构.
所以上面的分组和聚合对itemAttributes对象下的属性不适用,需要再做一层处理.
// 对于嵌套的聚合,需要新建一个NestedAggregationBuilder 对象”nestedAgg”为别名NestedAggregationBuilder nestedAggregationBuilder =AggregationBuilders.nested(“nestedAgg”, “itemAttributes”);// 分别对nested对象下的属性建分组AggregationBuilder specIdAgg =AggregationBuilders.terms(“fieldSpecId”).field(“itemAttributes.specId”).size(2^31-1);AggregationBuilder valueAgg =AggregationBuilders.terms(“fieldValue”).field(“itemAttributes.value”).size(2^31-1);AggregationBuilder keyAgg =AggregationBuilders.terms(“fieldKey”).field(“itemAttributes.key”).size(2^31-1);AggregationBuilder ipAgg =AggregationBuilders.terms(“fieldIp”).field(“itemAttributes.ip”).size(2^31-1);// 再将上面写的一个个聚合体放入nestedAggregationBuilder,需要将上面的specIdAgg、valueAgg、keyAgg聚合相互之间有关联需要一层一层关联如下
nestedAggregationBuilder.subAggregation(
specIdAgg.subAggregation(valueAgg.subAggregation(ipAgg.subAggregation(AggregationBuilders.sum(“keyAgg”).field(“itemAttributes.key”)))));
searchSourceBuilder.query(boolQueryBuilder).size(0);
searchSourceBuilder.aggregation(nestedAggregationBuilder);
searchRequest.source(searchSourceBuilder);// 注意searchSourceBuilder.aggregation(nestedAggregationBuilder);// 这个段代码这是定义聚合方式的。// searchSourceBuilder//.aggregation(specIdAgg)//.aggregation(valueAgg)//.aggregation(ipAgg)//.aggregation(keyAgg); // 和上方聚合方式完全不一样,单一将数据分组,聚合相互之间没有关联.
try{SearchResponse searchResponse = restHighLevelClient.search(searchRequest);Nested nested = searchResponse.getAggregations().get(“nestedAgg”);Aggregations nestedAggregations = nested.getAggregations();Aggregation nestedAggregation = nestedAggregations.asList().get(0);// 取第一个分组List<?extendsTerms.Bucket> specIdBuckets =((ParsedStringTerms) nestedAggregation).getBuckets();for(Terms.Bucket specIdBucket : specIdBuckets ){// 第一个分组的值String specId = specIdBucket .getKey().toString();// 取第二个分组List<Aggregation> valueAggregations = specIdBucket .getAggregations().asList();List<?extendsTerms.Bucket> valueBuckets =((ParsedStringTerms) valueAggregations .get(0)).getBuckets();for(Terms.Bucket valueBucket : valueBuckets ){// 第二个分组的值String vlaue = valueBucket .getKey().toString();// 取第三个分组List<Aggregation> ipAggregations = valueBucket.getAggregations().asList();List<?extendsTerms.Bucket> ipBuckets =((ParsedStringTerms)ipAggregations .get(0)).getBuckets();for(Terms.Bucket ipBucket : ipBuckets ){// 第三个分组的值String ip= ipBucket .getKey().toString();// 第四个分组Aggregation keyeAggregation = ipBucket .getAggregations().asList().get(0);// 第四个分组的值Integer count =(int)((ParsedSum) keyAggregation ).getValue();System.out.print(fieldOne);System.out.print(filedtwo);System.out.print(ip);System.out.print(count );}}}}catch(IOException e){
log.error("数据解析错误:{}", e);}
版权归原作者 白帆瀚宇 所有, 如有侵权,请联系我们删除。