From 0addf1b32e413bab6ecd4fd84df930757595ad00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Mon, 9 Oct 2023 10:56:59 +0200 Subject: [PATCH] [Transform] Shutdown the task immediately when `force` == `true` (#100203) (cherry picked from commit 48e86ad9b7e6f673138f0f7b613eb39f1b9e1fd3) --- docs/changelog/100203.yaml | 5 +++++ .../action/TransportStopTransformAction.java | 14 ++++++++++++++ 2 files changed, 19 insertions(+) create mode 100644 docs/changelog/100203.yaml diff --git a/docs/changelog/100203.yaml b/docs/changelog/100203.yaml new file mode 100644 index 0000000000000..23a39cb5020e0 --- /dev/null +++ b/docs/changelog/100203.yaml @@ -0,0 +1,5 @@ +pr: 100203 +summary: Shutdown the task immediately when `force` == `true` +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index bf24c7d7c1d03..54d33f0df3638 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -242,6 +242,20 @@ protected void taskOperation( } if (ids.contains(transformTask.getTransformId())) { + if (request.isForce()) { + // If force==true, we skip the additional step (setShouldStopAtCheckpoint) and move directly to shutting down the task. + // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners). + try { + // Here the task is deregistered in scheduler and marked as completed in persistent task service. + transformTask.shutdown(); + // Here the indexer is aborted so that its thread finishes work ASAP. + transformTask.onCancelled(); + listener.onResponse(new Response(true)); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); + } + return; + } // move the call to the generic thread pool, so we do not block the network thread threadPool.generic().execute(() -> { transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> {