Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Enhance transform role checks #70139

Merged
merged 8 commits into from
Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public boolean equals(Object other) {

public enum State {

STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED;
STARTED, INDEXING, ABORTING, STOPPING, STOPPED, FAILED, WAITING;
hendrikmuhs marked this conversation as resolved.
Show resolved Hide resolved

public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class TransformMessages {
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";

public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";
public static final String REST_WARN_NO_TRANSFORM_NODES =
"Transform requires the transform node role for at least 1 node, found no transform nodes";

public static final String CANNOT_STOP_FAILED_TRANSFORM = "Unable to stop transform [{0}] as it is in a failed state with reason [{1}]."
+ " Use force stop to stop the transform.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeString(id);
out.writeEnum(state);
// 7.13 introduced the waiting state, in older version report the state as started
if (out.getVersion().before(Version.V_8_0_0) && state.equals(State.WAITING)) { // TODO: V_7_13_0
out.writeEnum(State.STARTED);
} else {
out.writeEnum(state);
}
out.writeOptionalString(reason);
if (node != null) {
out.writeBoolean(true);
Expand Down Expand Up @@ -247,7 +252,8 @@ public enum State implements Writeable {
ABORTING,
STOPPING,
STOPPED,
FAILED;
FAILED,
WAITING;

public static State fromString(String name) {
return valueOf(name.trim().toUpperCase(Locale.ROOT));
Expand Down Expand Up @@ -299,6 +305,7 @@ public String value() {
return name().toLowerCase(Locale.ROOT);
}

// only used when speaking to nodes < 7.4 (can be removed for 8.0)
public Tuple<TransformTaskState, IndexerState> toComponents() {

switch (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Predicate;

import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED;
import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.WAITING;
import static org.hamcrest.Matchers.equalTo;

public class TransformStatsTests extends AbstractSerializingTestCase<TransformStats> {
Expand Down Expand Up @@ -120,4 +121,33 @@ public void testBwcWith76() throws IOException {
}
}
}

public void testBwcWith712() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
WAITING,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 15.0, 16.0, 17.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_12_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_8_0_0); // TODO: V_7_13_0
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld.getState(), equalTo(STARTED));
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESSingleNodeTestCase;
Expand All @@ -32,6 +34,11 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(LocalStateTransform.class, ReindexPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest, transform").build();
}

protected <T> void assertAsync(Consumer<ActionListener<T>> function, T expected, CheckedConsumer<T, ? extends Exception> onAnswer,
Consumer<Exception> onException) throws InterruptedException {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
import org.elasticsearch.xpack.transform.TransformSingleNodeTestCase;

public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
@Override
protected Settings nodeSettings() {
return Settings.builder().put(NodeRoleSettings.NODE_ROLES_SETTING.getKey(), "master, data, ingest").build();
}

public void testWarningForStats() {
GetTransformStatsAction.Request getTransformStatsRequest = new GetTransformStatsAction.Request("_all");
GetTransformStatsAction.Response getTransformStatsResponse = client().execute(
GetTransformStatsAction.INSTANCE,
getTransformStatsRequest
).actionGet();

assertEquals(0, getTransformStatsResponse.getTransformsStats().size());

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}

public void testWarningForGet() {
GetTransformAction.Request getTransformRequest = new GetTransformAction.Request("_all");
GetTransformAction.Response getTransformResponse = client().execute(GetTransformAction.INSTANCE, getTransformRequest).actionGet();
assertEquals(0, getTransformResponse.getTransformConfigurations().size());

assertWarnings("Transform requires the transform node role for at least 1 node, found no transform nodes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,38 @@
package org.elasticsearch.xpack.transform.action;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.transform.Transform;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public final class TransformNodes {

private TransformNodes() {}

/**
* Get the list of nodes transforms are executing on
* Get node assignments for a given list of transforms.
*
* @param transformIds The transforms.
* @param clusterState State
* @return The executor nodes
* @return The {@link TransformNodeAssignments} for the given transforms.
*/
public static TransformNodeAssignments transformTaskNodes(List<String> transformIds, ClusterState clusterState) {

Set<String> executorNodes = new HashSet<>();
Set<String> assigned = new HashSet<>();
Set<String> waitingForAssignment = new HashSet<>();
Expand Down Expand Up @@ -60,4 +70,81 @@ public static TransformNodeAssignments transformTaskNodes(List<String> transform

return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, stopped);
}

/**
* Get node assignments for a given transform pattern.
*
* Note: This only returns p-task assignments, stopped transforms are not reported. P-Tasks can be running or waiting for a node.
*
* @param transformId The transform or a wildcard pattern, including '_all' to match against transform tasks.
* @param clusterState State
* @return The {@link TransformNodeAssignments} for the given pattern.
*/
public static TransformNodeAssignments findPersistentTasks(String transformId, ClusterState clusterState) {
Set<String> executorNodes = new HashSet<>();
Set<String> assigned = new HashSet<>();
Set<String> waitingForAssignment = new HashSet<>();

PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);

if (tasksMetadata != null) {
Predicate<PersistentTask<?>> taskMatcher = Strings.isAllOrWildcard(new String[] { transformId }) ? t -> true : t -> {
TransformTaskParams transformParams = (TransformTaskParams) t.getParams();
return Regex.simpleMatch(transformId, transformParams.getId());
};

for (PersistentTasksCustomMetadata.PersistentTask<?> task : tasksMetadata.findTasks(TransformField.TASK_NAME, taskMatcher)) {
if (task.isAssigned()) {
executorNodes.add(task.getExecutorNode());
assigned.add(task.getId());
} else {
waitingForAssignment.add(task.getId());
}
}
}
return new TransformNodeAssignments(executorNodes, assigned, waitingForAssignment, Collections.emptySet());
}

/**
* Get the assignment of a specific transform.
*
* @param transformId the transform id
* @param clusterState state
* @return {@link Assignment} of task
*/
public static Assignment getAssignment(String transformId, ClusterState clusterState) {
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
PersistentTask<?> task = tasksMetadata.getTask(transformId);

if (task != null) {
return task.getAssignment();
}

return PersistentTasksCustomMetadata.INITIAL_ASSIGNMENT;
}

/**
* Get the number of transform nodes in the cluster
*
* @param clusterState state
* @return number of transform nodes
*/
public static long getNumberOfTransformNodes(ClusterState clusterState) {
return StreamSupport.stream(clusterState.getNodes().spliterator(), false)
.filter(node -> node.getRoles().contains(Transform.TRANSFORM_ROLE))
.count();
}

/**
* Check if cluster has at least 1 transform nodes and add a header warning if not.
* To be used by transport actions only.
*
* @param clusterState state
*/
public static void warnIfNoTransformNodes(ClusterState clusterState) {
long transformNodes = getNumberOfTransformNodes(clusterState);
if (transformNodes == 0) {
HeaderWarning.addWarning(TransformMessages.REST_WARN_NO_TRANSFORM_NODES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -34,23 +36,37 @@
import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE;


public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig,
Request,
Response> {
public class TransportGetTransformAction extends AbstractTransportGetResourcesAction<TransformConfig, Request, Response> {

private final ClusterService clusterService;

@Inject
public TransportGetTransformAction(TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
this(GetTransformAction.NAME, transportService, actionFilters, client, xContentRegistry);
public TransportGetTransformAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
this(GetTransformAction.NAME, transportService, actionFilters, clusterService, client, xContentRegistry);
}

protected TransportGetTransformAction(String name, TransportService transportService, ActionFilters actionFilters, Client client,
NamedXContentRegistry xContentRegistry) {
protected TransportGetTransformAction(
String name,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
NamedXContentRegistry xContentRegistry
) {
super(name, transportService, actionFilters, Request::new, client, xContentRegistry);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
final ClusterState state = clusterService.state();
TransformNodes.warnIfNoTransformNodes(state);
searchResources(request, ActionListener.wrap(
r -> listener.onResponse(new Response(r.results(), r.count())),
listener::onFailure
Expand Down
Loading