diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 62db81bd68e01..7030bef17c4a6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Map; +import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.xpack.transform.utils.SecondaryAuthorizationUtils.useSecondaryAuthIfAvailable; public class TransportUpdateTransformAction extends TransportTasksAction { @@ -169,9 +171,35 @@ protected void doExecute(Task task, Request request, ActionListener li && ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED && clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) { + ActionListener taskUpdateListener = ActionListener.wrap(listener::onResponse, e -> { + // benign: A transform might be stopped meanwhile, this is not a problem + if (e instanceof TransformTaskDisappearedDuringUpdateException) { + logger.debug("[{}] transform task disappeared during update, ignoring", request.getId()); + listener.onResponse(new Response(updatedConfig)); + return; + } + + if (e instanceof TransformTaskUpdateException) { + // BWC: only log a warning as response object can not be changed + logger.warn( + () -> format( + "[%s] failed to notify running transform task about update. " + + "New settings will be applied after next checkpoint.", + request.getId() + ), + e + ); + + listener.onResponse(new Response(updatedConfig)); + return; + } + + listener.onFailure(e); + }); + request.setNodes(transformTask.getExecutorNode()); request.setConfig(updatedConfig); - super.doExecute(task, request, listener); + super.doExecute(task, request, taskUpdateListener); return; } } @@ -208,8 +236,29 @@ protected Response newResponse( List taskOperationFailures, List failedNodeExceptions ) { - // there should be only 1 response, todo: check + if (tasks.isEmpty()) { + if (taskOperationFailures.isEmpty() == false) { + throw new TransformTaskUpdateException("Failed to update running transform task.", taskOperationFailures.get(0).getCause()); + } else if (failedNodeExceptions.isEmpty() == false) { + throw new TransformTaskUpdateException("Failed to update running transform task.", failedNodeExceptions.get(0)); + } else { + throw new TransformTaskDisappearedDuringUpdateException("Could not update running transform as it has been stopped."); + } + } + return tasks.get(0); } + private static class TransformTaskUpdateException extends ElasticsearchException { + TransformTaskUpdateException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + } + + private static class TransformTaskDisappearedDuringUpdateException extends ElasticsearchException { + TransformTaskDisappearedDuringUpdateException(String msg) { + super(msg); + } + } + }