标签: elasticsearch

ElasticSearch使用script更新文档失败问题排查

起因

起因是一个批量根据es文档id更新指定字段的功能,经过上线后使用反馈,经常性的偶发文档无法更新的情况

处理

找到相关代码,自行写个demo代码批量跑下试下,原来的代码大概示意如下

public String len10() throws Exception{
    Random random = new Random();
    String[] ls = new String[500];
    for (int i = 0; i < 500; i++) {
        int finalI = i;
        Runnable callable = new Runnable() {
            @Override
            public void run() {
                String str = String.valueOf(random.nextInt(40)+10);
                String str2 = String.valueOf(random.nextInt(40)+10);
                String str3 = String.valueOf(random.nextInt(40)+10);
                String action = "/XXXXXXX_index/XXXXXXX_type/_update_by_query";
                String dateTime = "20"+str+"-01-12 23:"+str2+":"+str3;
                String script = "{\n" +
                        "  \"script\": {\n" +
                        "    \"inline\": \"ctx._source.modify_time='"+dateTime+"'\"\n" +
                        "  },\n" +
                        "  \"query\": {\n" +
                        "    \"bool\": {\n" +
                        "      \"filter\": [{\n" +
                        "        \"term\": {\n" +
                        "          \"id\": \"2\"\n" +
                        "        }\n" +
                        "      }]\n" +
                        "    }\n" +
                        "  }\n" +
                        "}"
                        ;
                try {
                    String post = esRestClient.performRequest("POST", action, script);
                    System.out.println(post);
                    ls[finalI] = post;
                } catch (IOException e) {
                    e.printStackTrace();
                    ls[finalI] = e.getMessage();
                }
            }
        };
        Thread thread = new Thread( callable);
        thread.start();
    }
    Thread.sleep(50000);
    return "";
}

大意就是起500个线程,更新索引中指定文档id为2的文档的modify_time字段,通过script来更新。

执行之后其实就可以看到大量异常信息了

Continue reading

最近写HIVE SQL的一点笔记【3】

一个最近的开发需求,导出某ES索引上某嵌套字段值

promotion_details.promotion_name

符合以“官方立减”文字结尾的所有数据导出,因为es索引的结构的关系,目前es索引结构的原因,数据需要处理下之后再导出,目前es索引数据的结构大致如下

{
  "_index": "my_elasticsearch_index",
  "_type": "order",
  "_id": "3602478673110456764",
  "_version": 1699331966001,
  "found": true,
  "_source": {
    "adjust_fee": "0.00",
    "alipay_point": "0",
    "available_confirm_fee": "0.00",
    "buyer_alipay_no": "****",
    ......
    "oms_orders": [
      {
        "cart_item_no": "3602478673111456764",
        "order_item_id": "210100024285392788",
        ......
      },
      ......
    ],
    "promotion_details": [
      {
        "promotion_name": "2023天猫双11抢先购官方立减",
        ......
      },
    ],
    "sub_orders": [
      {
        "oid": "3602478673111456764",
        ......
      },
      ......
    ],
    "tid": "3602478673110456764",
    "trade_from": "WAP,WAP",
    "type": "fixed",
    ......
  }
}

tid为主订单单号、sub_orders.oid为子单单号、oms_orders.cart_item_no对应等于sub_orders.oid,有一条oid子单记录就对应有一条cart_item_no记录。

Continue reading

ElasticSearch数据迁移reindex命令

起因

一次需求中涉及两个索引,其中一个是本次新建的索引,需要将原始ES索引中的部分数据导入到当前新建的索引中。

分析

对于新所以的数据初始化有两个思路,因为我们的数据都是在HIVE中清洗完成之后再打到ElasticSearch的,所以可以从HIVE中捞取对应的数据重新打到ElasticSearch即可。但是这里有一个局限性的问题,因为HIVE表数量巨大,我们只保留的近7天的清洗结果,如果按照需求的时间跨度则需要重新清洗近3个月的数据,之后再将数据打到ES。

但是这样的话又会有另外一个问题,HIVE表数据在清洗过程中使用到的数据是会经常变动的,比如某个商品最近几天是挂靠在A品类下,但是上个月因为做活动是挂靠在B品类下的,这就导致事后隔了一段时间之后重新清洗的结果无法反应出当时这条数据正确的关联关系的情况。如果产品产品认可当前方案,那么也不失为可行方案之一。

那么,既然我们在原始的索引中有这部分数据,所以自然使用reindex命令来完成这一操作就最为方便了

