diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformStats.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformStats.java index 0cdf4c2c12fdf..abdd3094b89ed 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformStats.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformStats.java @@ -117,7 +117,7 @@ public boolean equals(Object other) { public enum State { - STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED; + STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED, WAITING; public static State fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java index 4f6a22f639fc8..21457002c03c4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMessages.java @@ -30,6 +30,8 @@ public class TransformMessages { public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found"; public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future."; + public static final String REST_WARN_NO_TRANSFORM_NODES = + "Transform requires the transform node role for at least 1 node, found no transform nodes"; public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]." + " Use force stop to stop the transform."; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java index da8cd08f1ceb6..198c0e274aee3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java @@ -150,7 +150,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeString(id); - out.writeEnum(state); + // 7.13 introduced the waiting state, in older version report the state as started + if (out.getVersion().before(Version.V_8_0_0) && state.equals(State.WAITING)) { // TODO: V_7_13_0 + out.writeEnum(State.STARTED); + } else { + out.writeEnum(state); + } out.writeOptionalString(reason); if (node != null) { out.writeBoolean(true); @@ -247,7 +252,8 @@ public enum State implements Writeable { ABORTING, STOPPING, STOPPED, - FAILED; + FAILED, + WAITING; public static State fromString(String name) { return valueOf(name.trim().toUpperCase(Locale.ROOT)); @@ -299,6 +305,7 @@ public String value() { return name().toLowerCase(Locale.ROOT); } + // only used when speaking to nodes < 7.4 (can be removed for 8.0) public Tuple toComponents() { switch (this) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index 48727b9235c03..1e3d98f142f5d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -18,6 +18,7 @@ import java.util.function.Predicate; import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED; +import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.WAITING; import static org.hamcrest.Matchers.equalTo; public class TransformStatsTests extends AbstractSerializingTestCase { @@ -120,4 +121,33 @@ public void testBwcWith76() throws IOException { } } } + + public void testBwcWith712() throws IOException { + for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { + TransformStats stats = new TransformStats( + "bwc-id", + WAITING, + randomBoolean() ? null : randomAlphaOfLength(100), + randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(), + new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 15.0, 16.0, 17.0), + new TransformCheckpointingInfo( + new TransformCheckpointStats(0, null, null, 10, 100), + new TransformCheckpointStats(0, null, null, 100, 1000), + // changesLastDetectedAt aren't serialized back + 100, + null, + null + ) + ); + try (BytesStreamOutput output = new BytesStreamOutput()) { + output.setVersion(Version.V_7_12_0); + stats.writeTo(output); + try (StreamInput in = output.bytes().streamInput()) { + in.setVersion(Version.V_8_0_0); // TODO: V_7_13_0 + TransformStats statsFromOld = new TransformStats(in); + assertThat(statsFromOld.getState(), equalTo(STARTED)); + } + } + } + } } diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/TransformSingleNodeTestCase.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/TransformSingleNodeTestCase.java index e5334bb8702e4..85a357b5d8a66 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/TransformSingleNodeTestCase.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/TransformSingleNodeTestCase.java @@ -13,7 +13,9 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.node.NodeRoleSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -32,6 +34,11 @@ protected Collection> getPlugins() { return pluginList(LocalStateTransform.class, ReindexPlugin.class); } + @Override + protected Settings nodeSettings() { + return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform").build(); + } + protected void assertAsync(Consumer> function, T expected, CheckedConsumer onAnswer, Consumer onException) throws InterruptedException { diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java new file mode 100644 index 0000000000000..54305c1d559c8 --- /dev/null +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/integration/TransformNoTransformNodeIT.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.xpack.core.transform.action.GetTransformAction; +import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction; +import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase; + +public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase { + @Override + protected Settings nodeSettings() { + return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest").build(); + } + + public void testWarningForStats() { + GetTransformStatsAction.Request getTransformStatsRequest = new GetTransformStatsAction.Request("_all"); + GetTransformStatsAction.Response getTransformStatsResponse = client().execute( + GetTransformStatsAction.INSTANCE, + getTransformStatsRequest + ).actionGet(); + + assertEquals(0, getTransformStatsResponse.getTransformsStats().size()); + + assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes"); + } + + public void testWarningForGet() { + GetTransformAction.Request getTransformRequest = new GetTransformAction.Request("_all"); + GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet(); + assertEquals(0, getTransformResponse.getTransformConfigurations().size()); + + assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes"); + } +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java index 8a6d6f8f23111..853b7aaf71161 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransformNodes.java @@ -8,28 +8,38 @@ package org.elasticsearch.xpack.transform.action; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.HeaderWarning; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask; import org.elasticsearch.xpack.core.transform.TransformField; +import org.elasticsearch.xpack.core.transform.TransformMessages; +import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; +import org.elasticsearch.xpack.transform.Transform; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; public final class TransformNodes { private TransformNodes() {} /** - * Get the list of nodes transforms are executing on + * Get node assignments for a given list of transforms. * * @param transformIds The transforms. * @param clusterState State - * @return The executor nodes + * @return The {@link TransformNodeAssignments} for the given transforms. */ public static TransformNodeAssignments transformTaskNodes(List transformIds, ClusterState clusterState) { - Set executorNodes = new HashSet<>(); Set assigned = new HashSet<>(); Set waitingForAssignment = new HashSet<>(); @@ -60,4 +70,81 @@ public static TransformNodeAssignments transformTaskNodes(List transform return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped); } + + /** + * Get node assignments for a given transform pattern. + * + * Note: This only returns p-task assignments, stopped transforms are not reported. P-Tasks can be running or waiting for a node. + * + * @param transformId The transform or a wildcard pattern, including '_all' to match against transform tasks. + * @param clusterState State + * @return The {@link TransformNodeAssignments} for the given pattern. + */ + public static TransformNodeAssignments findPersistentTasks(String transformId, ClusterState clusterState) { + Set executorNodes = new HashSet<>(); + Set assigned = new HashSet<>(); + Set waitingForAssignment = new HashSet<>(); + + PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); + + if (tasksMetadata != null) { + Predicate> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> { + TransformTaskParams transformParams = (TransformTaskParams) t.getParams(); + return Regex.simpleMatch(transformId, transformParams.getId()); + }; + + for (PersistentTasksCustomMetadata.PersistentTask task : tasksMetadata.findTasks(TransformField.TASK_NAME, taskMatcher)) { + if (task.isAssigned()) { + executorNodes.add(task.getExecutorNode()); + assigned.add(task.getId()); + } else { + waitingForAssignment.add(task.getId()); + } + } + } + return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, Collections.emptySet()); + } + + /** + * Get the assignment of a specific transform. + * + * @param transformId the transform id + * @param clusterState state + * @return {@link Assignment} of task + */ + public static Assignment getAssignment(String transformId, ClusterState clusterState) { + PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState); + PersistentTask task = tasksMetadata.getTask(transformId); + + if (task != null) { + return task.getAssignment(); + } + + return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT; + } + + /** + * Get the number of transform nodes in the cluster + * + * @param clusterState state + * @return number of transform nodes + */ + public static long getNumberOfTransformNodes(ClusterState clusterState) { + return StreamSupport.stream(clusterState.getNodes().spliterator(), false) + .filter(node -> node.getRoles().contains(Transform.TRANSFORM_ROLE)) + .count(); + } + + /** + * Check if cluster has at least 1 transform nodes and add a header warning if not. + * To be used by transport actions only. + * + * @param clusterState state + */ + public static void warnIfNoTransformNodes(ClusterState clusterState) { + long transformNodes = getNumberOfTransformNodes(clusterState); + if (transformNodes == 0) { + HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES); + } + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java index f5b22f72f6d2b..cfa212e225828 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -34,23 +36,37 @@ import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE; -public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { +public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { + + private final ClusterService clusterService; @Inject - public TransportGetTransformAction(TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { - this(GetTransformAction.NAME, transportService, actionFilters, client, xContentRegistry); + public TransportGetTransformAction( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { + this(GetTransformAction.NAME, transportService, actionFilters, clusterService, client, xContentRegistry); } - protected TransportGetTransformAction(String name, TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { + protected TransportGetTransformAction( + String name, + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { super(name, transportService, actionFilters, Request::new, client, xContentRegistry); + this.clusterService = clusterService; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { + final ClusterState state = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(state); searchResources(request, ActionListener.wrap( r -> listener.onResponse(new Response(r.results(), r.count())), listener::onFailure diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java index 92977d1646c9d..8ca8155796d93 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -131,17 +132,18 @@ protected void taskOperation(Request request, TransformTask task, ActionListener @Override protected void doExecute(Task task, Request request, ActionListener finalListener) { + final ClusterState state = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(state); + transformConfigManager.expandTransformIds( request.getId(), request.getPageParams(), request.isAllowNoMatch(), ActionListener.wrap(hitsAndIds -> { request.setExpandedIds(hitsAndIds.v2()); - final ClusterState state = clusterService.state(); - TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); - // TODO: if empty the request is send to all nodes(benign but superfluous) - request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); - super.doExecute(task, request, ActionListener.wrap(response -> { + final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state); + + ActionListener doExecuteListener = ActionListener.wrap(response -> { PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); if (tasksInProgress != null) { // Mutates underlying state object with the assigned node attributes @@ -150,6 +152,8 @@ protected void doExecute(Task task, Request request, ActionListener fi collectStatsForTransformsWithoutTasks( request, response, + transformNodeAssignments.getWaitingForAssignment(), + state, ActionListener.wrap( finalResponse -> finalListener.onResponse( new Response( @@ -162,7 +166,14 @@ protected void doExecute(Task task, Request request, ActionListener fi finalListener::onFailure ) ); - }, finalListener::onFailure)); + }, finalListener::onFailure); + + if (transformNodeAssignments.getExecutorNodes().size() > 0) { + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); + super.doExecute(task, request, doExecuteListener); + } else { + doExecuteListener.onResponse(new Response(Collections.emptyList(), 0L)); + } }, e -> { // If the index to search, or the individual config is not there, just return empty @@ -210,7 +221,13 @@ static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpo ); } - private void collectStatsForTransformsWithoutTasks(Request request, Response response, ActionListener listener) { + private void collectStatsForTransformsWithoutTasks( + Request request, + Response response, + Set transformsWaitingForAssignment, + ClusterState clusterState, + ActionListener listener + ) { // We gathered all there is, no need to continue if (request.getExpandedIds().size() == response.getTransformsStats().size()) { listener.onResponse(response); @@ -226,23 +243,30 @@ private void collectStatsForTransformsWithoutTasks(Request request, Response res // There is a potential race condition where the saved document does not actually have a STOPPED state // as the task is cancelled before we persist state. ActionListener> searchStatsListener = ActionListener.wrap(statsForTransformsWithoutTasks -> { - List allStateAndStats = response.getTransformsStats(); - addCheckpointingInfoForTransformsWithoutTasks(allStateAndStats, statsForTransformsWithoutTasks, ActionListener.wrap(aVoid -> { - transformsWithoutTasks.removeAll( - statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet()) - ); + // copy the list as it might be immutable + List allStateAndStats = new ArrayList<>(response.getTransformsStats()); + addCheckpointingInfoForTransformsWithoutTasks( + allStateAndStats, + statsForTransformsWithoutTasks, + transformsWaitingForAssignment, + clusterState, + ActionListener.wrap(aVoid -> { + transformsWithoutTasks.removeAll( + statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet()) + ); - // Transforms that have not been started and have no state or stats. - transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId))); + // Transforms that have not been started and have no state or stats. + transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId))); - // Any transform in collection could NOT have a task, so, even though the list is initially sorted - // it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs - allStateAndStats.sort(Comparator.comparing(TransformStats::getId)); + // Any transform in collection could NOT have a task, so, even though the list is initially sorted + // it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs + allStateAndStats.sort(Comparator.comparing(TransformStats::getId)); - listener.onResponse( - new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures()) - ); - }, listener::onFailure)); + listener.onResponse( + new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures()) + ); + }, listener::onFailure) + ); }, e -> { if (e instanceof IndexNotFoundException) { listener.onResponse(response); @@ -271,6 +295,8 @@ private void populateSingleStoppedTransformStat(TransformStoredDoc transform, Ac private void addCheckpointingInfoForTransformsWithoutTasks( List allStateAndStats, List statsForTransformsWithoutTasks, + Set transformsWaitingForAssignment, + ClusterState clusterState, ActionListener listener ) { @@ -285,9 +311,30 @@ private void addCheckpointingInfoForTransformsWithoutTasks( statsForTransformsWithoutTasks.forEach(stat -> populateSingleStoppedTransformStat(stat, ActionListener.wrap(checkpointingInfo -> { synchronized (allStateAndStats) { - allStateAndStats.add( - new TransformStats(stat.getId(), TransformStats.State.STOPPED, null, null, stat.getTransformStats(), checkpointingInfo) - ); + if (transformsWaitingForAssignment.contains(stat.getId())) { + Assignment assignment = TransformNodes.getAssignment(stat.getId(), clusterState); + allStateAndStats.add( + new TransformStats( + stat.getId(), + TransformStats.State.WAITING, + assignment.getExplanation(), + null, + stat.getTransformStats(), + checkpointingInfo + ) + ); + } else { + allStateAndStats.add( + new TransformStats( + stat.getId(), + TransformStats.State.STOPPED, + null, + null, + stat.getTransformStats(), + checkpointingInfo + ) + ); + } } if (numberRemaining.decrementAndGet() == 0) { listener.onResponse(null); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java index b1410a71a6039..ef65260557fd8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java @@ -120,7 +120,8 @@ protected TransportPreviewTransformAction( @Override protected void doExecute(Task task, PreviewTransformAction.Request request, ActionListener listener) { - ClusterState clusterState = clusterService.state(); + final ClusterState clusterState = clusterService.state(); + TransformNodes.warnIfNoTransformNodes(clusterState); final TransformConfig config = request.getConfig(); sourceDestValidator.validate( diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java index 3cb6e403b6043..5d09ccc29b130 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPutTransformAction.java @@ -192,6 +192,7 @@ static HasPrivilegesRequest buildPrivilegeCheck( @Override protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener listener) { XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); + TransformNodes.warnIfNoTransformNodes(clusterState); // set headers to run transform as calling user Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java index b926175d17141..9cd81111c8ff2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStartTransformAction.java @@ -148,6 +148,8 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { + TransformNodes.warnIfNoTransformNodes(state); + final AtomicReference transformTaskHolder = new AtomicReference<>(); final AtomicReference transformConfigHolder = new AtomicReference<>(); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java index ccb9b9b8fc86d..82d91470dbc45 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportStopTransformAction.java @@ -23,10 +23,8 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -37,13 +35,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.action.util.PageParams; -import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.StopTransformAction.Response; import org.elasticsearch.xpack.core.transform.transforms.TransformState; -import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -55,7 +51,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -134,27 +129,6 @@ static void validateTaskState(ClusterState state, List transformIds, boo } } - static Tuple, Set> findTasksWithoutConfig(ClusterState state, String transformId) { - PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE); - - Set taskIds = new HashSet<>(); - Set executorNodes = new HashSet<>(); - - if (tasks != null) { - Predicate> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> { - TransformTaskParams transformParams = (TransformTaskParams) t.getParams(); - return Regex.simpleMatch(transformId, transformParams.getId()); - }; - - for (PersistentTasksCustomMetadata.PersistentTask pTask : tasks.findTasks(TransformField.TASK_NAME, taskMatcher)) { - executorNodes.add(pTask.getExecutorNode()); - taskIds.add(pTask.getId()); - } - } - - return new Tuple<>(taskIds, executorNodes); - } - @Override protected void doExecute(Task task, Request request, ActionListener listener) { final ClusterState state = clusterService.state(); @@ -173,6 +147,8 @@ protected void doExecute(Task task, Request request, ActionListener li ); } } else { + TransformNodes.warnIfNoTransformNodes(state); + final ActionListener finalListener; if (request.waitForCompletion()) { finalListener = waitForStopListener(request, listener); @@ -204,21 +180,42 @@ protected void doExecute(Task task, Request request, ActionListener li } }, e -> { if (e instanceof ResourceNotFoundException) { - Tuple, Set> runningTasksAndNodes = findTasksWithoutConfig(state, request.getId()); - if (runningTasksAndNodes.v1().isEmpty()) { + final TransformNodeAssignments transformNodeAssignments = TransformNodes.findPersistentTasks( + request.getId(), + state + ); + + if (transformNodeAssignments.getAssigned().isEmpty() + && transformNodeAssignments.getWaitingForAssignment().isEmpty()) { listener.onFailure(e); // found transforms without a config } else if (request.isForce()) { - // TODO: handle tasks waiting for assignment - request.setExpandedIds(runningTasksAndNodes.v1()); - request.setNodes(runningTasksAndNodes.v2().toArray(new String[0])); - super.doExecute(task, request, finalListener); + final ActionListener doExecuteListener; + + if (transformNodeAssignments.getWaitingForAssignment().size() > 0) { + doExecuteListener = cancelTransformTasksWithNoAssignment(finalListener, transformNodeAssignments); + } else { + doExecuteListener = finalListener; + } + + if (transformNodeAssignments.getExecutorNodes().size() > 0) { + request.setExpandedIds(transformNodeAssignments.getAssigned()); + request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0])); + super.doExecute(task, request, doExecuteListener); + } else { + doExecuteListener.onResponse(new Response(true)); + } } else { + Set transformsWithoutConfig = Stream.concat( + transformNodeAssignments.getAssigned().stream(), + transformNodeAssignments.getWaitingForAssignment().stream() + ).collect(Collectors.toSet()); + listener.onFailure( new ElasticsearchStatusException( TransformMessages.getMessage( TransformMessages.REST_STOP_TRANSFORM_WITHOUT_CONFIG, - Strings.arrayToCommaDelimitedString(runningTasksAndNodes.v1().toArray(new String[0])) + Strings.collectionToCommaDelimitedString(transformsWithoutConfig) ), RestStatus.CONFLICT ) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 36997120eaebf..7a9c5f908aaa5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -181,6 +181,8 @@ protected void doExecute(Task task, Request request, ActionListener li } return; } + TransformNodes.warnIfNoTransformNodes(clusterState); + // set headers to run transform as calling user Map filteredHeaders = ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java index 46baa66e5e3d0..b57d092d76f4b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/compat/TransportGetTransformActionDeprecated.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.transport.TransportService; @@ -18,9 +19,14 @@ public class TransportGetTransformActionDeprecated extends TransportGetTransformAction { @Inject - public TransportGetTransformActionDeprecated(TransportService transportService, ActionFilters actionFilters, Client client, - NamedXContentRegistry xContentRegistry) { - super(GetTransformActionDeprecated.NAME, transportService, actionFilters, client, xContentRegistry); + public TransportGetTransformActionDeprecated( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + Client client, + NamedXContentRegistry xContentRegistry + ) { + super(GetTransformActionDeprecated.NAME, transportService, actionFilters, clusterService, client, xContentRegistry); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java index 0e05839ea8517..4132d3228d376 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransformNodesTests.java @@ -109,11 +109,19 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) { assertEquals(1, transformNodeAssignments.getStopped().size()); assertTrue(transformNodeAssignments.getStopped().contains(transformIdStopped)); - transformNodeAssignments = TransformNodes.transformTaskNodes( - Arrays.asList(transformIdFoo, transformIdFailed), - cs - ); + transformNodeAssignments = TransformNodes.transformTaskNodes(Arrays.asList(transformIdFoo, transformIdFailed), cs); + + assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(1, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + assertEquals(0, transformNodeAssignments.getStopped().size()); + // test simple matching + transformNodeAssignments = TransformNodes.findPersistentTasks("df-id-f*", cs); assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); @@ -122,6 +130,57 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) { assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching none + transformNodeAssignments = TransformNodes.findPersistentTasks("df-id-z*", cs); + assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(0, transformNodeAssignments.getAssigned().size()); + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching all + transformNodeAssignments = TransformNodes.findPersistentTasks("df-id-*", cs); + assertEquals(3, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-3")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(4, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdOther)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching all with _all + transformNodeAssignments = TransformNodes.findPersistentTasks("_all", cs); + assertEquals(3, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-2")); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-3")); + assertEquals(1, transformNodeAssignments.getWaitingForAssignment().size()); + assertTrue(transformNodeAssignments.getWaitingForAssignment().contains(transformIdFailed)); + assertEquals(4, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBar)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdBaz)); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdOther)); + assertFalse(transformNodeAssignments.getAssigned().contains(transformIdFailed)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); + + // test matching exact + transformNodeAssignments = TransformNodes.findPersistentTasks(transformIdFoo, cs); + assertEquals(1, transformNodeAssignments.getExecutorNodes().size()); + assertTrue(transformNodeAssignments.getExecutorNodes().contains("node-1")); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(1, transformNodeAssignments.getAssigned().size()); + assertTrue(transformNodeAssignments.getAssigned().contains(transformIdFoo)); + // stopped tasks are not reported when matching against _running_ tasks + assertEquals(0, transformNodeAssignments.getStopped().size()); } public void testTransformNodes_NoTasks() { @@ -134,5 +193,12 @@ public void testTransformNodes_NoTasks() { assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); assertEquals(1, transformNodeAssignments.getStopped().size()); assertTrue(transformNodeAssignments.getStopped().contains("df-id")); + + transformNodeAssignments = TransformNodes.findPersistentTasks("df-*", emptyState); + + assertEquals(0, transformNodeAssignments.getExecutorNodes().size()); + assertEquals(0, transformNodeAssignments.getWaitingForAssignment().size()); + assertEquals(0, transformNodeAssignments.getAssigned().size()); + assertEquals(0, transformNodeAssignments.getStopped().size()); } }