起因

一次需求中涉及两个索引,其中一个是本次新建的索引,需要将原始ES索引中的部分数据导入到当前新建的索引中。

分析

对于新所以的数据初始化有两个思路,因为我们的数据都是在HIVE中清洗完成之后再打到ElasticSearch的,所以可以从HIVE中捞取对应的数据重新打到ElasticSearch即可。但是这里有一个局限性的问题,因为HIVE表数量巨大,我们只保留的近7天的清洗结果,如果按照需求的时间跨度则需要重新清洗近3个月的数据,之后再将数据打到ES。

但是这样的话又会有另外一个问题,HIVE表数据在清洗过程中使用到的数据是会经常变动的,比如某个商品最近几天是挂靠在A品类下,但是上个月因为做活动是挂靠在B品类下的,这就导致事后隔了一段时间之后重新清洗的结果无法反应出当时这条数据正确的关联关系的情况。如果产品产品认可当前方案,那么也不失为可行方案之一。

那么,既然我们在原始的索引中有这部分数据,所以自然使用reindex命令来完成这一操作就最为方便了

代码

POST _reindex?wait_for_completion=false&slices=5&refresh
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "type": "SOURCE_ES_TYPE",
    "query": {
      "range": {
        "last_modify_time": {
          "gte": "2023-01-01 00:00:00",
          "lte": "2023-07-24 23:59:59"
        }
      }
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "type": "TARGET_ES_TYPE"
  }
}

简单说下这条语句,公司生产环境使用的ES版本为5.4.2的,所以除了index参数外,还需要有对应的type参数,但是Elasticsearch在 7.X版本中去除了type的概念,所以如果你的ES版本较新的话,可以不用这个type参数,即:

POST _reindex
{
  "source": {
    "index": "SOURCE_ES_INDEX"
  },
  "dest": {
    "index": "TARGET_ES_INDEX"
  }
}

低版本中如果不指定type参数会发生连同type一起reindex到TARGET_ES_INDEX的情况,即执行之后数据被保存到了/TARGET_ES_INDEX/SOURCE_ES_TYPE

另外,因为数据量较大,一次执行时间耗时会比较久,所以增加了wait_for_completion=false参数,当前操作不会立刻返回结果,而是通过一个异步任务执行,执行结果会显示在异步任务结果中,通过

GET /_tasks

命令可以查看所以当前在处理的异步任务,通过

GET /_tasks/<task_id>

的命令返回指定的任务信息。task_id格式为node_id:task_number,这在之前的文章中有提到过。

为了加速整个reindex的过程,我们还增加了slices=5&refresh参数,此举是自动划分为5个slices,将一个操作拆分为多个子操作并行化处理。slices数量不能过大,比如500可能出现CPU问题。查询性能角度看,设置slices为源索引的分片的倍数是比较合适的,一倍是最有效的。索引性能角度看,应该随着可用资源的数量线性地扩展。然而索引或查询性能是否在此过程中占据主导,取决于许多因素,比如重新索引的文档和重新索引的集群。

也可手动进行拆分处理,这里不做过多赘述。

POST _reindex
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "slice": {   // 第一slice执行操作
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX"
  }
}
POST _reindex
{
  "source": {
    "index": "SOURCE_ES_INDEX",
    "slice": {   // 第二slice执行操作
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "TARGET_ES_INDEX"
  }
}

reindex过程中的冲突

将op_type设置为create时,只会对发生不同的document进行reindex,(若定时机制的reindex则可以使用该方式只对最新的不存在的document进行reindex)。并且可以将conflicts属性设置为proceed,将冲突进行类似于continue的操作,设置方式如下

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "SOURCE_ES_INDEX"
  },
  "dest": {
    "index": "TARGET_ES_INDEX",
    "op_type": "create"
  }
}

除此之外,也可添加script完成在reindex过程中对数据的值进行修改,或者字段名进行修改、删除等操作,具体可以参见官方文档

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html