代码

POST _reindex?wait_for_completion=false&slices=5&refresh
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "type": "SOURCE_ES_TYPE",
    "query": {
      "range": {
        "last_modify_time": {
          "gte": "2023-01-01 00:00:00",
          "lte": "2023-07-24 23:59:59"
        }
      }
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "type": "TARGET_ES_TYPE"
  }
}

简单说下这条语句,公司生产环境使用的ES版本为5.4.2的,所以除了index参数外,还需要有对应的type参数,但是Elasticsearch在 7.X版本中去除了type的概念,所以如果你的ES版本较新的话,可以不用这个type参数,即:

Continue reading

ElasticSearch Nested数组嵌套对象的更新插入操作

在ES的日常使用中,需要使用到Nested结构存储数个同级的子节点数据,例如一条主订单下的N条子订单的数据。

新增更新操作

现在,假设我们在ES中有这样一条数据

PUT /celebrities/_doc/114
{
    "user" : "Kun",
    "post_date" : "2009-11-15T14:12:12",
    "message" : "Idol who has been practicing for two and a half years",
    "skills":[
        {
            "name":"sing",
            "skill_level":"A"
        },
        {
            "name":"jump",
            "skill_level":"S"
        },
        {
            "name":"rap",
            "skill_level":"SS"
        }
    ]
}

我们需要往skills的Nested数组中添加一个新的节点,节点的name为“consecutive five whips”,则可以这么写

POST /celebrities/_doc/114
{
    "script": {
        "source": "if (ctx._source.skills == null) {List ls = new ArrayList();ls.add(params.skill);ctx._source.skills = ls;} else {ctx._source.skills.add(params.skill);}",
        "lang": "painless",
        "params": {
            "skill": {
                "name": "consecutive five whips",
                "skill_level": "SSS"
            }
        }
    }
}

得到返回结果,表明执行成功

{
    "_index": "celebrities",
    "_id": "114",
    "_version": 6,
    "result": "updated",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 6,
    "_primary_term": 1
}

通过代码可以简单的看出逻辑,如果_source.skills为null,则创建一个新的ArrayList,并将参数中的skill节点的内容放进去,最后赋值给_source.skills,反之如果不为null的话,则直接往里加入当前的skill字段的内容。

if (ctx._source.skills == null) {
    List ls = new ArrayList();
    ls.add(params.skill);
    ctx._source.skills = ls;
} else {
    ctx._source.skills.add(params.skill);
}

要先判断是否为null,否则无法调用add方法,并会抛出一个异常

"caused_by": {
                "type": "null_pointer_exception",
                "reason": "cannot access method/field [add] from a null def reference"
}
Continue reading

ElasticSearch使用_delete_by_query删除大批数据,及409 Conflict版本冲突问题处理

起因

生产环境一个最近新上了一个清理ES历史数据的定时任务,执行_delete_by_query语句。在检查系统异常日志的时候,发现这块功能报了数个异常,elastic: Error 409 (Conflict)。于是带着这个这个问题展开探索一番

代码中使用的方法就是最基本DSL根据查询条件进行删除的语句

POST /ES_INDEX/_delete_by_query
{
  "query": {
    "term": {
      "data_date": "${dataDate}"
    }
  }
}

在执行_delete_by_query期间,为了删除匹配到的所有文档,多个搜索请求是按顺序执行的。每次找到一批文档时,将会执行相应的批处理请求来删除找到的全部文档。如果搜索或者批处理请求被拒绝,_delete_by_query根据默认策略对被拒绝的请求进行重试(最多10次)。达到最大重试次数后,会造成_delete_by_query请求中止,并且会在failures字段中响应 所有的故障。已经删除的仍会执行。换句话说,该过程没有回滚,只有中断。

原因

Elasticsearch 会在删除文档前使用查询时获取的 internal 版本号和当前文档版本号进行对比。如果在查询和删除操作之间文档发生更改,则会导致版本冲突并且删除操作失败,报 elastic: Error 409 (Conflict) 错误。

Continue reading

ElasticSearch聚合查询使用过程中遇到的一点问题

再续一下前面两篇相关的内容,ElasticSearch 分页查询方法from&size,scroll,及search_after浅析es聚合导出(不是),Map#merge方法使用

