Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transport action immutable state checks #88491

Merged
merged 4 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -43,7 +44,9 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -173,9 +176,39 @@ protected Set<String> modifiedKeys(Request request) {
return Collections.emptySet();
}

// package private for testing
void validateForImmutableState(Request request, ClusterState state) {
Optional<String> handlerName = reservedStateHandlerName();
assert handlerName.isPresent();

Set<String> modified = modifiedKeys(request);
List<String> errors = new ArrayList<>();

for (ReservedStateMetadata metadata : state.metadata().reservedStateMetadata().values()) {
Set<String> conflicts = metadata.conflicts(handlerName.get(), modified);
if (conflicts.isEmpty() == false) {
errors.add(format("[%s] set as read-only by [%s]", String.join(", ", conflicts), metadata.namespace()));
}
}

if (errors.isEmpty() == false) {
throw new IllegalArgumentException(
format("Failed to process request [%s] with errors: [%s]", request, String.join(", ", errors))
);
}
}

// package private for testing
boolean supportsImmutableState() {
return reservedStateHandlerName().isPresent();
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
if (supportsImmutableState()) {
validateForImmutableState(request, state);
}
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlock;
Expand All @@ -30,6 +32,8 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -43,6 +47,7 @@
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -65,6 +70,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -254,6 +260,63 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
}
}

class ReservedStateAction extends Action {
ReservedStateAction(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME);
}

@Override
protected Optional<String> reservedStateHandlerName() {
return Optional.of("test_reserved_state_action");
}
}

class FakeClusterStateUpdateAction extends TransportMasterNodeAction<ClusterUpdateSettingsRequest, Response> {
FakeClusterStateUpdateAction(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
String executor
) {
super(
actionName,
transportService,
clusterService,
threadPool,
new ActionFilters(new HashSet<>()),
ClusterUpdateSettingsRequest::new,
TestIndexNameExpressionResolver.newInstance(),
Response::new,
executor
);
}

@Override
protected void masterOperation(
Task task,
ClusterUpdateSettingsRequest request,
ClusterState state,
ActionListener<Response> listener
) {}

@Override
protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) {
return null;
}

@Override
protected Optional<String> reservedStateHandlerName() {
return Optional.of(ReservedClusterSettingsAction.NAME);
}

@Override
protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return allSettings.keySet();
}
}

public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException {
final boolean masterOperationFailure = randomBoolean();

Expand Down Expand Up @@ -686,7 +749,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request)
);
}

};

PlainActionFuture<Response> listener = new PlainActionFuture<>();
Expand All @@ -697,6 +759,54 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class));
}

public void testRejectImmutableConflictClusterStateUpdate() {
ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("a", "b"));
ReservedStateHandlerMetadata hmThree = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("e", "f"));
ReservedStateMetadata omOne = ReservedStateMetadata.builder("namespace_one").putHandler(hmOne).build();
ReservedStateMetadata omTwo = ReservedStateMetadata.builder("namespace_two").putHandler(hmThree).build();

Metadata metadata = Metadata.builder().put(omOne).put(omTwo).build();

ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();

Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME);

assertFalse(noHandler.supportsImmutableState());

noHandler = new ReservedStateAction("internal:testOpAction", transportService, clusterService, threadPool);

assertTrue(noHandler.supportsImmutableState());

// nothing should happen here, since the request doesn't touch any of the immutable state keys
noHandler.validateForImmutableState(new Request(), clusterState);

ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put("a", "a value").build()
).transientSettings(Settings.builder().put("e", "e value").build());

FakeClusterStateUpdateAction action = new FakeClusterStateUpdateAction(
"internal:testClusterSettings",
transportService,
clusterService,
threadPool,
ThreadPool.Names.SAME
);

assertTrue(action.supportsImmutableState());

assertTrue(
expectThrows(IllegalArgumentException.class, () -> action.validateForImmutableState(request, clusterState)).getMessage()
.contains("with errors: [[a] set as read-only by [namespace_one], " + "[e] set as read-only by [namespace_two]")
);

ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put("m", "m value").build()
).transientSettings(Settings.builder().put("n", "n value").build());

// this should just work, no conflicts
action.validateForImmutableState(okRequest, clusterState);
}

private Runnable blockAllThreads(String executorName) throws Exception {
final int numberOfThreads = threadPool.info(executorName).getMax();
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName);
Expand Down