作者: CheungQ

ElasticSearch聚合查询使用过程中遇到的一点问题

再续一下前面两篇相关的内容,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析es聚合导出(不是),Map#merge方法使用

在开发那个聚合导出功能的时候,最先考虑到的其实不是scroll扫数据并内存计算,之前了解过ElasticSeach是可以支持聚合计算查询的,但是没有实际在大批量数据的索引中使用过,所以再开发功能的设计阶段进行了测试之后发现并不能满足我们功能开发的需求,遂放弃了。网上使用aggs进行ElasticSearch聚合查询的介绍文章也很多,于是写下本文做一点记录,和给别人提供一点掉坑提醒

官方文档地址:https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html

详尽的使用方式你也可以再这里面找到

下面是坑记录

  • 字段类型选择

因为开发不规范、对ElasticSearch不了解等原因,原本功能开发的时候建的ES索引中对应订单价格、运费等字段的字段类型全部都是keyword类型的。而这次要做聚合统计的话,要对价格字段进行聚合统计执行类似如下语句的时候

GET /your_es_index/your_es_type/_search
{
  "size":0,
  "aggs":{
    "sum_claims":{
      "sum":{
        "field":"cost_of_claims"
      }
    }
  }
}

会得到一个这样的错误信息

Expected numeric type on field [cost_of_claims], but got [keyword]"}

基本意思就是cost_of_claims字段类型为keyword,但是进行聚合统计运算的时候期望cost_of_claims字段为numeric型的。也就是说明ES的聚合统计运算只能作用于numeric型字段上。那么遇到问题解决问题,接下来就是想要尝试将cost_of_claims变更为numeric型字段这件事上。

在进行avg和sum等Metrics Aggregations(度量聚合)的时候报错的,进行数学维度的计算,期望得到一个数字字段,但是得到的是一个String的keyword类型,若把cost_of_claims.keyword换成cost_of_claims,则又会报另一个错误。则需要在index数据之前进行mapping配置,将该字段的值定义为numeric,或者开启mapping的Numeric detection(数字探测)

但是这时候遇到个问题,在es中,是不支持更改现有字段的映射或字段类型的,如果我们非得需要更改字段的类型,怎么办,数据迁移,重建索引,建立我们想要的正确的映射规则。接下来新建一个相同结构的索引,对应字段修改为numeric的类型,然后把老索引的数据reindex过来。同时,reindex之后把老索引删除,并给新索引添加一个别名和老索引的名字一样,这样就可以完整切换过来,且不用修改现有业务代码中和老索引相关的内容。

Continue reading

es聚合导出(不是),Map#merge方法使用

水一篇,之前写了es分页查询的几种方法,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析,起因是1月份的需求要做一个ES索引内数据聚合导出的需求。

这个ES索引每天有100万左右新增数据,而需求是每月根据不同的条件做聚合统计,并导出数据到csv文件。之前的同事做过一个类似的功能,但是性能很差,基本思路如下,可以简单分析下。

  1. java的job接收导出查询的参数,创建导出执行任务,启动两个线程,一个es查询线程,一个写csv线程
  2. java调用EsClient,进行search_after查询,每一页查询的结果保存到java内存中,并判断累计的结果数量,大于一个设定的数值之后则根据导出时候设定的参数进行聚合,保存到一个Map中
  3. 分页查询结束后,写csv线程读取聚合后的结果写入csv文件

首先,启动两个线程,一个读,一个等待读之后写csv这个其实是不合理的,在所有数据聚合完成之前,写线程就一直处于等待状态。但是这个模式我改动不了,这是目前项目代码里集成的导出任务jar包里的内容,我没法动它,其他很多导出功能也用的这一套代码,所以这边就只能先这样。(也不是说jar包里的逻辑我在外面就没法修改)

接下来,现有的功能是用的search_after查询的,排序字段是子订单id,是无序的。所以只能支持单线程操作,即只能用一个读线程(ReadThread),一页一页的往后翻页查询。且原来的逻辑中有点bug,大于一个设定的数值之后则根据导出时候设定的参数进行聚合。假定这个数值为5万,那么当不断地翻页某次聚合之后总结果数已经大于5万了,那么之后每次进行翻页查询都要把所有数据重新聚合一遍。最终实测,某次导出了300万数据进行聚合,导出任务总共执行超过了6个小时。

根据以上情况,我重新设计了这次的导出功能。

Continue reading

ElasticSearch 分页查询方法from&size,scroll,及search_after浅析

