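While working through some Elasticsearch (ES) query problems at work, I ran into a few beginner-level issues. This post records the concrete solutions, and I will add the problems I run into later as well.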
## Aggregation Queries
### Pipeline Aggregations
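- Example problem: suppose we have data of several types A, B, C, D, E, F, .... We want the document count of each type, and then the maximum, minimum and average of those counts. With only a few types this is easy to do in application code, but when there are many types and the query also carries other, more complex conditions, I used the bucket pipeline aggregations `max_bucket`, `min_bucket` and `avg_bucket` to get all of these results in a single query. They are sibling pipeline aggregations: each one reads the `count_score` value of every `fieldIds` bucket through the buckets path `fieldIds>count_score` and computes one statistic over those values.
Code example: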
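```java
SearchRequestBuilder requestBuilder = client.prepareSearch(scoreIndex).setTypes(scoreType);
// value_count of "score" per type bucket (equals the doc count if every document has exactly one score)
AggregationBuilder count = AggregationBuilders.count("count_score").field("score");
// Sibling pipeline aggregations; "fieldIds>count_score" points at count_score inside each fieldIds bucket
MaxBucketPipelineAggregationBuilder max = new MaxBucketPipelineAggregationBuilder("max_score", "fieldIds>count_score");
MinBucketPipelineAggregationBuilder min = new MinBucketPipelineAggregationBuilder("min_score", "fieldIds>count_score");
AvgBucketPipelineAggregationBuilder avg = new AvgBucketPipelineAggregationBuilder("avg_score", "fieldIds>count_score");
// terms returns only 10 buckets by default; raise size so every type takes part in max/min/avg
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("fieldIds").field("fieldId.keyword")
        .size(1000) // assumed upper bound on the number of types; adjust to your data
        .order(Terms.Order.aggregation("count_score", true));
requestBuilder.addAggregation(termsAggregationBuilder.subAggregation(count));
// Pipeline aggregations go at the top level, as siblings of the terms aggregation
requestBuilder.addAggregation(max).addAggregation(min).addAggregation(avg);
SearchResponse response = requestBuilder.execute().actionGet();
// max_bucket/min_bucket return a value plus the key(s) of the matching bucket(s); avg_bucket returns a plain value
InternalBucketMetricValue maxCount = response.getAggregations().get("max_score");
InternalBucketMetricValue minCount = response.getAggregations().get("min_score");
InternalSimpleValue avgCount = response.getAggregations().get("avg_score");
```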
Result:
```json
"data": {
  "maximum": 801,
  "minimum": 319,
  "average": 560
}
```
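To make clear what the three pipeline aggregations compute, here is a rough client-side equivalent of the "do it in application code" approach mentioned above. It is a sketch under assumptions: the per-type counts are taken to be already collected into a `Map<String, Long>`, and the class `BucketStats` and its method `of` are hypothetical names. Like `max_bucket`, it also reports which key(s) hold the maximum.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side equivalent of max_bucket, min_bucket and avg_bucket over per-type counts. */
class BucketStats {
    final double max;
    final double min;
    final double avg;
    final List<String> maxKeys; // key(s) of the bucket(s) holding the maximum, as max_bucket also reports

    private BucketStats(double max, double min, double avg, List<String> maxKeys) {
        this.max = max;
        this.min = min;
        this.avg = avg;
        this.maxKeys = Collections.unmodifiableList(maxKeys);
    }

    /** countsPerType maps each type (bucket key) to its count; it must not be empty. */
    static BucketStats of(Map<String, Long> countsPerType) {
        if (countsPerType.isEmpty()) {
            throw new IllegalArgumentException("no buckets to aggregate");
        }
        double max = Double.NEGATIVE_INFINITY;
        double min = Double.POSITIVE_INFINITY;
        double sum = 0;
        List<String> maxKeys = new ArrayList<>();
        for (Map.Entry<String, Long> e : countsPerType.entrySet()) {
            double v = e.getValue();
            if (v > max) {          // new maximum: restart the key list
                max = v;
                maxKeys.clear();
                maxKeys.add(e.getKey());
            } else if (v == max) {  // tie: several buckets share the maximum
                maxKeys.add(e.getKey());
            }
            min = Math.min(min, v);
            sum += v;
        }
        return new BucketStats(max, min, sum / countsPerType.size(), maxKeys);
    }
}
```

For example, hypothetical counts `{A=801, B=319, C=560}` give a maximum of 801 (key `A`), a minimum of 319 and an average of 560.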
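### Date Aggregation
- Example problem: same setup as above, with data of several types A, B, C, D, E, F, .... This time, besides the results above, we also want to group by date. For example, I want to know the total amount of each type of data for today, yesterday and earlier. Assuming every document has `@timestamp` and some other fields that identify its time, here is how to do it. The example below filters on a single type (`fieldId`) and counts the documents in each time bucket.
Code example: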
```java
SearchRequestBuilder requestBuilder = client.prepareSearch(scoreIndex).setTypes(scoreType);
// Exact match on the keyword sub-field, so a term query is the idiomatic choice
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("fieldId.keyword", fieldId));
// getIntervalType(...) and timeUnit are the author's own helpers that choose the interval and the
// bucket key format (hourly with "yyyy-MM-dd HH" keys in the result below)
AggregationBuilder time = AggregationBuilders.dateHistogram("timeAgg").field("@timestamp")
        .dateHistogramInterval(this.getIntervalType(timeUnit)).format(timeUnit.getObject().format)
        .timeZone(DateTimeZone.getDefault());
AggregationBuilder countAgg = AggregationBuilders.count("count_score").field("score");
time.subAggregation(countAgg);
// Author's helper; judging by its arguments it restricts the query to the startDate/endDate range
boolQueryBuilder.must(buildServiceNameQuery(startDate, endDate));
SearchResponse searchResponse = requestBuilder.setQuery(boolQueryBuilder)
        .addAggregation(time)
        .execute().actionGet();
Histogram timeAgg = searchResponse.getAggregations().get("timeAgg");
// Collect one {bucketKey: count} entry per time bucket
List<Map<String, Object>> data = new ArrayList<>();
for (Histogram.Bucket entry : timeAgg.getBuckets()) {
    ValueCount count = entry.getAggregations().get("count_score");
    data.add(Collections.singletonMap(entry.getKeyAsString(), count.getValue()));
}
```
Result:
```json
"data": [
  {"2019-06-04 09": 83},
  {"2019-06-04 10": 118},
  {"2019-06-04 11": 80},
  {"2019-06-04 12": 38}
]
```
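As a rough illustration of how the hourly `date_histogram` assigns documents, the sketch below does the same bucketing in plain Java: each timestamp is rounded down to its hour in the chosen time zone and formatted as the bucket key. It assumes an hourly interval and the key format `yyyy-MM-dd HH` (inferred from the sample keys above); the class `HourlyHistogram` and method `countPerHour` are hypothetical. Unlike `date_histogram`, which by default also returns empty buckets between the first and last hour, it only produces keys for hours that contain documents.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory equivalent of an hourly date_histogram with a doc count per bucket. */
class HourlyHistogram {
    private static final DateTimeFormatter KEY_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH");

    /** Counts timestamps per hour in the given zone, keyed like "2019-06-04 09". */
    static Map<String, Long> countPerHour(List<Instant> timestamps, ZoneId zone) {
        Map<String, Long> buckets = new TreeMap<>(); // keys in this format sort chronologically
        for (Instant ts : timestamps) {
            // Round down to the start of the hour in the local zone, then format it as the bucket key
            String key = KEY_FORMAT.format(ts.atZone(zone).truncatedTo(ChronoUnit.HOURS));
            buckets.merge(key, 1L, Long::sum);
        }
        return buckets;
    }
}
```

Passing the zone explicitly plays the same role as `.timeZone(DateTimeZone.getDefault())` above: the same instant can fall into different hour buckets depending on the zone.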
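Memo: aggregations with sorted hits (`top_hits`). The query below takes the device logs of the last 10 minutes, groups them by project and then by device, and uses a `top_hits` sub-aggregation sorted by `@timestamp` in descending order to fetch each device's most recent log entry. Devices whose latest state is `onLine` are recorded together with the timestamp of that entry.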
```java
// Start of the time window: 10 minutes ago
Date tenMinutesAgo = new DateTime().minusMinutes(10).toDate();
SearchRequestBuilder requestBuilder = client.prepareSearch(esIndexProperties.getDeviceLogIndex() + "*").setTypes(esIndexProperties.getDeviceLogType());
SearchResponse searchResponse = requestBuilder
        .setSize(0) // only the aggregations are needed, not the top-level hits
        .setQuery(QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("@timestamp")
                .from(DateUtils.formatDate(tenMinutesAgo, "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
                .to(DateUtils.formatDate(DateTime.now().toDate(), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")))
        )
        .addAggregation(
                // Note: terms returns 10 buckets by default, so at most 10 projects come back here
                AggregationBuilders.terms("projectTerm").field("projectId.keyword")
                        .subAggregation(AggregationBuilders.terms("fieldTerm").field("fieldId.keyword"))
                        .subAggregation(
                                AggregationBuilders.terms("deviceTerm").size(Integer.MAX_VALUE).field("deviceId.keyword")
                                        // stateTerm is not read below; the state comes from the top hit instead
                                        .subAggregation(AggregationBuilders.terms("stateTerm").field("deviceStateEnum.keyword"))
                                        // Latest log of each device: a single hit, newest @timestamp first
                                        .subAggregation(
                                                AggregationBuilders.topHits("top")
                                                        .size(1)
                                                        .sort("@timestamp", SortOrder.DESC))
                        )
        )
        .execute().actionGet();
Terms projectTerms = searchResponse.getAggregations().get("projectTerm");
Map<String, Object> result = new HashMap<>(projectTerms.getBuckets().size());
for (Terms.Bucket bucket : projectTerms.getBuckets()) {
    Terms fieldTerm = bucket.getAggregations().get("fieldTerm");
    if (fieldTerm.getBuckets().isEmpty()) {
        continue;
    }
    // Assumes one fieldId per project: only the first field bucket is used
    String fieldId = fieldTerm.getBuckets().get(0).getKeyAsString();
    Terms deviceTerms = bucket.getAggregations().get("deviceTerm");
    for (Terms.Bucket deviceBucket : deviceTerms.getBuckets()) {
        TopHits top = deviceBucket.getAggregations().get("top");
        SearchHit[] hits = top.getHits().getHits();
        if (hits != null && hits.length > 0) {
            Map<String, Object> source = hits[0].getSourceAsMap();
            if (!CollectionUtils.isEmpty(source)) {
                try {
                    DeviceStateEnum stateEnum = DeviceStateEnum.valueOf((String) source.get("deviceStateEnum"));
                    // Record the timestamp only when the device's latest state is online
                    if (DeviceStateEnum.onLine == stateEnum) {
                        Date maxDate = DateUtils.parseDate((String) source.get("@timestamp"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
                        result.put(fieldId + deviceBucket.getKeyAsString(), maxDate);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
        }
    }
}
```
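Conceptually, a `terms` aggregation on the device id with a `top_hits` sub-aggregation of size 1, sorted by `@timestamp` descending, returns each device's newest log entry. The plain-Java sketch below reproduces that step over an in-memory list and then applies the same "latest state is online" filter. It is only an illustration under assumptions: `DeviceLog` is a hypothetical stand-in for an indexed document, and the state is a plain string (assumed to hold the enum name such as `"onLine"`) instead of the author's `DeviceStateEnum`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory equivalent of terms(deviceId) + top_hits(size 1, sort @timestamp desc). */
class LatestPerDevice {

    /** Hypothetical stand-in for one device log document. */
    static final class DeviceLog {
        final String deviceId;
        final String state;      // assumed to hold the enum name, e.g. "onLine"
        final Instant timestamp; // the document's @timestamp

        DeviceLog(String deviceId, String state, Instant timestamp) {
            this.deviceId = deviceId;
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    /** Maps deviceId to the timestamp of its latest log, for devices whose latest state is "onLine". */
    static Map<String, Instant> latestOnlineTimestamps(List<DeviceLog> logs) {
        // Step 1: keep the newest log per device (what top_hits returns for each device bucket)
        Map<String, DeviceLog> latest = new HashMap<>();
        for (DeviceLog entry : logs) {
            latest.merge(entry.deviceId, entry,
                    (current, candidate) -> candidate.timestamp.isAfter(current.timestamp) ? candidate : current);
        }
        // Step 2: record only devices whose newest log reports them online
        Map<String, Instant> result = new HashMap<>();
        for (DeviceLog entry : latest.values()) {
            if ("onLine".equals(entry.state)) {
                result.put(entry.deviceId, entry.timestamp);
            }
        }
        return result;
    }
}
```

Doing step 1 with `top_hits` inside `terms` keeps it on the Elasticsearch side, so only one document per device comes back instead of every log line in the 10-minute window.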