Skip to content

Commit

Permalink
[Transform] Enhance transform role checks (#70139)
Browse files Browse the repository at this point in the history
improve robustness and ux in case of a missing transform node:

 - warn if cluster lacks a transform node in all API's (except DELETE)
 - report waiting state in stats if transform waits for assignment
 - cancel p-task on stop transform even if config has been deleted

relates #69518
  • Loading branch information
Hendrik Muhs authored Mar 10, 2021
1 parent ff50da5 commit 61f0c47
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean equals(Object other) {

public enum State {

STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED, WAITING;

public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class TransformMessages {
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";

public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
public static final String REST_WARN_NO_TRANSFORM_NODES =
"Transform requires the transform node role for at least 1 node, found no transform nodes";

public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]."
+ " Use force stop to stop the transform.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeString(id);
out.writeEnum(state);
// 7.13 introduced the waiting state, in older version report the state as started
if (out.getVersion().before(Version.V_8_0_0) && state.equals(State.WAITING)) { // TODO: V_7_13_0
out.writeEnum(State.STARTED);
} else {
out.writeEnum(state);
}
out.writeOptionalString(reason);
if (node != null) {
out.writeBoolean(true);
Expand Down Expand Up @@ -247,7 +252,8 @@ public enum State implements Writeable {
ABORTING,
STOPPING,
STOPPED,
FAILED;
FAILED,
WAITING;

public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down Expand Up @@ -299,6 +305,7 @@ public String value() {
return name().toLowerCase(Locale.ROOT);
}

// only used when speaking to nodes < 7.4 (can be removed for 8.0)
public Tuple<TransformTaskState, IndexerState> toComponents() {

switch (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Predicate;

import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED;
import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.WAITING;
import static org.hamcrest.Matchers.equalTo;

public class TransformStatsTests extends AbstractSerializingTestCase<TransformStats> {
Expand Down Expand Up @@ -120,4 +121,33 @@ public void testBwcWith76() throws IOException {
}
}
}

public void testBwcWith712() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
WAITING,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 15.0, 16.0, 17.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_12_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_8_0_0); // TODO: V_7_13_0
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld.getState(), equalTo(STARTED));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESSingleNodeTestCase;
Expand All @@ -32,6 +34,11 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateTransform.class, ReindexPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform").build();
}

protected <T> void assertAsync(Consumer<ActionListener<T>> function, T expected, CheckedConsumer<T, ? extends Exception> onAnswer,
Consumer<Exception> onException) throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;

public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest").build();
}

public void testWarningForStats() {
GetTransformStatsAction.Request getTransformStatsRequest = new GetTransformStatsAction.Request("_all");
GetTransformStatsAction.Response getTransformStatsResponse = client().execute(
GetTransformStatsAction.INSTANCE,
getTransformStatsRequest
).actionGet();

assertEquals(0, getTransformStatsResponse.getTransformsStats().size());

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}

public void testWarningForGet() {
GetTransformAction.Request getTransformRequest = new GetTransformAction.Request("_all");
GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
assertEquals(0, getTransformResponse.getTransformConfigurations().size());

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,38 @@
package org.elasticsearch.xpack.transform.action;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.transform.Transform;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class TransformNodes {

private TransformNodes() {}

/**
* Get the list of nodes transforms are executing on
* Get node assignments for a given list of transforms.
*
* @param transformIds The transforms.
* @param clusterState State
* @return The executor nodes
* @return The {@link TransformNodeAssignments} for the given transforms.
*/
public static TransformNodeAssignments transformTaskNodes(List<String> transformIds, ClusterState clusterState) {

Set<String> executorNodes = new HashSet<>();
Set<String> assigned = new HashSet<>();
Set<String> waitingForAssignment = new HashSet<>();
Expand Down Expand Up @@ -60,4 +70,81 @@ public static TransformNodeAssignments transformTaskNodes(List<String> transform

return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
}

/**
* Get node assignments for a given transform pattern.
*
* Note: This only returns p-task assignments, stopped transforms are not reported. P-Tasks can be running or waiting for a node.
*
* @param transformId The transform or a wildcard pattern, including '_all' to match against transform tasks.
* @param clusterState State
* @return The {@link TransformNodeAssignments} for the given pattern.
*/
public static TransformNodeAssignments findPersistentTasks(String transformId, ClusterState clusterState) {
Set<String> executorNodes = new HashSet<>();
Set<String> assigned = new HashSet<>();
Set<String> waitingForAssignment = new HashSet<>();

PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);

if (tasksMetadata != null) {
Predicate<PersistentTask<?>> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> {
TransformTaskParams transformParams = (TransformTaskParams) t.getParams();
return Regex.simpleMatch(transformId, transformParams.getId());
};

for (PersistentTasksCustomMetadata.PersistentTask<?> task : tasksMetadata.findTasks(TransformField.TASK_NAME, taskMatcher)) {
if (task.isAssigned()) {
executorNodes.add(task.getExecutorNode());
assigned.add(task.getId());
} else {
waitingForAssignment.add(task.getId());
}
}
}
return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, Collections.emptySet());
}

/**
* Get the assignment of a specific transform.
*
* @param transformId the transform id
* @param clusterState state
* @return {@link Assignment} of task
*/
public static Assignment getAssignment(String transformId, ClusterState clusterState) {
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
PersistentTask<?> task = tasksMetadata.getTask(transformId);

if (task != null) {
return task.getAssignment();
}

return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT;
}

/**
* Get the number of transform nodes in the cluster
*
* @param clusterState state
* @return number of transform nodes
*/
public static long getNumberOfTransformNodes(ClusterState clusterState) {
return StreamSupport.stream(clusterState.getNodes().spliterator(), false)
.filter(node -> node.getRoles().contains(Transform.TRANSFORM_ROLE))
.count();
}

/**
* Check if cluster has at least 1 transform nodes and add a header warning if not.
* To be used by transport actions only.
*
* @param clusterState state
*/
public static void warnIfNoTransformNodes(ClusterState clusterState) {
long transformNodes = getNumberOfTransformNodes(clusterState);
if (transformNodes == 0) {
HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -34,23 +36,37 @@
import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE;


public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig,
Request,
Response> {
public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig, Request, Response> {

private final ClusterService clusterService;

@Inject
public TransportGetTransformAction(TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
this(GetTransformAction.NAME, transportService, actionFilters, client, xContentRegistry);
public TransportGetTransformAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
this(GetTransformAction.NAME, transportService, actionFilters, clusterService, client, xContentRegistry);
}

protected TransportGetTransformAction(String name, TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
protected TransportGetTransformAction(
String name,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
super(name, transportService, actionFilters, Request::new, client, xContentRegistry);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
TransformNodes.warnIfNoTransformNodes(state);
searchResources(request, ActionListener.wrap(
r -> listener.onResponse(new Response(r.results(), r.count())),
listener::onFailure
Expand Down
Loading

0 comments on commit 61f0c47

Please sign in to comment.