diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 4f6a22f639fc8..c07a7afe29a92 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -30,6 +30,8 @@ public class TransformMessages { public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found"; public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future."; + public static final String REST_WARN_NO_TRANSFORM_NODES = + "Transform requires the transform node role for at least 1 node, found [{0}] transform nodes"; public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]." + " Use force stop to stop the transform."; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index da8cd08f1ceb6..198c0e274aee3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -150,7 +150,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeString(id); - out.writeEnum(state); + // 7.13 introduced the waiting state, in older version report the state as started + if (out.getVersion().before(Version.V_8_0_0) && state.equals(State.WAITING)) { // TODO: V_7_13_0 + out.writeEnum(State.STARTED); + } else { + out.writeEnum(state); + } out.writeOptionalString(reason); if (node != null) { out.writeBoolean(true); @@ -247,7 +252,8 @@ public enum State implements Writeable { ABORTING, STOPPING, STOPPED, - FAILED; + FAILED, + WAITING; public static State fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); @@ -299,6 +305,7 @@ public String value() { return name().toLowerCase(Locale.ROOT); } + // only used when speaking to nodes < 7.4 (can be removed for 8.0) public Tuple toComponents() { switch (this) { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index 8a6d6f8f23111..004c11b441161 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -8,25 +8,36 @@ package org.elasticsearch.xpack.transform.action; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.HeaderWarning; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; +import org.elasticsearch.xpack.transform.Transform; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public final class TransformNodes { private TransformNodes() {} /** - * Get the list of nodes transforms are executing on + * Get node assignments for a given list of transforms. * * @param transformIds The transforms. * @param clusterState State - * @return The executor nodes + * @return The {@link TransformNodeAssignments} for the given transforms. */ public static TransformNodeAssignments transformTaskNodes(List transformIds, ClusterState clusterState) { @@ -60,4 +71,86 @@ public static TransformNodeAssignments transformTaskNodes(List transform return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); } + + /** + * Get node assignments for a given list of transform or transform pattern. + * + * Note: This only returns active assignments, stopped transforms are not reported. Active means running or waiting for a node. + * + * @param transformId The transform or a simply wildcard pattern. + * @param clusterState State + * @return The {@link TransformNodeAssignments} for the given transforms. + */ + public static TransformNodeAssignments findActiveTasks(String transformId, ClusterState clusterState) { + + Set executorNodes = new HashSet<>(); + Set assigned = new HashSet<>(); + Set waitingForAssignment = new HashSet<>(); + + PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); + + if (tasksMetadata != null) { + Predicate> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> { + TransformTaskParams transformParams = (TransformTaskParams) t.getParams(); + return Regex.simpleMatch(transformId, transformParams.getId()); + }; + + for (PersistentTasksCustomMetadata.PersistentTask task : tasksMetadata.findTasks(TransformField.TASK_NAME, taskMatcher)) { + if (task.isAssigned()) { + executorNodes.add(task.getExecutorNode()); + assigned.add(task.getId()); + } else { + waitingForAssignment.add(task.getId()); + } + } + } + + return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, Collections.emptySet()); + } + + + /** + * Get the assignment of a specific transform + * + * @param transformId the transform id + * @param clusterState state + * @return {@link Assignment} of task + */ + public static Assignment getAssignment(String transformId, ClusterState clusterState) { + PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); + + PersistentTask task = tasksMetadata.getTask(transformId); + + if (task != null) { + return task.getAssignment(); + } + + return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT; + + } + + /** + * Get the number of transform nodes in the cluster + * + * @param clusterState state + * @return number of transform nodes + */ + public static long getNumberOfTransformNodes(ClusterState clusterState) { + return StreamSupport.stream(clusterState.getNodes().spliterator(), false) + .filter(node -> node.getRoles().contains(Transform.TRANSFORM_ROLE)) + .count(); + } + + /** + * Check if cluster has at least 1 transform nodes and add a header warning if not. + * To be used by transport actions. + * + * @param clusterState state + */ + public static void warnIfNoTransformNodes(ClusterState clusterState) { + long transformNodes = getNumberOfTransformNodes(clusterState); + if (transformNodes == 0) { + HeaderWarning.addWarning(TransformMessages.getMessage(TransformMessages.REST_WARN_NO_TRANSFORM_NODES, transformNodes)); + } + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java index f5b22f72f6d2b..cfa212e225828 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -34,23 +36,37 @@ import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE; -public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { +public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { + + private final ClusterService clusterService; @Inject - public TransportGetTransformAction(TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { - this(GetTransformAction.NAME, transportService, actionFilters, client, xContentRegistry); + public TransportGetTransformAction( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { + this(GetTransformAction.NAME, transportService, actionFilters, clusterService, client, xContentRegistry); } - protected TransportGetTransformAction(String name, TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { + protected TransportGetTransformAction( + String name, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { super(name, transportService, actionFilters, Request::new, client, xContentRegistry); + this.clusterService = clusterService; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(state); searchResources(request, ActionListener.wrap( r -> listener.onResponse(new Response(r.results(), r.count())), listener::onFailure diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 92977d1646c9d..fca2cae81f6d2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -131,17 +132,18 @@ protected void taskOperation(Request request, TransformTask task, ActionListener @Override protected void doExecute(Task task, Request request, ActionListener finalListener) { + final ClusterState state = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(state); + transformConfigManager.expandTransformIds( request.getId(), request.getPageParams(), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { request.setExpandedIds(hitsAndIds.v2()); - final ClusterState state = clusterService.state(); - TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); - // TODO: if empty the request is send to all nodes(benign but superfluous) - request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); - super.doExecute(task, request, ActionListener.wrap(response -> { + final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); + + ActionListener doExecuteListener = ActionListener.wrap(response -> { PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); if (tasksInProgress != null) { // Mutates underlying state object with the assigned node attributes @@ -150,6 +152,8 @@ protected void doExecute(Task task, Request request, ActionListener fi collectStatsForTransformsWithoutTasks( request, response, + transformNodeAssignments.getWaitingForAssignment(), + state, ActionListener.wrap( finalResponse -> finalListener.onResponse( new Response( @@ -162,7 +166,14 @@ protected void doExecute(Task task, Request request, ActionListener fi finalListener::onFailure ) ); - }, finalListener::onFailure)); + }, finalListener::onFailure); + + if (transformNodeAssignments.getExecutorNodes().size() > 0) { + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); + super.doExecute(task, request, doExecuteListener); + } else { + doExecuteListener.onResponse(new Response(Collections.emptyList(), 0L)); + } }, e -> { // If the index to search, or the individual config is not there, just return empty @@ -210,7 +221,13 @@ static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpo ); } - private void collectStatsForTransformsWithoutTasks(Request request, Response response, ActionListener listener) { + private void collectStatsForTransformsWithoutTasks( + Request request, + Response response, + Set transformsWaitingForAssignment, + ClusterState clusterState, + ActionListener listener + ) { // We gathered all there is, no need to continue if (request.getExpandedIds().size() == response.getTransformsStats().size()) { listener.onResponse(response); @@ -226,23 +243,30 @@ private void collectStatsForTransformsWithoutTasks(Request request, Response res // There is a potential race condition where the saved document does not actually have a STOPPED state // as the task is cancelled before we persist state. ActionListener> searchStatsListener = ActionListener.wrap(statsForTransformsWithoutTasks -> { - List allStateAndStats = response.getTransformsStats(); - addCheckpointingInfoForTransformsWithoutTasks(allStateAndStats, statsForTransformsWithoutTasks, ActionListener.wrap(aVoid -> { - transformsWithoutTasks.removeAll( - statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet()) - ); + // copy the list as it might be immutable + List allStateAndStats = new ArrayList<>(response.getTransformsStats()); + addCheckpointingInfoForTransformsWithoutTasks( + allStateAndStats, + statsForTransformsWithoutTasks, + transformsWaitingForAssignment, + clusterState, + ActionListener.wrap(aVoid -> { + transformsWithoutTasks.removeAll( + statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet()) + ); - // Transforms that have not been started and have no state or stats. - transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId))); + // Transforms that have not been started and have no state or stats. + transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId))); - // Any transform in collection could NOT have a task, so, even though the list is initially sorted - // it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs - allStateAndStats.sort(Comparator.comparing(TransformStats::getId)); + // Any transform in collection could NOT have a task, so, even though the list is initially sorted + // it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs + allStateAndStats.sort(Comparator.comparing(TransformStats::getId)); - listener.onResponse( - new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures()) - ); - }, listener::onFailure)); + listener.onResponse( + new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures()) + ); + }, listener::onFailure) + ); }, e -> { if (e instanceof IndexNotFoundException) { listener.onResponse(response); @@ -271,6 +295,8 @@ private void populateSingleStoppedTransformStat(TransformStoredDoc transform, Ac private void addCheckpointingInfoForTransformsWithoutTasks( List allStateAndStats, List statsForTransformsWithoutTasks, + Set transformsWaitingForAssignment, + ClusterState clusterState, ActionListener listener ) { @@ -285,9 +311,31 @@ private void addCheckpointingInfoForTransformsWithoutTasks( statsForTransformsWithoutTasks.forEach(stat -> populateSingleStoppedTransformStat(stat, ActionListener.wrap(checkpointingInfo -> { synchronized (allStateAndStats) { - allStateAndStats.add( - new TransformStats(stat.getId(), TransformStats.State.STOPPED, null, null, stat.getTransformStats(), checkpointingInfo) - ); + if (transformsWaitingForAssignment.contains(stat.getId())) { + // corner case: as we + Assignment assignment = TransformNodes.getAssignment(stat.getId(), clusterState); + allStateAndStats.add( + new TransformStats( + stat.getId(), + TransformStats.State.WAITING, + assignment.getExplanation(), + null, + stat.getTransformStats(), + checkpointingInfo + ) + ); + } else { + allStateAndStats.add( + new TransformStats( + stat.getId(), + TransformStats.State.STOPPED, + null, + null, + stat.getTransformStats(), + checkpointingInfo + ) + ); + } } if (numberRemaining.decrementAndGet() == 0) { listener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index b1410a71a6039..ef65260557fd8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -120,7 +120,8 @@ protected TransportPreviewTransformAction( @Override protected void doExecute(Task task, PreviewTransformAction.Request request, ActionListener listener) { - ClusterState clusterState = clusterService.state(); + final ClusterState clusterState = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(clusterState); final TransformConfig config = request.getConfig(); sourceDestValidator.validate( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 3cb6e403b6043..5d09ccc29b130 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -192,6 +192,7 @@ static HasPrivilegesRequest buildPrivilegeCheck( @Override protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener listener) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); + TransformNodes.warnIfNoTransformNodes(clusterState); // set headers to run transform as calling user Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index b926175d17141..9cd81111c8ff2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -148,6 +148,8 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { + TransformNodes.warnIfNoTransformNodes(state); + final AtomicReference transformTaskHolder = new AtomicReference<>(); final AtomicReference transformConfigHolder = new AtomicReference<>(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index ccb9b9b8fc86d..555f5f60cfc4b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -23,10 +23,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -37,13 +35,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; -import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Response; import org.elasticsearch.xpack.core.transform.transforms.TransformState; -import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -55,7 +51,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -134,27 +129,6 @@ static void validateTaskState(ClusterState state, List transformIds, boo } } - static Tuple, Set> findTasksWithoutConfig(ClusterState state, String transformId) { - PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE); - - Set taskIds = new HashSet<>(); - Set executorNodes = new HashSet<>(); - - if (tasks != null) { - Predicate> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> { - TransformTaskParams transformParams = (TransformTaskParams) t.getParams(); - return Regex.simpleMatch(transformId, transformParams.getId()); - }; - - for (PersistentTasksCustomMetadata.PersistentTask pTask : tasks.findTasks(TransformField.TASK_NAME, taskMatcher)) { - executorNodes.add(pTask.getExecutorNode()); - taskIds.add(pTask.getId()); - } - } - - return new Tuple<>(taskIds, executorNodes); - } - @Override protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); @@ -173,6 +147,8 @@ protected void doExecute(Task task, Request request, ActionListener li ); } } else { + TransformNodes.warnIfNoTransformNodes(state); + final ActionListener finalListener; if (request.waitForCompletion()) { finalListener = waitForStopListener(request, listener); @@ -204,21 +180,39 @@ protected void doExecute(Task task, Request request, ActionListener li } }, e -> { if (e instanceof ResourceNotFoundException) { - Tuple, Set> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId()); - if (runningTasksAndNodes.v1().isEmpty()) { + final TransformNodeAssignments transformNodeAssignments = TransformNodes.findActiveTasks(request.getId(), state); + + if (transformNodeAssignments.getAssigned().isEmpty() + && transformNodeAssignments.getWaitingForAssignment().isEmpty()) { listener.onFailure(e); // found transforms without a config } else if (request.isForce()) { - // TODO: handle tasks waiting for assignment - request.setExpandedIds(runningTasksAndNodes.v1()); - request.setNodes(runningTasksAndNodes.v2().toArray(new String[0])); - super.doExecute(task, request, finalListener); + final ActionListener doExecuteListener; + + if (transformNodeAssignments.getWaitingForAssignment().size() > 0) { + doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments); + } else { + doExecuteListener = finalListener; + } + + if (transformNodeAssignments.getExecutorNodes().size() > 0) { + request.setExpandedIds(transformNodeAssignments.getAssigned()); + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); + super.doExecute(task, request, doExecuteListener); + } else { + doExecuteListener.onResponse(new Response(true)); + } } else { + Set transformsWithoutConfig = Stream.concat( + transformNodeAssignments.getAssigned().stream(), + transformNodeAssignments.getWaitingForAssignment().stream() + ).collect(Collectors.toSet()); + listener.onFailure( new ElasticsearchStatusException( TransformMessages.getMessage( TransformMessages.REST_STOP_TRANSFORM_WITHOUT_CONFIG, - Strings.arrayToCommaDelimitedString(runningTasksAndNodes.v1().toArray(new String[0])) + Strings.collectionToCommaDelimitedString(transformsWithoutConfig) ), RestStatus.CONFLICT ) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 36997120eaebf..7a9c5f908aaa5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -181,6 +181,8 @@ protected void doExecute(Task task, Request request, ActionListener li } return; } + TransformNodes.warnIfNoTransformNodes(clusterState); + // set headers to run transform as calling user Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java index 46baa66e5e3d0..b57d092d76f4b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.transport.TransportService; @@ -18,9 +19,14 @@ public class TransportGetTransformActionDeprecated extends TransportGetTransformAction { @Inject - public TransportGetTransformActionDeprecated(TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { - super(GetTransformActionDeprecated.NAME, transportService, actionFilters, client, xContentRegistry); + public TransportGetTransformActionDeprecated( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { + super(GetTransformActionDeprecated.NAME, transportService, actionFilters, clusterService, client, xContentRegistry); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java index 0e05839ea8517..d833967b26058 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java @@ -109,11 +109,19 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) { assertEquals(1, transformNodeAssignments.getStopped().size()); assertTrue(transformNodeAssignments.getStopped().contains(transformIdStopped)); - transformNodeAssignments = TransformNodes.transformTaskNodes( - Arrays.asList(transformIdFoo, transformIdFailed), - cs - ); + transformNodeAssignments = TransformNodes.transformTaskNodes(Arrays.asList(transformIdFoo, transformIdFailed), cs); + + assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(1, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + assertEquals(0, transformNodeAssignments.getStopped().size()); + // test simple matching + transformNodeAssignments = TransformNodes.findActiveTasks("df-id-f*", cs); assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); @@ -122,6 +130,57 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) { assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching none + transformNodeAssignments = TransformNodes.findActiveTasks("df-id-z*", cs); + assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(0, transformNodeAssignments.getAssigned().size()); + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching all + transformNodeAssignments = TransformNodes.findActiveTasks("df-id-*", cs); + assertEquals(3, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-3")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(4, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdOther)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching all with _all + transformNodeAssignments = TransformNodes.findActiveTasks("_all", cs); + assertEquals(3, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-3")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(4, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdOther)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching exact + transformNodeAssignments = TransformNodes.findActiveTasks(transformIdFoo, cs); + assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(1, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); } public void testTransformNodes_NoTasks() { @@ -134,5 +193,12 @@ public void testTransformNodes_NoTasks() { assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); assertEquals(1, transformNodeAssignments.getStopped().size()); assertTrue(transformNodeAssignments.getStopped().contains("df-id")); + + transformNodeAssignments = TransformNodes.findActiveTasks("df-*", emptyState); + + assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(0, transformNodeAssignments.getAssigned().size()); + assertEquals(0, transformNodeAssignments.getStopped().size()); } }