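While solving some Elasticsearch (ES) query problems at work, I ran into a few beginner-level issues. This post records the concrete solutions, and I will add problems I run into later as well.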

## Aggregation Queries

### Pipeline Aggregation

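  • Example problem: suppose we have data of several types, A, B, C, D, E, F, and so on. We want the number of documents of each type (count), and then the maximum, minimum, and average of those counts. With only a few types this is easy to do in application code, but when there are many types and the query involves other, more complex conditions, I used the MaxBucket pipeline aggregation, together with its MinBucket and AvgBucket counterparts, to get all of the expected results in a single query.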

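Code example:

```java
SearchRequestBuilder requestBuilder = client.prepareSearch(scoreIndex).setTypes(scoreType)
        .setSize(0); // only the aggregation results are needed, not the hits
// Number of "score" values per type (value_count)
AggregationBuilder count = AggregationBuilders.count("count_score").field("score");
// Sibling pipeline aggregations; the bucket path "fieldIds>count_score" points at
// the count_score metric inside each bucket of the "fieldIds" terms aggregation
BucketMetricsPipelineAggregationBuilder max = new MaxBucketPipelineAggregationBuilder("max_score", "fieldIds>count_score");
BucketMetricsPipelineAggregationBuilder min = new MinBucketPipelineAggregationBuilder("min_score", "fieldIds>count_score");
BucketMetricsPipelineAggregationBuilder avg = new AvgBucketPipelineAggregationBuilder("avg_score", "fieldIds>count_score");
// One bucket per type. terms returns only 10 buckets by default, so size is raised
// (1000 is an assumed upper bound on the number of types) to let the pipelines see every type
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("fieldIds").field("fieldId.keyword")
        .size(1000)
        .order(Terms.Order.aggregation("count_score", true));
requestBuilder.addAggregation(termsAggregationBuilder.subAggregation(count));
// Pipeline aggregations go at the top level, next to (not inside) the terms aggregation
requestBuilder.addAggregation(max).addAggregation(min).addAggregation(avg);
SearchResponse response = requestBuilder.execute().actionGet();
// max_bucket/min_bucket return the value plus the matching bucket key(s); avg_bucket returns a plain value
InternalBucketMetricValue maxCount = response.getAggregations().get("max_score");
InternalBucketMetricValue minCount = response.getAggregations().get("min_score");
InternalSimpleValue avgCount = response.getAggregations().get("avg_score");
```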

Result:

```
"data": {
    "maximum": 801,
    "minimum": 319,
    "average": 560
}
```
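To make the semantics of the three pipeline aggregations concrete, here is a plain-Java sketch (no Elasticsearch client involved) of what they compute from the per-type counts returned by the `fieldIds` terms aggregation: `max_bucket` and `min_bucket` report the extreme value together with the key(s) of the bucket(s) holding it, and `avg_bucket` reports the mean. The class, method names, and sample counts are hypothetical; this only mirrors the arithmetic, not the ES implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of max_bucket / min_bucket / avg_bucket applied to
// the per-type counts of a terms aggregation (bucket key -> count).
class BucketMetricsSketch {

    // Like a max_bucket/min_bucket result: the value plus every key that reaches it
    static final class KeyedValue {
        final double value;
        final List<String> keys;

        KeyedValue(double value, List<String> keys) {
            this.value = value;
            this.keys = keys;
        }
    }

    // max == true mimics max_bucket, max == false mimics min_bucket
    static KeyedValue extreme(Map<String, Long> counts, boolean max) {
        double best = max ? Double.NEGATIVE_INFINITY : Double.POSITIVE_INFINITY;
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            double v = e.getValue();
            boolean better = max ? v > best : v < best;
            if (better) {
                best = v;
                keys.clear();
                keys.add(e.getKey());
            } else if (v == best) {
                keys.add(e.getKey()); // ties: all buckets holding the extreme are reported
            }
        }
        return new KeyedValue(best, keys);
    }

    // Mimics avg_bucket: the mean of the bucket values
    static double average(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("A", 10L);
        counts.put("B", 30L);
        counts.put("C", 20L);
        KeyedValue max = extreme(counts, true);
        KeyedValue min = extreme(counts, false);
        System.out.println("max=" + max.value + " " + max.keys); // max=30.0 [B]
        System.out.println("min=" + min.value + " " + min.keys); // min=10.0 [A]
        System.out.println("avg=" + average(counts));            // avg=20.0
    }
}
```

Keep in mind that the pipeline aggregations only see the buckets the terms aggregation actually returns, and terms returns 10 buckets by default, so with many types its `size` has to be raised for the maximum, minimum, and average to cover all of them.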

### Date Aggregation

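  • Example problem: same setup as above, data of types A, B, C, D, E, F, and so on. This time, besides the results above, we also want to group by date, for example to know the total amount of each type of data for today, yesterday, and earlier periods. Assuming every document has an `@timestamp` field (or another field that identifies its time), here is how to do it.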

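Code example:

```java
SearchRequestBuilder requestBuilder = client.prepareSearch(scoreIndex).setTypes(scoreType);
// Exact match on the keyword sub-field that identifies the type
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("fieldId.keyword", fieldId));
// Bucket documents by @timestamp; interval and key format come from the author's timeUnit helpers
AggregationBuilder time = AggregationBuilders.dateHistogram("timeAgg").field("@timestamp")
        .dateHistogramInterval(this.getIntervalType(timeUnit)).format(timeUnit.getObject().format)
        .timeZone(DateTimeZone.getDefault());
// Count the "score" values in each time bucket
AggregationBuilder countAgg = AggregationBuilders.count("count_score").field("score");
time.subAggregation(countAgg);
// Author's helper; judging by its arguments, it limits the query to startDate..endDate
boolQueryBuilder.must(buildServiceNameQuery(startDate, endDate));
SearchResponse searchResponse = requestBuilder.setQuery(boolQueryBuilder)
        .setSize(0) // only the aggregation is needed, not the hits themselves
        .addAggregation(time)
        .execute().actionGet();
Histogram timeAgg = searchResponse.getAggregations().get("timeAgg");
List<Map<String, Object>> data = new ArrayList<>(timeAgg.getBuckets().size());
for (Histogram.Bucket entry : timeAgg.getBuckets()) {
    InternalValueCount count = entry.getAggregations().get("count_score");
    // One {bucketKey: count} entry per bucket, giving the shape shown in the result below
    Map<String, Object> item = new HashMap<>(1);
    item.put(entry.getKeyAsString(), count.getValue()); // getValue() is a long; value() would be a double
    data.add(item);
}
```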

Result:

```
"data": [
    {
        "2019-06-04 09": 83
    },
    {
        "2019-06-04 10": 118
    },
    {
        "2019-06-04 11": 80
    },
    {
        "2019-06-04 12": 38
    }
]
```
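Conceptually, the `date_histogram` plus count combination above truncates each timestamp to the start of its interval in the configured time zone, formats that as the bucket key, and counts per bucket. The plain-Java sketch below does this for a fixed one-hour interval and the `yyyy-MM-dd HH` key format seen in the result. The class and method names and the sample timestamps are hypothetical, it counts one value per record, and unlike Elasticsearch (which by default also returns empty buckets between the first and last non-empty one) it only emits buckets that contain data.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of an hourly date histogram with a count per bucket.
class HourlyHistogramSketch {

    private static final DateTimeFormatter KEY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH");

    // Truncates each timestamp to the start of its hour in `zone` and counts per bucket.
    // TreeMap keeps keys in chronological order, since this format sorts lexically by time.
    static Map<String, Long> countPerHour(List<Instant> timestamps, ZoneId zone) {
        Map<String, Long> buckets = new TreeMap<>();
        for (Instant ts : timestamps) {
            ZonedDateTime bucketStart = ts.atZone(zone).truncatedTo(ChronoUnit.HOURS);
            buckets.merge(KEY_FORMAT.format(bucketStart), 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<Instant> timestamps = Arrays.asList(
                Instant.parse("2019-06-04T09:15:00Z"),
                Instant.parse("2019-06-04T09:45:00Z"),
                Instant.parse("2019-06-04T10:05:00Z"));
        // Prints {2019-06-04 09=2, 2019-06-04 10=1}
        System.out.println(countPerHour(timestamps, ZoneId.of("UTC")));
    }
}
```

The time zone matters: the same three instants bucketed in `Asia/Shanghai` land in the `17` and `18` hour buckets, which is why the author passes a `timeZone` to the aggregation.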

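*Memo: sorting hits inside an aggregation (`top_hits`)*

The query below takes device logs from the last 10 minutes, groups them by project and then by device, uses a `top_hits` sub-aggregation sorted by `@timestamp` descending to fetch each device's most recent log, and records that timestamp for every device whose latest state is `onLine`.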

```java
// Start of the look-back window: 10 minutes ago
Date tenMinutesAgo = new DateTime().minusMinutes(10).toDate();
SearchRequestBuilder requestBuilder = client.prepareSearch(esIndexProperties.getDeviceLogIndex() + "*")
        .setTypes(esIndexProperties.getDeviceLogType());
SearchResponse searchResponse = requestBuilder
        .setSize(0) // only aggregations are needed, no top-level hits
        .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("@timestamp")
                .from(DateUtils.formatDate(tenMinutesAgo, "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
                .to(DateUtils.formatDate(DateTime.now().toDate(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))))
        .addAggregation(
                // terms returns only 10 buckets by default, so raise size to keep every project
                AggregationBuilders.terms("projectTerm").size(Integer.MAX_VALUE).field("projectId.keyword")
                        // sibling of deviceTerm: the fieldId(s) used in this project
                        .subAggregation(AggregationBuilders.terms("fieldTerm").field("fieldId.keyword"))
                        .subAggregation(
                                AggregationBuilders.terms("deviceTerm").size(Integer.MAX_VALUE).field("deviceId.keyword")
                                        // not read below; the state is taken from the latest hit instead
                                        .subAggregation(AggregationBuilders.terms("stateTerm").field("deviceStateEnum.keyword"))
                                        // latest document per device: top_hits with size 1, newest first
                                        .subAggregation(
                                                AggregationBuilders.topHits("top")
                                                        .size(1)
                                                        .sort("@timestamp", SortOrder.DESC))))
        .execute().actionGet();
Terms projectTerms = searchResponse.getAggregations().get("projectTerm");
Map<String, Object> result = new HashMap<>(projectTerms.getBuckets().size());
for (Terms.Bucket bucket : projectTerms.getBuckets()) {
    Terms fieldTerm = bucket.getAggregations().get("fieldTerm");
    if (fieldTerm.getBuckets().isEmpty()) {
        continue; // no fieldId in this project's documents
    }
    // Assumes one fieldId per project: take the first (most frequent) bucket
    String fieldId = fieldTerm.getBuckets().get(0).getKeyAsString();
    Terms deviceTerms = bucket.getAggregations().get("deviceTerm");
    for (Terms.Bucket deviceBucket : deviceTerms.getBuckets()) {
        TopHits top = deviceBucket.getAggregations().get("top");
        SearchHit[] hits = top.getHits().getHits();
        if (hits == null || hits.length == 0) {
            continue;
        }
        Map<String, Object> source = hits[0].getSourceAsMap();
        if (CollectionUtils.isEmpty(source)) {
            continue;
        }
        try {
            DeviceStateEnum stateEnum = DeviceStateEnum.valueOf((String) source.get("deviceStateEnum"));
            // Record the latest timestamp only for devices whose most recent state is onLine
            if (DeviceStateEnum.onLine == stateEnum) {
                Date maxDate = DateUtils.parseDate((String) source.get("@timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
                result.put(fieldId + deviceBucket.getKeyAsString(), maxDate);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
```
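The `top_hits` part of this memo (size 1, sorted by `@timestamp` descending) boils down to "keep the newest document per device". Below is a plain-Java sketch of the same post-processing, assuming a hypothetical `DeviceLog` class that stands in for a hit's `_source` and hypothetical state strings such as `"offLine"`. Like the memo, it keys results by `fieldId + deviceId` and keeps only devices whose latest state is `onLine`; unlike the memo, it groups by device alone and takes the fieldId from the latest record instead of from the project's buckets.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: newest record per device (what top_hits with size 1
// sorted by @timestamp DESC returns per bucket), followed by an onLine filter.
class LatestOnlineSketch {

    // Stand-in for the fields read from a hit's _source (names are assumptions)
    static final class DeviceLog {
        final String fieldId;
        final String deviceId;
        final String state;
        final Instant timestamp;

        DeviceLog(String fieldId, String deviceId, String state, Instant timestamp) {
            this.fieldId = fieldId;
            this.deviceId = deviceId;
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    static Map<String, Instant> latestOnline(List<DeviceLog> logs) {
        // Step 1: keep only the newest record per device
        Map<String, DeviceLog> latest = new HashMap<>();
        for (DeviceLog log : logs) {
            latest.merge(log.deviceId, log, (a, b) -> b.timestamp.isAfter(a.timestamp) ? b : a);
        }
        // Step 2: report devices whose newest state is onLine, keyed by fieldId + deviceId
        Map<String, Instant> result = new HashMap<>();
        for (DeviceLog log : latest.values()) {
            if ("onLine".equals(log.state)) {
                result.put(log.fieldId + log.deviceId, log.timestamp);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<DeviceLog> logs = Arrays.asList(
                new DeviceLog("f1", "d1", "onLine", Instant.parse("2019-06-04T10:00:00Z")),
                new DeviceLog("f1", "d1", "offLine", Instant.parse("2019-06-04T10:05:00Z")),
                new DeviceLog("f1", "d2", "offLine", Instant.parse("2019-06-04T10:01:00Z")),
                new DeviceLog("f1", "d2", "onLine", Instant.parse("2019-06-04T10:06:00Z")));
        // d1's newest state is offLine, so only d2 is reported
        System.out.println(latestOnline(logs)); // {f1d2=2019-06-04T10:06:00Z}
    }
}
```

Doing the "newest per device" step inside Elasticsearch with `top_hits` means only one document per device travels back to the client, instead of every log line from the 10-minute window.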