再续一下前面两篇相关的内容,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析es聚合导出(不是),Map#merge方法使用

在开发那个聚合导出功能的时候,最先考虑到的其实不是scroll扫数据并内存计算,之前了解过ElasticSeach是可以支持聚合计算查询的,但是没有实际在大批量数据的索引中使用过,所以再开发功能的设计阶段进行了测试之后发现并不能满足我们功能开发的需求,遂放弃了。网上使用aggs进行ElasticSearch聚合查询的介绍文章也很多,于是写下本文做一点记录,和给别人提供一点掉坑提醒

官方文档地址:https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html

详尽的使用方式你也可以再这里面找到

下面是坑记录

  • 字段类型选择

因为开发不规范、对ElasticSearch不了解等原因,原本功能开发的时候建的ES索引中对应订单价格、运费等字段的字段类型全部都是keyword类型的。而这次要做聚合统计的话,要对价格字段进行聚合统计执行类似如下语句的时候

GET /your_es_index/your_es_type/_search
{
  "size":0,
  "aggs":{
    "sum_claims":{
      "sum":{
        "field":"cost_of_claims"
      }
    }
  }
}

会得到一个这样的错误信息

Expected numeric type on field [cost_of_claims], but got [keyword]"}

基本意思就是cost_of_claims字段类型为keyword,但是进行聚合统计运算的时候期望cost_of_claims字段为numeric型的。也就是说明ES的聚合统计运算只能作用于numeric型字段上。那么遇到问题解决问题,接下来就是想要尝试将cost_of_claims变更为numeric型字段这件事上。

在进行avg和sum等Metrics Aggregations(度量聚合)的时候报错的,进行数学维度的计算,期望得到一个数字字段,但是得到的是一个String的keyword类型,若把cost_of_claims.keyword换成cost_of_claims,则又会报另一个错误。则需要在index数据之前进行mapping配置,将该字段的值定义为numeric,或者开启mapping的Numeric detection(数字探测)

但是这时候遇到个问题,在es中,是不支持更改现有字段的映射或字段类型的,如果我们非得需要更改字段的类型,怎么办,数据迁移,重建索引,建立我们想要的正确的映射规则。接下来新建一个相同结构的索引,对应字段修改为numeric的类型,然后把老索引的数据reindex过来。同时,reindex之后把老索引删除,并给新索引添加一个别名和老索引的名字一样,这样就可以完整切换过来,且不用修改现有业务代码中和老索引相关的内容。

按照这个思路,到生产环境看了下现有使用的索引,总共有25亿多条数据,分了30个shard,数据有点多执行reindex的话也不是不可以,所以暂时勉强认为是可行的。到这里我依然觉得这个方向是可以的,所以继续往下尝试,验证方案的可行性。

  • ElasticSearch聚合问题

在测试环境构建了用于测试的索引,分10个shard,并且我写了份代码构造了20万条数据写入索引之后,接下来就是写出ES聚合查询语句,验证查询结果,确认这个方案是否可行了

结构如这样

get /numerical_aggr_test_index
{
  "numerical_aggr_test_index": {
    "aliases": {},
    "mappings": {
      "numerical_aggr_test_index": {
        "dynamic": "strict",
        "_all": {...},
        "properties": {
          "brand_code": {
            "type": "keyword"
          },
          "bu_code": {
            "type": "keyword"
          },
          "item_id": {
            "type": "keyword"
          },
          "oid": {
            "type": "keyword"
          },
          "parent_supplier_name": {
            "type": "keyword"
          },
          "sn_supplier_user_name": {
            "type": "keyword"
          },
          "store_id": {
            "type": "keyword"
          },
          "pay_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss"
          },
          "total_fee": {
            "type": "double"
          }
        }
      }
    },
    "settings": {
      "index": {
        "search": {...},
        "indexing": {...},
        "number_of_shards": "10",
        "rate": {...},
        "provided_name": "numerical_aggr_test_index",
        "index": {...},
        "creation_date": "1672710199804",
        "number_of_replicas": "1",
        "uuid": "13Zt7A4QQd-oKvgPWBO7ig",
        "version": {...}
      }
    }
  }
}

部分不重要信息已经隐藏

写下查询语句,根据需求按照

测试结果