起因是一月份需求中有一个功能,需要遍历ES索引中的内容进行处理,原本之前常用的的是ES的scroll方法来处理的,但是对比以前其他同事开发的功能发现以前某个已离职的同事做的一个类似的功能的时候用的是search_after方法。所以在这里把3中方式都列下,对比说明下。题外话不多说,进入正题

我们在进行ES数据查询的时候,对于翻页需求的处理可以使用的方法有3种,如题中所述分别是from&size参数,scroll滚动处理,search_after参数

首先是基础环境

请求

GET /

结果

{
    "name": "olap05-sit",
    "cluster_name": "common",
    "cluster_uuid": "xgg8HEt3SzyI7-Jyg4-J4A",
    "version": {
        "number": "5.4.2",
        "build_hash": "Unknown",
        "build_date": "Unknown",
        "build_snapshot": true,
        "lucene_version": "6.5.1"
    },
    "tagline": "You Know, for Search"
}
  • from&size分页查询

请求

GET /your_es_index/your_es_index/_search
{
  "size": 20,
  "from": 0
}

结果

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 30,
    "successful": 30,
    "failed": 0
  },
  "hits": {
    "total": 8645477,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "your_es_index",
        "_type": "your_es_index",
        "_id": "6073958667689227072",
        "_score": 1.0,
        "_source": {
          "brand_name": "iPad",
          "bu_name": "冰洗",
          "end_time": "2022-05-27 12:14:56",
          "item_id": "000882022439",
          "oid": "6073958667689227072",
          "pay_time": "2022-05-15 05:11:43",
          "purch_title": "无印良品日式简约全棉被套单件纯棉被罩冬新款150x200x230单人87"
        }
      },
      ......
    ]
  }
}

返回结果太长,不全贴出来了。简单说明下,size参数表示的是每页返回结果条数,from表示从某个位置开始查询,即相当于偏移量的意思,不是表示的当前页码,如果在页面上需要做分页显示当前页码需要自行计算转换下。自己在代码中对请求参数的page、pageSize转换为es查询中的from和size参数。

Continue reading

一个ES查询生产环境异常排查过程

异常信息:java.io.IOException: listener timeout after waiting for [60000] ms

异常堆栈:

java.io.IOException: listener timeout after waiting for [60000] ms
	at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:660)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:219)
	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:191)
	at com.suning.maoning.core.es.EsClientHolder.performRequest(EsClientHolder.java:113)
	at com.suning.maoning.core.es.EsClientHolder.performRequest(EsClientHolder.java:127)
	at com.suning.maoning.core.es.EsClient.searchAfter(EsClient.java:343)
	at com.suning.maoning.mnbi.service.impl.commissionexport.XXXXXXXXXXXService.queryData(BaseCommissionEsExcelExportService.java:76)
	at com.suning.maoning.excel.common.export.service.impl.EsScrollRunnable.async(EsScrollRunnable.java:39)
	at com.suning.maoning.excel.common.export.service.impl.EsScrollRunnable$$FastClassBySpringCGLIB$$1736c315.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:721)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
	at com.suning.maoning.core.cache.interceptor.RedisInterceptor.invoke(RedisInterceptor.java:119)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:168)
	at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor$1.call(AsyncExecutionInterceptor.java:115)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

根据异常堆栈指示的信息,找到对应发生异常的业务类,定位到相关具体语句,是一个ES查询调用的地方,调用方法为

esClient.searchAfter

继续跟踪对应方法

Continue reading

ElasticSearch去重查询统计数量不准确的问题

首先是需求场景介绍,上个月的需求用HIVE做数据清洗写了个T+1的任务,处理每天的退款订单数据,并将数据写入ElasticSearch中提供给业务系统进行查询及导出相关功能的开发。

ElasticSearch中的数据结构为子单纬度的,这里为了描述方便一下用sub_id替代说明。同时每条sub_id记录上关联了一个主订单id,下面的描述中主订单我们用main_id代替说明。每个sub_id都会关联一个main_id,且只能关联一个main_id,但是一个main_id可以关联多个sub_id,即非常常见标准的一个主订单关联多个子订单的1对N的结构模式。

在业务场景中,根据业务需求开发订单展示的页面功能,页面上以main_id纬度为基准进行展示,所以在这里,我们做如下方式的开发

  1. 根据页面查询入参条件先对main_id去重分页查询,同时统计总数量
  2. 根据main_id去重查询的分页结果,取得当页的main_id数据,并把这些main_id连通之前本次查询的参数一起作为新的参数,用这些新的参数重新去ElasticSearch中查询当前这些main_id下对应的sub_id的数据
  3. 取得这些sub_id数据后,在内存中对这些数据按照main_id分组聚合,并通过接口返回给前端

