From 580e2a57760fc6c69a6ab86037d4f8a89ca588e0 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Tue, 3 Oct 2023 17:27:38 +0200 Subject: [PATCH 1/4] [Transform] Shutdown the task when force==true --- .../transform/action/TransportStopTransformAction.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index bf24c7d7c1d03..b3c1370c984ab 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -242,6 +242,15 @@ protected void taskOperation( } if (ids.contains(transformTask.getTransformId())) { + if (request.isForce()) { + try { + transformTask.shutdown(); + listener.onResponse(new Response(true)); + } catch (ElasticsearchException ex) { + listener.onFailure(ex); + } + return; + } // move the call to the generic thread pool, so we do not block the network thread threadPool.generic().execute(() -> { transformTask.setShouldStopAtCheckpoint(request.isWaitForCheckpoint(), ActionListener.wrap(r -> { From 5ddab59d84ad92ac7733da93e369d2ebc85522be Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Wed, 4 Oct 2023 12:01:24 +0200 Subject: [PATCH 2/4] Add comment. --- .../xpack/transform/action/TransportStopTransformAction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index b3c1370c984ab..d2973f3863853 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -244,6 +244,8 @@ protected void taskOperation( if (ids.contains(transformTask.getTransformId())) { if (request.isForce()) { try { + // If force==true, we skip all the additional steps and only deregister and complete persistent task via shutdown call. + // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners). transformTask.shutdown(); listener.onResponse(new Response(true)); } catch (ElasticsearchException ex) { From a6d405607f0a3f0092f295c27a658511bde58047 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Wed, 4 Oct 2023 12:05:05 +0200 Subject: [PATCH 3/4] Update docs/changelog/100203.yaml --- docs/changelog/100203.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/100203.yaml diff --git a/docs/changelog/100203.yaml b/docs/changelog/100203.yaml new file mode 100644 index 0000000000000..23a39cb5020e0 --- /dev/null +++ b/docs/changelog/100203.yaml @@ -0,0 +1,5 @@ +pr: 100203 +summary: Shutdown the task immediately when `force` == `true` +area: Transform +type: bug +issues: [] From f386db10c79679097a508ae0eaf98d1ae2afeb07 Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Thu, 5 Oct 2023 10:59:56 +0200 Subject: [PATCH 4/4] Apply review comment --- .../transform/action/TransportStopTransformAction.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index d2973f3863853..54d33f0df3638 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -243,10 +243,13 @@ protected void taskOperation( if (ids.contains(transformTask.getTransformId())) { if (request.isForce()) { + // If force==true, we skip the additional step (setShouldStopAtCheckpoint) and move directly to shutting down the task. + // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners). try { - // If force==true, we skip all the additional steps and only deregister and complete persistent task via shutdown call. - // This way we ensure that the persistent task is removed ASAP (as opposed to being removed in one of the listeners). + // Here the task is deregistered in scheduler and marked as completed in persistent task service. transformTask.shutdown(); + // Here the indexer is aborted so that its thread finishes work ASAP. + transformTask.onCancelled(); listener.onResponse(new Response(true)); } catch (ElasticsearchException ex) { listener.onFailure(ex);