Elasticsearch学习总结八 ElasticSearch中的聚合操作

原创
2017/06/15 21:43
阅读数 235

首先准备数据,索引包含四个字段fieldA,fieldB,fieldC,fieldD,如下图,以下案列中都使用了基本REST命令和JavaAP两种方式实现

输入图片说明

1). 首先按照某个字段fieldC分组统计,相当于sql 中的group by操作,

curl -XPOST "http://121.40.128.155:9200/tempindex/_search?pretty" -d '{
"size": 0,
  "aggs": {
    "fieldC_count": {
      "terms": {
        "field": "fieldC"
      }
    }
  }
}'

返回值如下:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "fieldC_count" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ {
        "key" : "java",
        "doc_count" : 3
      }, {
        "key" : "c++",
        "doc_count" : 1
      }, {
        "key" : "ptyhone",
        "doc_count" : 1
      } ]
    }
  }
}

对应的JavaApi是如何实现的呢

EsSearchManager esSearchManager = EsSearchManager.getInstance();
SearchRequestBuilder searchReq = esSearchManager.client.prepareSearch("tempindex");
searchReq.setTypes("tempindex");
//group by 条件
TermsBuilder termsb = AggregationBuilders.terms("my_fieldC").field("fieldC").size(100);
searchReq.addAggregation(termsb);
SearchResponse searchRes = searchReq.execute().actionGet();
Terms fieldATerms = searchRes.getAggregations().get("my_fieldC");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
	//fieldA
	String groupbyKey = filedABucket.getKey().toString();
	//COUNT(fieldA)
	long  countValue = filedABucket.getDocCount();
}

2).统计某一个字段的最大最小值

curl -XPOST "http://121.40.128.155:9200/tempindex/_search?pretty" -d '{
  "size": 0,
  "aggs": {
    "max_fieldA": {
      "max": {
        "field": "fieldA"
      }
    },
    "min_fieldA": {
      "min": {
        "field": "fieldA"
      }
    }
  }
}'

返回值如下:
{
  "took" : 10,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "max_fieldA" : {
      "value" : 25.0
    },
    "min_fieldA" : {
      "value" : 10.0
    }
  }
}

对应的JavaApi实现如下

EsSearchManager esSearchManager = EsSearchManager.getInstance();
SearchRequestBuilder searchReq = esSearchManager.client.prepareSearch("tempindex");
searchReq.setTypes("tempindex");
MaxBuilder maxBuilder = AggregationBuilders.max("max_fieldA").field("fieldA");
searchReq.addAggregation(maxBuilder);
MinBuilder minBuilder = AggregationBuilders.min("min_fieldA").field("fieldA");
searchReq.addAggregation(minBuilder);
SearchResponse searchRes = searchReq.execute().actionGet();
InternalMax internalMax=searchRes.getAggregations().get("max_fieldA");
System.out.println(internalMax.getName() +"="+ internalMax.getValue());
InternalMin internalMin=searchRes.getAggregations().get("min_fieldA");
System.out.println(internalMin.getName() +"="+ internalMin.getValue());

3).Average平均值 按照某个字段求平均值

curl -XPOST "http://121.40.128.155:9200/tempindex/_search?pretty" -d '{
"size": 0,
"aggs": {
    "per_count": {
        "terms": {
            "field": "fieldC"
        },
        "aggs": {
            "avg_fieldB": {
                "avg": {
                    "field": "fieldB"
                }
            }
        }
    }
}
}'	

返回值如下:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "per_count" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ {
        "key" : "java",
        "doc_count" : 3,
        "avg_fieldB" : {
          "value" : 20.0
        }
      }, {
        "key" : "c++",
        "doc_count" : 1,
        "avg_fieldB" : {
          "value" : 35.0
        }
      }, {
        "key" : "ptyhone",
        "doc_count" : 1,
        "avg_fieldB" : {
          "value" : 25.0
        }
      } ]
    }
  }
}

对应的JavaApi实现如下

EsSearchManager esSearchManager = EsSearchManager.getInstance();
SearchRequestBuilder searchReq = esSearchManager.client.prepareSearch("tempindex");
searchReq.setTypes("tempindex");
TermsBuilder termsb = AggregationBuilders.terms("my_fieldC").field("fieldC").size(100);
termsb.subAggregation(AggregationBuilders.avg("my_avg_fieldB").field("fieldB"));
searchReq.setQuery(QueryBuilders.matchAllQuery()).addAggregation(termsb);
SearchResponse searchRes = searchReq.execute().actionGet();
Terms fieldATerms = searchRes.getAggregations().get("my_fieldC");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
	String fieldAValue = filedABucket.getKey().toString();
	long fieldACount = filedABucket.getDocCount();
	Avg avgagg = filedABucket.getAggregations().get("my_avg_fieldB");
	double avgFieldB = avgagg.getValue();
	System.out.println("fieldAValue="+fieldAValue);
	System.out.println("fieldACount="+fieldACount);
	System.out.println("avgFieldB="+avgFieldB);
}

4).Sum求和,求某个字段的sum之和

curl -XPOST "http://121.40.128.155:9200/tempindex/_search?pretty" -d '{
"size": 0,
"aggs": {
    "per_count": {
        "terms": {
            "field": "fieldC"
        },
        "aggs": {
            "sum_fieldB": {
                "sum": {
                    "field": "fieldB"
                }
            }
        }
    }
}
}'
返回值如下:
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "per_count" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ {
        "key" : "java",
        "doc_count" : 3,
        "sum_fieldB" : {
          "value" : 60.0
        }
      }, {
        "key" : "c++",
        "doc_count" : 1,
        "sum_fieldB" : {
          "value" : 35.0
        }
      }, {
        "key" : "ptyhone",
        "doc_count" : 1,
        "sum_fieldB" : {
          "value" : 25.0
        }
      } ]
    }
  }

对应的Java代码如下

EsSearchManager esSearchManager = EsSearchManager.getInstance();
SearchRequestBuilder searchReq = esSearchManager.client.prepareSearch("tempindex");
searchReq.setTypes("tempindex");
TermsBuilder termsb = AggregationBuilders.terms("my_fieldC").field("fieldC").size(100);
termsb.subAggregation(AggregationBuilders.sum("my_sum_fieldB").field("fieldB"));
searchReq.setQuery(QueryBuilders.matchAllQuery()).addAggregation(termsb);
SearchResponse searchRes = searchReq.execute().actionGet();
Terms fieldATerms = searchRes.getAggregations().get("my_fieldC");
for (Terms.Bucket filedABucket : fieldATerms.getBuckets()) {
	String fieldAValue = filedABucket.getKey().toString();
	long fieldACount = filedABucket.getDocCount();
	Avg avgagg = filedABucket.getAggregations().get("my_sum_fieldB");
	double avgFieldB = avgagg.getValue();
	System.out.println("fieldAValue="+fieldAValue);
	System.out.println("fieldACount="+fieldACount);
	System.out.println("avgFieldB="+avgFieldB);
}

以上总结了部分es基本操作API和调用demo,详细的代码请查看github地址 https://github.com/winstonelei/BigDataTools

更多资料请查询官网api https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.3/_bucket_aggregations.html

展开阅读全文
打赏
0
3 收藏
分享
加载中
更多评论
打赏
0 评论
3 收藏
0
分享
返回顶部
顶部