这样我们便实现了从main_id维度的分页查询功能,具体用到的相关ElasticSearch查询DSL语句大致如下

GET /refund_order_index/refund_order_type/_search
{
  "size": 20,
  "from": 0,
  "query": {
    "bool": {
      "filter": [
        {}
      ]
    }
  },
  "collapse": {
    "field": "main_id"
  },
  "_source": {
    "includes": [
      "main_id"
    ],"excludes": []
  },
  "aggs": {
    "total_count": {
      "cardinality": {
        "field": "main_id"
      }
    }
  }
}

ElasticSearch查询返回结果如下,内容和字段等我已做脱敏处理

Continue reading

小坑+1,Mysql的UNIQUE索引中允许存在Null值

一个小坑,如题。

新建了一张表,假设如下

create table table_c
(
id int auto_increment
primary key,
req_id int null,
req_type int default 0 null,
req_version int default 0 null,
constraint table_c_req_id_req_type_uindex
unique (req_id, req_type)
)
charset = latin1;

表中有个(req_id, req_type)组成的唯一索引,插入数据一些数据后得到如下结果

从上表中插入的结果来看,可以看到有两条(req_id = 124, req_type = null)组成的行数据,可见,unique索引中可以包含null值字段

A UNIQUE index permits multiple NULL values for columns that can contain NULL.

https://dev.mysql.com/doc/refman/5.6/en/create-index.html

官方文档中提到了这么一段,确实可以包含的,且根据上面的实验结果,但凡某个字段为null了,那么唯一性约束就失效了。

相关处理办法

  1. 给unique索引相关字段添加上 not null属性 和 default
  2. 程序代码中相关对象拼装的时候校验相关字段,并赋予默认值

首先,在设置了default值之后,如果插入执行insert语句的时候没有对应字段,mysql会赋予默认的default值。其次,某些封装的JDBC操作的工具如果我们传入的字段没有值,那么工具会自动给这个字段赋值null,那么就导致如果没有设置 not null属性会被插入一个null值,仍然导致唯一索引约束失效。

ON DUPLICATE KEY UPDATE踩入一个小坑

评审了新迭代的需求,翻以前功能的代码,看到了一个ON DUPLICATE KEY UPDATE的语句,有点不对劲。

实际数据库设计业务相关内容,这里不写,单独抽象出来一个场景如下

有一张表table_c,主键id无关紧要,其中有一个唯一键索引req_id,这个索引保证了表中每行的req_id都是唯一的

create table table_c
(
    id          int auto_increment
        primary key,
    req_id      int           null,
    req_type    int           null,
    req_version int default 0 null,
    constraint table_c_req_id_uindex
        unique (req_id)
);

需求场景是这样的,从第三方拉取到数据,对应唯一的req_id,如果表中没有则插入一条新数据。如果表中有,则判断req_version字段,只有当新数据的req_version比原表中的大的时候才更新整行数据。于是根据这个需求场景来原先的代码中的SQL如下

INSERT INTO table_c (req_id, req_type, req_version)
VALUES (124, 4, 1)
ON DUPLICATE KEY UPDATE 
    req_version = IF(VALUES(req_version) > req_version, VALUES(req_version), req_version),
    req_type    = IF(VALUES(req_version) > req_version, VALUES(req_type), req_type)

乍一看起来使用了ON DUPLICATE KEY UPDATE,也使用了IF判断了req_version字段,好像没毛病。但是实际上是有问题的,我们跑一个实际数据看下

表中现有数据

执行上面的SQL,按照预期req_type应该会被更新为4,同时req_version被更新为1,执行一下

可以看到执行成功,而表中现在的结果如下

req_version确实更新了但是req_type却没有更新,那么我们可以推断出req_type后面的IF语句没有获得返回结果true,那么可能的情况就是,在前面先执行

req_version = IF(VALUES(req_version) > req_version, VALUES(req_version), req_version)

这一段的时候req_version的值已经被更新为1了,且VALUES(req_version)的值也是1,那么在后面再执行VALUES(req_version) > req_version判断的时候就会得到false

不妨改变下SQL再试下

在UPDATE语句部分直接填上req_version的值,注意不是VALUES(req_version)的值

表中结果如下

可以确定的看到在执行到req_type这行的时候req_version的值确实变成2了。

根据这样的结论,我们再次调整下SQL语句,执行查看结果

Continue reading