Skip to content

Commit

Permalink
improve robustness and ux in case of a missing transform node:
Browse files Browse the repository at this point in the history
 - warn if cluster lacks a transform node in all APIs (except DELETE)
 - report waiting state in stats if transform waits for assignment
 - cancel p-task on stop transform even if config has been deleted

relates elastic#69518
  • Loading branch information
Hendrik Muhs committed Mar 9, 2021
1 parent c226958 commit ba2c7a8
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class TransformMessages {
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";

public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
public static final String REST_WARN_NO_TRANSFORM_NODES =
"Transform requires the transform node role for at least 1 node, found [{0}] transform nodes";

public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]."
+ " Use force stop to stop the transform.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeString(id);
out.writeEnum(state);
// 7.13 introduced the waiting state; when writing to older versions, report the state as started
if (out.getVersion().before(Version.V_8_0_0) && state.equals(State.WAITING)) { // TODO: V_7_13_0
out.writeEnum(State.STARTED);
} else {
out.writeEnum(state);
}
out.writeOptionalString(reason);
if (node != null) {
out.writeBoolean(true);
Expand Down Expand Up @@ -247,7 +252,8 @@ public enum State implements Writeable {
ABORTING,
STOPPING,
STOPPED,
FAILED;
FAILED,
WAITING;

public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down Expand Up @@ -299,6 +305,7 @@ public String value() {
return name().toLowerCase(Locale.ROOT);
}

// only used when speaking to nodes < 7.4 (can be removed for 8.0)
public Tuple<TransformTaskState, IndexerState> toComponents() {

switch (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,36 @@
package org.elasticsearch.xpack.transform.action;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.transform.Transform;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class TransformNodes {

private TransformNodes() {}

/**
* Get the list of nodes transforms are executing on
* Get node assignments for a given list of transforms.
*
* @param transformIds The transforms.
* @param clusterState State
* @return The executor nodes
* @return The {@link TransformNodeAssignments} for the given transforms.
*/
public static TransformNodeAssignments transformTaskNodes(List<String> transformIds, ClusterState clusterState) {

Expand Down Expand Up @@ -60,4 +71,86 @@ public static TransformNodeAssignments transformTaskNodes(List<String> transform

return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
}

/**
 * Collect the node assignments of all active transform tasks matching a transform id or pattern.
 *
 * Only tasks present in the persistent task metadata are considered, i.e. tasks that are either assigned
 * to a node or waiting for assignment. Stopped transforms are never reported; the "stopped" set of the
 * result is always empty.
 *
 * @param transformId The transform id or a simple wildcard pattern. A value accepted by
 *                    {@link Strings#isAllOrWildcard(String[])} selects every transform task.
 * @param clusterState State
 * @return The {@link TransformNodeAssignments} for the matching tasks.
 */
public static TransformNodeAssignments findActiveTasks(String transformId, ClusterState clusterState) {
    final Set<String> nodesRunningTasks = new HashSet<>();
    final Set<String> assignedTaskIds = new HashSet<>();
    final Set<String> unassignedTaskIds = new HashSet<>();

    final PersistentTasksCustomMetadata persistentTasks = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);

    // without persistent task metadata there is nothing to match, report empty assignments
    if (persistentTasks == null) {
        return new TransformNodeAssignments(nodesRunningTasks, assignedTaskIds, unassignedTaskIds, Collections.emptySet());
    }

    final Predicate<PersistentTask<?>> idFilter;
    if (Strings.isAllOrWildcard(new String[] { transformId })) {
        // match-all: no need to look at the task params
        idFilter = persistentTask -> true;
    } else {
        // compare the pattern against the transform id stored in the task params
        idFilter = persistentTask -> Regex.simpleMatch(transformId, ((TransformTaskParams) persistentTask.getParams()).getId());
    }

    for (PersistentTask<?> activeTask : persistentTasks.findTasks(TransformField.TASK_NAME, idFilter)) {
        final String taskId = activeTask.getId();
        if (activeTask.isAssigned() == false) {
            unassignedTaskIds.add(taskId);
            continue;
        }
        nodesRunningTasks.add(activeTask.getExecutorNode());
        assignedTaskIds.add(taskId);
    }

    return new TransformNodeAssignments(nodesRunningTasks, assignedTaskIds, unassignedTaskIds, Collections.emptySet());
}


/**
 * Get the assignment of a specific transform.
 *
 * Falls back to {@link PersistentTasksCustomMetadata#INITIAL_ASSIGNMENT} if the cluster state carries no
 * persistent task metadata at all, or if no persistent task exists for the given transform id.
 *
 * @param transformId the transform id
 * @param clusterState state
 * @return {@link Assignment} of the task, or the initial assignment if the task cannot be found
 */
public static Assignment getAssignment(String transformId, ClusterState clusterState) {
    PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);

    // the metadata lookup can come back empty (no persistent tasks registered in the cluster state);
    // treat that the same as "task not found" instead of failing with a NullPointerException
    if (tasksMetadata == null) {
        return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT;
    }

    PersistentTask<?> task = tasksMetadata.getTask(transformId);

    if (task != null) {
        return task.getAssignment();
    }

    return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT;
}

/**
 * Count the nodes in the cluster that carry the transform role.
 *
 * @param clusterState state
 * @return number of nodes whose roles contain {@link Transform#TRANSFORM_ROLE}
 */
public static long getNumberOfTransformNodes(ClusterState clusterState) {
    // look at every node's role set and keep only those that include the transform role
    return StreamSupport.stream(clusterState.getNodes().spliterator(), false)
        .map(node -> node.getRoles())
        .filter(roles -> roles.contains(Transform.TRANSFORM_ROLE))
        .count();
}

/**
 * Add a header warning if the cluster does not have a single transform node.
 * Does nothing if at least one transform node exists. To be used by transport actions.
 *
 * @param clusterState state
 */
public static void warnIfNoTransformNodes(ClusterState clusterState) {
    final long nodeCount = getNumberOfTransformNodes(clusterState);
    if (nodeCount != 0) {
        return;
    }

    // the count is passed along so the message reports how many transform nodes were found
    final String warning = TransformMessages.getMessage(TransformMessages.REST_WARN_NO_TRANSFORM_NODES, nodeCount);
    HeaderWarning.addWarning(warning);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -34,23 +36,37 @@
import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE;


public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig,
Request,
Response> {
public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig, Request, Response> {

private final ClusterService clusterService;

@Inject
public TransportGetTransformAction(TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
this(GetTransformAction.NAME, transportService, actionFilters, client, xContentRegistry);
public TransportGetTransformAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
this(GetTransformAction.NAME, transportService, actionFilters, clusterService, client, xContentRegistry);
}

protected TransportGetTransformAction(String name, TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
protected TransportGetTransformAction(
String name,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
super(name, transportService, actionFilters, Request::new, client, xContentRegistry);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
TransformNodes.warnIfNoTransformNodes(state);
searchResources(request, ActionListener.wrap(
r -> listener.onResponse(new Response(r.results(), r.count())),
listener::onFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -131,17 +132,18 @@ protected void taskOperation(Request request, TransformTask task, ActionListener

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> finalListener) {
final ClusterState state = clusterService.state();
TransformNodes.warnIfNoTransformNodes(state);

transformConfigManager.expandTransformIds(
request.getId(),
request.getPageParams(),
request.isAllowNoMatch(),
ActionListener.wrap(hitsAndIds -> {
request.setExpandedIds(hitsAndIds.v2());
final ClusterState state = clusterService.state();
TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);
// TODO: if empty the request is send to all nodes(benign but superfluous)
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
super.doExecute(task, request, ActionListener.wrap(response -> {
final TransformNodeAssignments transformNodeAssignments = TransformNodes.transformTaskNodes(hitsAndIds.v2(), state);

ActionListener<Response> doExecuteListener = ActionListener.wrap(response -> {
PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
if (tasksInProgress != null) {
// Mutates underlying state object with the assigned node attributes
Expand All @@ -150,6 +152,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> fi
collectStatsForTransformsWithoutTasks(
request,
response,
transformNodeAssignments.getWaitingForAssignment(),
state,
ActionListener.wrap(
finalResponse -> finalListener.onResponse(
new Response(
Expand All @@ -162,7 +166,14 @@ protected void doExecute(Task task, Request request, ActionListener<Response> fi
finalListener::onFailure
)
);
}, finalListener::onFailure));
}, finalListener::onFailure);

if (transformNodeAssignments.getExecutorNodes().size() > 0) {
request.setNodes(transformNodeAssignments.getExecutorNodes().toArray(new String[0]));
super.doExecute(task, request, doExecuteListener);
} else {
doExecuteListener.onResponse(new Response(Collections.emptyList(), 0L));
}
},
e -> {
// If the index to search, or the individual config is not there, just return empty
Expand Down Expand Up @@ -210,7 +221,13 @@ static TransformStats deriveStats(TransformTask task, @Nullable TransformCheckpo
);
}

private void collectStatsForTransformsWithoutTasks(Request request, Response response, ActionListener<Response> listener) {
private void collectStatsForTransformsWithoutTasks(
Request request,
Response response,
Set<String> transformsWaitingForAssignment,
ClusterState clusterState,
ActionListener<Response> listener
) {
// We gathered all there is, no need to continue
if (request.getExpandedIds().size() == response.getTransformsStats().size()) {
listener.onResponse(response);
Expand All @@ -226,23 +243,30 @@ private void collectStatsForTransformsWithoutTasks(Request request, Response res
// There is a potential race condition where the saved document does not actually have a STOPPED state
// as the task is cancelled before we persist state.
ActionListener<List<TransformStoredDoc>> searchStatsListener = ActionListener.wrap(statsForTransformsWithoutTasks -> {
List<TransformStats> allStateAndStats = response.getTransformsStats();
addCheckpointingInfoForTransformsWithoutTasks(allStateAndStats, statsForTransformsWithoutTasks, ActionListener.wrap(aVoid -> {
transformsWithoutTasks.removeAll(
statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet())
);
// copy the list as it might be immutable
List<TransformStats> allStateAndStats = new ArrayList<>(response.getTransformsStats());
addCheckpointingInfoForTransformsWithoutTasks(
allStateAndStats,
statsForTransformsWithoutTasks,
transformsWaitingForAssignment,
clusterState,
ActionListener.wrap(aVoid -> {
transformsWithoutTasks.removeAll(
statsForTransformsWithoutTasks.stream().map(TransformStoredDoc::getId).collect(Collectors.toSet())
);

// Transforms that have not been started and have no state or stats.
transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId)));
// Transforms that have not been started and have no state or stats.
transformsWithoutTasks.forEach(transformId -> allStateAndStats.add(TransformStats.initialStats(transformId)));

// Any transform in collection could NOT have a task, so, even though the list is initially sorted
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(TransformStats::getId));
// Any transform in collection could NOT have a task, so, even though the list is initially sorted
// it can easily become arbitrarily ordered based on which transforms don't have a task or stats docs
allStateAndStats.sort(Comparator.comparing(TransformStats::getId));

listener.onResponse(
new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures())
);
}, listener::onFailure));
listener.onResponse(
new Response(allStateAndStats, allStateAndStats.size(), response.getTaskFailures(), response.getNodeFailures())
);
}, listener::onFailure)
);
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onResponse(response);
Expand Down Expand Up @@ -271,6 +295,8 @@ private void populateSingleStoppedTransformStat(TransformStoredDoc transform, Ac
private void addCheckpointingInfoForTransformsWithoutTasks(
List<TransformStats> allStateAndStats,
List<TransformStoredDoc> statsForTransformsWithoutTasks,
Set<String> transformsWaitingForAssignment,
ClusterState clusterState,
ActionListener<Void> listener
) {

Expand All @@ -285,9 +311,31 @@ private void addCheckpointingInfoForTransformsWithoutTasks(

statsForTransformsWithoutTasks.forEach(stat -> populateSingleStoppedTransformStat(stat, ActionListener.wrap(checkpointingInfo -> {
synchronized (allStateAndStats) {
allStateAndStats.add(
new TransformStats(stat.getId(), TransformStats.State.STOPPED, null, null, stat.getTransformStats(), checkpointingInfo)
);
if (transformsWaitingForAssignment.contains(stat.getId())) {
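// corner case: the transform has a persistent task that is still waiting for node assignment,
// so report it as waiting (with the assignment explanation as reason) instead of stopped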
Assignment assignment = TransformNodes.getAssignment(stat.getId(), clusterState);
allStateAndStats.add(
new TransformStats(
stat.getId(),
TransformStats.State.WAITING,
assignment.getExplanation(),
null,
stat.getTransformStats(),
checkpointingInfo
)
);
} else {
allStateAndStats.add(
new TransformStats(
stat.getId(),
TransformStats.State.STOPPED,
null,
null,
stat.getTransformStats(),
checkpointingInfo
)
);
}
}
if (numberRemaining.decrementAndGet() == 0) {
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected TransportPreviewTransformAction(

@Override
protected void doExecute(Task task, PreviewTransformAction.Request request, ActionListener<PreviewTransformAction.Response> listener) {
ClusterState clusterState = clusterService.state();
final ClusterState clusterState = clusterService.state();
TransformNodes.warnIfNoTransformNodes(clusterState);

final TransformConfig config = request.getConfig();
sourceDestValidator.validate(
Expand Down
Loading

0 comments on commit ba2c7a8

Please sign in to comment.