在开发那个聚合导出功能的时候,最先考虑到的其实不是scroll扫数据并内存计算,之前了解过ElasticSeach是可以支持聚合计算查询的,但是没有实际在大批量数据的索引中使用过,所以再开发功能的设计阶段进行了测试之后发现并不能满足我们功能开发的需求,遂放弃了。网上使用aggs进行ElasticSearch聚合查询的介绍文章也很多,于是写下本文做一点记录,和给别人提供一点掉坑提醒

官方文档地址:https://www.elastic.co/guide/cn/elasticsearch/guide/current/aggregations.html

详尽的使用方式你也可以再这里面找到

下面是坑记录

  • 字段类型选择

因为开发不规范、对ElasticSearch不了解等原因,原本功能开发的时候建的ES索引中对应订单价格、运费等字段的字段类型全部都是keyword类型的。而这次要做聚合统计的话,要对价格字段进行聚合统计执行类似如下语句的时候

GET /your_es_index/your_es_type/_search
{
  "size":0,
  "aggs":{
    "sum_claims":{
      "sum":{
        "field":"cost_of_claims"
      }
    }
  }
}

会得到一个这样的错误信息

Expected numeric type on field [cost_of_claims], but got [keyword]"}

基本意思就是cost_of_claims字段类型为keyword,但是进行聚合统计运算的时候期望cost_of_claims字段为numeric型的。也就是说明ES的聚合统计运算只能作用于numeric型字段上。那么遇到问题解决问题,接下来就是想要尝试将cost_of_claims变更为numeric型字段这件事上。

在进行avg和sum等Metrics Aggregations(度量聚合)的时候报错的,进行数学维度的计算,期望得到一个数字字段,但是得到的是一个String的keyword类型,若把cost_of_claims.keyword换成cost_of_claims,则又会报另一个错误。则需要在index数据之前进行mapping配置,将该字段的值定义为numeric,或者开启mapping的Numeric detection(数字探测)

但是这时候遇到个问题,在es中,是不支持更改现有字段的映射或字段类型的,如果我们非得需要更改字段的类型,怎么办,数据迁移,重建索引,建立我们想要的正确的映射规则。接下来新建一个相同结构的索引,对应字段修改为numeric的类型,然后把老索引的数据reindex过来。同时,reindex之后把老索引删除,并给新索引添加一个别名和老索引的名字一样,这样就可以完整切换过来,且不用修改现有业务代码中和老索引相关的内容。

Continue reading

ElasticSearch 分页查询方法from&size,scroll,及search_after浅析

起因是一月份需求中有一个功能,需要遍历ES索引中的内容进行处理,原本之前常用的的是ES的scroll方法来处理的,但是对比以前其他同事开发的功能发现以前某个已离职的同事做的一个类似的功能的时候用的是search_after方法。所以在这里把3中方式都列下,对比说明下。题外话不多说,进入正题

我们在进行ES数据查询的时候,对于翻页需求的处理可以使用的方法有3种,如题中所述分别是from&size参数,scroll滚动处理,search_after参数

首先是基础环境

请求

GET /

结果

{
    "name": "olap05-sit",
    "cluster_name": "common",
    "cluster_uuid": "xgg8HEt3SzyI7-Jyg4-J4A",
    "version": {
        "number": "5.4.2",
        "build_hash": "Unknown",
        "build_date": "Unknown",
        "build_snapshot": true,
        "lucene_version": "6.5.1"
    },
    "tagline": "You Know, for Search"
}
  • from&size分页查询

请求

GET /your_es_index/your_es_index/_search
{
  "size": 20,
  "from": 0
}

结果

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 30,
    "successful": 30,
    "failed": 0
  },
  "hits": {
    "total": 8645477,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "your_es_index",
        "_type": "your_es_index",
        "_id": "6073958667689227072",
        "_score": 1.0,
        "_source": {
          "brand_name": "iPad",
          "bu_name": "冰洗",
          "end_time": "2022-05-27 12:14:56",
          "item_id": "000882022439",
          "oid": "6073958667689227072",
          "pay_time": "2022-05-15 05:11:43",
          "purch_title": "无印良品日式简约全棉被套单件纯棉被罩冬新款150x200x230单人87"
        }
      },
      ......
    ]
  }
}

返回结果太长,不全贴出来了。简单说明下,size参数表示的是每页返回结果条数,from表示从某个位置开始查询,即相当于偏移量的意思,不是表示的当前页码,如果在页面上需要做分页显示当前页码需要自行计算转换下。自己在代码中对请求参数的page、pageSize转换为es查询中的from和size参数。

Continue reading