From 27666b6e955ae43fb09954553f3ba7a5b1c186f5 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 16 Jul 2024 16:41:26 -0400 Subject: [PATCH 1/3] [Transform] Make validate calls cancellable Validate will initiate a search request. In the event that the search request needs to be cancelled, rather than manually stopping the task, cancelling the Validate task will now propagate the cancel request to the Search task. Relate #88010 --- .../action/TransportPutTransformAction.java | 6 +++++- .../action/TransportStartTransformAction.java | 18 +++++++++++------- .../TransportValidateTransformAction.java | 14 ++++++++------ 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 4c978b1504a0f..ef42a2781962a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -25,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -110,9 +112,11 @@ protected void masterOperation(Task task, Request request, ClusterState clusterS ); // <2> Validate source and destination indices + + var parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId()); ActionListener checkPrivilegesListener = validateTransformListener.delegateFailureAndWrap( (l, aVoid) -> ClientHelper.executeAsyncWithOrigin( - client, + new ParentTaskAssigningClient(client, parentTaskId), ClientHelper.TRANSFORM_ORIGIN, ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(config, request.isDeferValidation(), request.ackTimeout()), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index 23212636dc33c..59df3fa67074d 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -31,6 +32,7 @@ import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ClientHelper; @@ -126,23 +128,25 @@ protected TransportStartTransformAction( @Override protected void masterOperation( - Task ignoredTask, + Task task, StartTransformAction.Request request, ClusterState state, ActionListener listener ) { TransformNodes.warnIfNoTransformNodes(state); - final SetOnce transformTaskParamsHolder = new SetOnce<>(); - final SetOnce transformConfigHolder = new SetOnce<>(); + var transformTaskParamsHolder = new SetOnce(); + var transformConfigHolder = new SetOnce(); + var parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId()); + var parentClient = new ParentTaskAssigningClient(client, parentTaskId); // <5> Wait for the allocated task's state to STARTED ActionListener> newPersistentTaskActionListener = ActionListener - .wrap(task -> { + .wrap(t -> { TransformTaskParams transformTask = transformTaskParamsHolder.get(); assert transformTask != null; waitForTransformTaskStarted( - task.getId(), + t.getId(), transformTask, request.ackTimeout(), ActionListener.wrap(taskStarted -> listener.onResponse(new StartTransformAction.Response(true)), listener::onFailure) @@ -196,7 +200,7 @@ protected void masterOperation( return; } TransformIndex.createDestinationIndex( - client, + parentClient, auditor, indexNameExpressionResolver, state, @@ -257,7 +261,7 @@ protected void masterOperation( ) ); ClientHelper.executeAsyncWithOrigin( - client, + parentClient, ClientHelper.TRANSFORM_ORIGIN, ValidateTransformAction.INSTANCE, new ValidateTransformAction.Request(config, false, request.ackTimeout()), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java index 71593d416577e..7041f18df1e4a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -23,6 +24,7 @@ import org.elasticsearch.license.License; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformDeprecations; @@ -30,8 +32,6 @@ import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction.Response; -import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; -import org.elasticsearch.xpack.transform.transforms.Function; import org.elasticsearch.xpack.transform.transforms.FunctionFactory; import org.elasticsearch.xpack.transform.transforms.TransformNodes; import org.elasticsearch.xpack.transform.utils.SourceDestValidations; @@ -99,8 +99,10 @@ protected void doExecute(Task task, Request request, ActionListener li TransformNodes.warnIfNoTransformNodes(clusterState); - final TransformConfig config = request.getConfig(); - final Function function = FunctionFactory.create(config); + var config = request.getConfig(); + var function = FunctionFactory.create(config); + var parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId()); + var parentClient = new ParentTaskAssigningClient(client, parentTaskId); if (config.getVersion() == null || config.getVersion().before(TransformDeprecations.MIN_TRANSFORM_VERSION)) { listener.onFailure( @@ -130,7 +132,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (request.isDeferValidation()) { deduceMappingsListener.onResponse(emptyMap()); } else { - function.deduceMappings(client, config.getHeaders(), config.getId(), config.getSource(), deduceMappingsListener); + function.deduceMappings(parentClient, config.getHeaders(), config.getId(), config.getSource(), deduceMappingsListener); } }, listener::onFailure); @@ -139,7 +141,7 @@ protected void doExecute(Task task, Request request, ActionListener li if (request.isDeferValidation()) { validateQueryListener.onResponse(true); } else { - function.validateQuery(client, config.getHeaders(), config.getSource(), request.ackTimeout(), validateQueryListener); + function.validateQuery(parentClient, config.getHeaders(), config.getSource(), request.ackTimeout(), validateQueryListener); } }, listener::onFailure); From 6a23c7db0ae2fb73f7c18d28ced7596b831fd4db Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Tue, 16 Jul 2024 17:04:51 -0400 Subject: [PATCH 2/3] Update docs/changelog/110951.yaml --- docs/changelog/110951.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/110951.yaml diff --git a/docs/changelog/110951.yaml b/docs/changelog/110951.yaml new file mode 100644 index 0000000000000..3cf0a983fbb72 --- /dev/null +++ b/docs/changelog/110951.yaml @@ -0,0 +1,5 @@ +pr: 110951 +summary: Make validate calls cancellable +area: Transform +type: bug +issues: [] From c7b1251f46959e78809f1b75beae358c0b34cc0c Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 17 Jul 2024 17:34:54 -0400 Subject: [PATCH 3/3] Create CancellableTasks and NodeClients --- docs/changelog/110951.yaml | 2 +- .../xpack/core/transform/action/PutTransformAction.java | 9 +++++++++ .../core/transform/action/StartTransformAction.java | 9 +++++++++ .../core/transform/action/ValidateTransformAction.java | 8 ++++++++ .../transform/rest/action/RestPutTransformAction.java | 7 ++++++- .../transform/rest/action/RestStartTransformAction.java | 7 ++++++- 6 files changed, 39 insertions(+), 3 deletions(-) diff --git a/docs/changelog/110951.yaml b/docs/changelog/110951.yaml index 3cf0a983fbb72..ec8bc9cae6347 100644 --- a/docs/changelog/110951.yaml +++ b/docs/changelog/110951.yaml @@ -1,5 +1,5 @@ pr: 110951 -summary: Make validate calls cancellable +summary: Allow task canceling of validate API calls area: Transform type: bug issues: [] diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java index 496e826651572..f9fde6b6816e0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PutTransformAction.java @@ -14,6 +14,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.TransformField; @@ -22,6 +25,7 @@ import org.elasticsearch.xpack.core.transform.utils.TransformStrings; import java.io.IOException; +import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -154,6 +158,11 @@ public boolean equals(Object obj) { && this.deferValidation == other.deferValidation && ackTimeout().equals(other.ackTimeout()); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java index 838a0650c8afa..f02aaf553b8a9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/StartTransformAction.java @@ -14,6 +14,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.transform.TransformField; @@ -22,6 +25,7 @@ import java.io.IOException; import java.time.Instant; import java.util.Collections; +import java.util.Map; import java.util.Objects; public class StartTransformAction extends ActionType { @@ -89,6 +93,11 @@ public boolean equals(Object obj) { // the base class does not implement equals, therefore we need to check timeout ourselves return Objects.equals(id, other.id) && Objects.equals(from, other.from) && ackTimeout().equals(other.ackTimeout()); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } } public static class Response extends BaseTasksResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java index 55c21b91b11d8..eae7d8a909c35 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/ValidateTransformAction.java @@ -14,6 +14,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.core.common.validation.SourceDestValidator; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -94,6 +97,11 @@ public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves return Objects.hash(ackTimeout(), config, deferValidation); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } } public static class Response extends ActionResponse { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPutTransformAction.java index 78bcb9a12ffc0..e80d61589fed4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPutTransformAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -66,6 +67,10 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient PutTransformAction.Request request = PutTransformAction.Request.fromXContent(parser, id, deferValidation, timeout); - return channel -> client.execute(PutTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + PutTransformAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java index fdfe2fe1744e7..9f2f310d7a9b9 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestStartTransformAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xpack.core.transform.TransformField; @@ -45,7 +46,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); StartTransformAction.Request request = new StartTransformAction.Request(id, from, timeout); - return channel -> client.execute(StartTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute( + StartTransformAction.INSTANCE, + request, + new RestToXContentListener<>(channel) + ); } private static Instant parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {