起因

生产环境一个最近新上了一个清理ES历史数据的定时任务,执行_delete_by_query语句。在检查系统异常日志的时候,发现这块功能报了数个异常,elastic: Error 409 (Conflict)。于是带着这个这个问题展开探索一番

代码中使用的方法就是最基本DSL根据查询条件进行删除的语句

POST /ES_INDEX/_delete_by_query
{
  "query": {
    "term": {
      "data_date": "${dataDate}"
    }
  }
}

在执行_delete_by_query期间,为了删除匹配到的所有文档,多个搜索请求是按顺序执行的。每次找到一批文档时,将会执行相应的批处理请求来删除找到的全部文档。如果搜索或者批处理请求被拒绝,_delete_by_query根据默认策略对被拒绝的请求进行重试(最多10次)。达到最大重试次数后,会造成_delete_by_query请求中止,并且会在failures字段中响应 所有的故障。已经删除的仍会执行。换句话说,该过程没有回滚,只有中断。

原因

Elasticsearch 会在删除文档前使用查询时获取的 internal 版本号和当前文档版本号进行对比。如果在查询和删除操作之间文档发生更改,则会导致版本冲突并且删除操作失败,报 elastic: Error 409 (Conflict) 错误。

1、refresh的执行
_delete_by_query基本上是搜索要删除的文档,然后一个一个地删除它们。如果一个或多个文档在搜索完成和删除操作开始之间进行refresh,则会发生版本冲突(elasticsearch默认每秒refresh)。

2、文档发生更改
删除接口是先搜索要删除的对象,然后通过版本冲突检查删除它们。如果中间没有_refresh,那么由_delete_by_query执行的搜索可能返回文档的旧版本,从而在尝试删除时导致版本冲突。(如果在捕获快照和处理删除请求之间的文档发生更改,您将会得到版本冲突。当版本匹配时,删除文档。)

https://discuss.elastic.co/t/elasticsearch-delete-by-query-409-version-conflict/174150/10

处理办法

虽然有ES有重试机制,但是重试不保证一定会删除成功,一般情况下在重试时冲突的概率会小很多。如果你开发的需求对此要求不是很严格,数据量并不算大,那么完全可以通过重试解决。

另外,可以选择跳过冲突的文档,继续执行删除操作,在语句上加上?conflicts=proceed参数来实现,例:

POST /twitter/_delete_by_query?conflicts=proceed
{
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

返回结果

{
    "took": 247,
    "timed_out": false,
    "total": 1,
    "deleted": 1,
    "batches": 1,
    "version_conflicts": 0,
    "noops": 0,
    "retries": {
        "bulk": 0,
        "search": 0
    },
    "throttled_millis": 0,
    "requests_per_second": -1.0,
    "throttled_until_millis": 0,
    "failures": []
}

失败的数据结果可以在failures字段中获取,之后可以自行选择在此重试或者根据业务情况进行其他操作处理

如果删除数据量过多,那么也可以再加一个参数wait_for_completion=false,删除语句批量操作执行时间过长,容易导致超时问题,这样就能使用异步执行的方法来执行删除的语句,解决批量操作过程中导致的超时问题。即

POST /twitter/_delete_by_query?conflicts=proceed&wait_for_completion=false
{
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

返回结果,ES会返回一个任务ID(task):我们可以根据任务ID查询任务是否完成。

{
    "task": "s0JDXuFrSBaNNrGgB6viiw:1266"
}

再根据刚刚拿到的task ID “s0JDXuFrSBaNNrGgB6viiw:1266“来查询异步任务执行情况

GET /_tasks/s0JDXuFrSBaNNrGgB6viiw:1266

根据”completed”: true判断删除任务是否执行完成

{
    "completed": true,
    "task": {
        "node": "s0JDXuFrSBaNNrGgB6viiw",
        "id": 1266,
        "type": "transport",
        "action": "indices:data/write/delete/byquery",
        "status": {
            "total": 6,
            "updated": 0,
            "created": 0,
            "deleted": 6,
            "batches": 1,
            "version_conflicts": 0,
            "noops": 0,
            "retries": {
                "bulk": 0,
                "search": 0
            },
            "throttled_millis": 0,
            "requests_per_second": -1.0,
            "throttled_until_millis": 0
        },
        "description": "delete-by-query [twitter]",
        "start_time_in_millis": 1686045000120,
        "running_time_in_nanos": 295123441,
        "cancellable": true,
        "cancelled": false,
        "headers": {}
    },
    "response": {
        "took": 292,
        "timed_out": false,
        "total": 6,
        "updated": 0,
        "created": 0,
        "deleted": 6,
        "batches": 1,
        "version_conflicts": 0,
        "noops": 0,
        "retries": {
            "bulk": 0,
            "search": 0
        },
        "throttled": "0s",
        "throttled_millis": 0,
        "requests_per_second": -1.0,
        "throttled_until": "0s",
        "throttled_until_millis": 0,
        "failures": []
    }
}

以上这些,409 Conflict冲突无法避免,只能说是用适当柔缓的方式来处理409 Conflict冲突,不让其阻塞了整个删除操作的整体执行。对于冲突部分的数据给予列出,并由开发或者业务决定后续如何处理,是进行再次重试删除还是执行相关业务逻辑确认删除失败后数据再做其他处理

除此之外,我们仍有另一种选择。就是把整个_delete_by_query删除的操作在逻辑上拆分开来,第一步是查询符合条件的文档,然后再根据文档 ID 进行批量删除。这相当于忽略查询与删除操作间的变更,进行强制删除。例如:

PUT /twitter/_bulk
{"delete":{"_id":"1"}}
{"delete":{"_id":"2"}}

记得请求的body部分需要以一个新的换行符为结束,否则会报错

{
    "error": {
        "root_cause": [
            {
                "type": "illegal_argument_exception",
                "reason": "The bulk request must be terminated by a newline [\\n]"
            }
        ],
        "type": "illegal_argument_exception",
        "reason": "The bulk request must be terminated by a newline [\\n]"
    },
    "status": 400
}

正确删除请求的相应结果,表示正确删除了id为1和2的文档数据

{
    "took": 157,
    "errors": false,
    "items": [
        {
            "delete": {
                "_index": "twitter",
                "_id": "1",
                "_version": 2,
                "result": "deleted",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 18,
                "_primary_term": 1,
                "status": 200
            }
        },
        {
            "delete": {
                "_index": "twitter",
                "_id": "2",
                "_version": 2,
                "result": "deleted",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 19,
                "_primary_term": 1,
                "status": 200
            }
        }
    ]
}

如果对应id的文档没有找到。则会得到这样的返回结果,其中status为404表示文档不存在

{
    "took": 137,
    "errors": false,
    "items": [
        {
            "delete": {
                "_index": "twitter",
                "_id": "1",
                "_version": 1,
                "result": "not_found",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 20,
                "_primary_term": 1,
                "status": 404
            }
        },
        {
            "delete": {
                "_index": "twitter",
                "_id": "2",
                "_version": 1,
                "result": "not_found",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 21,
                "_primary_term": 1,
                "status": 404
            }
        }
    ]
}