get /numerical_aggr_test_index/numerical_aggr_test_index/_search
{
  "size": 0,
  "query": {},
  "aggs": {
    "group_by_parent_supplier_name": {
      "terms": {
        "field": "parent_supplier_name"
      },
      "aggs": {
        "group_by_bu_code": {
          "terms": {
            "field": "bu_code"
          },
          "aggs": {
            "group_by_brand_code": {
              "terms": {
                "field": "brand_code"
              },
              "aggs": {
                "group_by_item_id": {
                  "terms": {
                    "field": "item_id"
                  },
                  "aggs": {
                    "total_fee_sum": {
                      "sum": {
                        "field": "total_fee"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

ElasticSearch返回的结果基本如图

但是这里就遇到点问题了,先别着急,再修改下查询条件再看一下

get /numerical_aggr_test_index/numerical_aggr_test_index/_search
{
  "size": 0,
  "query": {},
  "aggs": {
    "group_by_parent_supplier_name": {
      "terms": {
        "field": "parent_supplier_name"
      },
      "aggs": {
        "total_fee_sum": {
          "sum": {
            "field": "total_fee"
          }
        }
      }
    }
  }
}

结果

{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 10,
    "successful": 10,
    "failed": 0
  },
  "hits": {
    "total": 223816,
    "max_score": 0.0,
    "hits": []
  },
  "aggregations": {
    "group_by_parent_supplier_name": {
      "doc_count_error_upper_bound": 2342,
      "sum_other_doc_count": 211814,
      "buckets": [
        {...},
        {...},
        {...},
        {...},
        {...},
        {
          "key": "SY_GYS3",
          "doc_count": 999,
          "total_fee_sum": {
            "value": 100169.95999999999
          }
        },
        {
          "key": "SY_GYS27",
          "doc_count": 993,
          "total_fee_sum": {
            "value": 99189.88
          }
        },
        {...},
        {...},
        {...}
      ]
    }
  }
}

从这个结果展示出来的内容可以看出几个问题

  1. 最明显的,之前创建索引的时候字段设置的是double型的,结果中的有一条记录很明显的发生了计算机在进行浮点运算的时候经常发生的精度丢失的问题,这个等下再说
  2. 对结果条数注意一下的话可以发现一共只返回了10条结果,这显然是不正确的,10条是ElasticSearch里没有条数设置的时候返回结果的默认条数
  3. 在es的返回结果中有doc_count_error_upper_bound和sum_other_doc_count这两个字段,这两个字段的值又代表什么意思呢?

开始尝试解决这几个问题

  • 浮点运算精度丢失问题

首先double类型在做运算时会存在丢失精度的问题。直接抛结论,使用scaled_float类型的字段代替double类型的字段进行运算,scaled_float类型,在指定合适的缩放因子的前提下可以规避浮点类型运算丢失精度的问题

不过使用scaled_float字段类型可能会带来的问题也要注意下,这篇文章可以参考下,https://blog.csdn.net/Weixiaohuai/article/details/124667345,具体举例了几种情况。我也简单的描述唠叨两句。

首先scaled_float实际存储的是一个具体数值和缩放参数,比如我们常用的金额单位是元,那么设置缩放参数为100,则ES中实际存储的数据为单位为分。比如我将一条金额为13.3277元的数据存入ES,实际存储中首先13.3277经过缩放参数100得到了结果1332.77的结果,再四舍五入得到整形1333并存入ES。不过在查询结果中显示的依然是数字13.3277。

那么接下来在查询语句中我们查询的金额为13.3267元的话,首先ES经过缩放得到1332.67,再四舍五入得到1333,最终匹配上ES中存储的这条数据返回结果。所以会看到这样的现象,当超出缩放参数控制的精度范围之后,我查询的是13.3267元,返回的结果却是13.3277元的。这部分内容在官方文档中也有详细的说明介绍

https://www.elastic.co/guide/en/elasticsearch/reference/current/number.html

另外官方文档中还给出了建议,ElasticSearch对数字类型的字段在range查询的时候做了优化,但是如果你不打算对这个字段进行range查询,那么keyword类型的字段在term查询的时候依然是一个更好的选择。

Consider mapping a numeric identifier as a keyword if:
    You don’t plan to search for the identifier data using range queries.
    Fast retrieval is important. term query searches on keyword fields are often faster than term searches on numeric fields.

精度丢失的问题就到这里,接下来是第二个问题

  • 聚合查询之后的结果默认只返回10条

因为考虑到聚合结果数量过多,最终生产环境中聚合出来的结果可能有上万条记录的情况,所以尝试想要分页处理下,那么就需要尝试加上bucket_sort参数来查询了。不过很遗憾,我得到的一个报错结果

Unknown BaseAggregationBuilder [bucket_sort] - elastic

ES未能识别这个聚合参数,根据官方更新文档所说的,bucket_sort的支持是直到6.1版本之后才新加入的,而我司的ES服务平台上依然运行的是5.X版本的,所以这条路不通

New Features
Aggregations
    Aggregations: bucket_sort pipeline aggregation #27152 (issue: #14928)

https://www.elastic.co/guide/en/elasticsearch/reference/6.8/release-notes-6.1.0.html#_new_features_2

如果你使用的ElasticSearch是支持的,我在网上也看到有人用这个方案实现过聚合分页查询的功能了,参见Elasticsearch聚合后将聚合结果进行分页的解决办法,包括后来这篇作者也提到了我之前写过的cardinality统计数量不精确的问题,《ElasticSearch去重查询统计数量不准确的问题

那么再尝试一下,如果换个更加简单粗暴点的方法,直接在terms查询中增加现在支持的参数”size”试下,修改DSL语句,执行查询

get /numerical_aggr_test_index/numerical_aggr_test_index/_search
{
  "size": 0,
  "query": {},
  "aggs": {
    "group_by_parent_supplier_name": {
      "terms": {
        "field": "parent_supplier_name",
        "size":3000
      },
      "aggs": {
        "total_fee_sum": {
          "sum": {
            "field": "total_fee"
          }
        }
      }
    }
  }
}

确实可以返回所有结果了,具体内容太多,不全都列出来了

{
  "took": 3,
  "timed_out": false,
  "_shards": {
    "total": 10,
    "successful": 10,
    "failed": 0
  },
  "hits": {
    "total": 223816,
    "max_score": 0.0,
    "hits": []
  },
  "aggregations": {
    "group_by_parent_supplier_name": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "SY_GYS41",
          "doc_count": 2357,
          "total_fee_sum": {
            "value": 235060.80999999997
          }
        },
        {
          "key": "SY_GYS18",
          "doc_count": 2344,
          "total_fee_sum": {
            "value": 236917.43999999992
          }
        },
        {
          "key": "SY_GYS21",
          "doc_count": 2323,
          "total_fee_sum": {
            "value": 230288.71000000002
          }
        },
        ...
      ]
    }
  }
}

接下来新的问题,这个size的值我要怎么确定呢?有两个思路,第一直接就像这边demo里这么做的这样,预估一个比较大的,能保证覆盖到所有数据的值。第二个思路,在进行聚合查询之前,先执行一下去重查询得到一个总数的值,之后再带过来进行聚合查询。那么如果是多条件聚合查询的话,我们的查询语句就需要改成如下这样了,在每层的terms中都增加上size参数

get /numerical_aggr_test_index/numerical_aggr_test_index/_search
{
  "size": 0,
  "query": {},
  "aggs": {
    "group_by_parent_supplier_name": {
      "terms": {
        "field": "parent_supplier_name",
        "size": 3000
      },
      "aggs": {
        "group_by_bu_code": {
          "terms": {
            "field": "bu_code",
            "size": 3000
          },
          "aggs": {
            "group_by_brand_code": {
              "terms": {
                "field": "brand_code",
                "size": 3000
              },
              "aggs": {
                "group_by_item_id": {
                  "terms": {
                    "field": "item_id",
                    "size": 3000
                  },
                  "aggs": {
                    "total_fee_sum": {
                      "sum": {
                        "field": "total_fee"
                      }
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

首先直接加上size参数尝试执行下,如果是多参数的情况下,各层内的size参数的具体计算逻辑需要再分字段去重统计下,这个过程相对会比较复杂。

接下来的事情,当我把上面这段DSL语句敲入ES服务平台执行的时候,我等了一会儿,没有等到结果,查询超时了。因为平台对查询做了限制,不可能给查询请求超长的连接时间不超时的。这边到这里就没有办法了。

再下一步

  • 聚合统计结果不准确

重新回到上面的查询结果中,可以看到doc_count_error_upper_bound和sum_other_doc_count这两个字段的值都为0。翻翻官方文档,可以从这里找到这两个字段的含义说明

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#search-aggregations-bucket-terms-aggregation-approximate-counts

doc_count_error_upper_bound is the maximum number of those missing documents.

sum_other_doc_count is the number of documents that didn’t make it into the the top size terms.

doc_count_error_upper_bound:表示没有在这次聚合中返回,但是可能存在的潜在聚合结果。

sum_other_doc_count:表示这次聚合中没有统计到的文档数。这个好理解,因为ES统计的时候默认只会根据count显示排名前十的分桶。如果分类(这里是目的地)比较多,自然会有文档没有被统计到。

当数据分布与多个 shard 上的时候,Coordinating Node 无法看到数据的全貌,从而会导致聚合的数据不准确。参考文章https://zhuanlan.zhihu.com/p/107820698/中的最后一段“聚合分析计算结果的精确度问题”中的内容,要解决这个问题有两个方案。

第一种,直截了当的把所有索引只使用一个shard,不分分片存储,这样的方式只适合在数据量不多的情况下处理。如果需要处理的数据量巨大,显然不可能把ElasticSearch索引设置为单shard的。那么这条路不通。

另外的,设置更大的shard_size参数,指每次从每个分片上获取 的bucket的 数量,这样获取的数量越多,统计得越多,就必然越是精确的。这和我上面之前修改了size参数,获取更多的返回结果数量达到的效果是一致的。

我设置了更大的size的返回结果数量,那么ElasticSearch则需要在计算得到结果达到这个size之前会不断的从各个shard获取数据,直到获结果数据达到了size数量或者所有数据都获取完成了。而更大的shard_size参数表示ElasticSearch从各个Shard上更多bucket中获取数据,直到达到了shard_size设置的上限,或者Shard中的所有bucket都获取过了。

调整参数直接测试

get /numerical_aggr_test_index/numerical_aggr_test_index/_search
{
  "size": 0,
  "query": {},
  "aggs": {
    "group_by_parent_supplier_name": {
      "terms": {
        "field": "parent_supplier_name",
        "shard_size": 110
      },
      "aggs": {
        "total_fee_sum": {
          "sum": {
            "field": "total_fee"
          }
        }
      }
    }
  }
}

获取结果

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 10,
    "successful": 10,
    "failed": 0
  },
  "hits": {
    "total": 223816,
    "max_score": 0.0,
    "hits": []
  },
  "aggregations": {
    "group_by_parent_supplier_name": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 200659,
      "buckets": [
        {
          "key": "SY_GYS41",
          "doc_count": 2357,
          "total_fee_sum": {
            "value": 235060.81
          }
        },
        {
          "key": "SY_GYS18",
          "doc_count": 2344,
          "total_fee_sum": {
            "value": 236917.43999999992
          }
        },
        {
          "key": "SY_GYS21",
          "doc_count": 2323,
          "total_fee_sum": {
            "value": 230288.71000000005
          }
        },
        {...},
        {...},
        {...},
        {...},
        {...},
        {...},
        {...}
      ]
    }
  }
}

跟我之前把size参数改为3000的时候获取所有聚合结果的前三条相同,计算结果因为浮点运算精度丢失的问题在这里不用太在意。

结语

至此,尝试使用ElasticSearch的聚合统计功能来实现这次需求的探索结束,最终以失败告终,主要原因还是公司ElasticSearch服务平台对请求超时的控制,超过30秒的请求会被主动断开,这也是对平台的必要保护措施。实际生产环境上直接要跑几百万的数据多条件字段聚合的话实在是没法压缩到这个范围之内了。

但是在这次尝试过程中学习到了很多东西,简单列下

  1. 只能在numeric类型的字段上进行数学运算,会有浮点类型运算精度丢失的问题,建议使用scaled_float类型的字段,但是scaled_float类型也只能保证在精度范围内的效果
  2. 聚合结果也需要在term条件内添加size参数控制获取结果的条数
  3. ElasticSearch的聚合运算统计是一个近似计算统计,不是完全精确的。小数据量的时候可以用单shard索引,海量数据的时候可以通过shard_size获取更多的数据计算来提高精度,但代价是更多的计算资源耗费和更长的时间消耗

最终还是回到了原来的思路上,根据条件翻页遍历索引数据,在JVM内存中做聚合统计处理。es聚合导出(不是),Map#merge方法使用