Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Immutable cluster state controller #88224

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88224.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88224
summary: Immutable cluster state controller
area: Infra/Core
type: enhancement
issues: []
13 changes: 12 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
Expand All @@ -262,6 +263,8 @@
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.health.GetHealthAction;
import org.elasticsearch.health.RestGetHealthAction;
import org.elasticsearch.immutablestate.ImmutableClusterStateHandler;
import org.elasticsearch.immutablestate.service.ImmutableClusterStateController;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -448,6 +451,7 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final ImmutableClusterStateController immutableStateController;

public ActionModule(
Settings settings,
Expand All @@ -460,7 +464,9 @@ public ActionModule(
NodeClient nodeClient,
CircuitBreakerService circuitBreakerService,
UsageService usageService,
SystemIndices systemIndices
SystemIndices systemIndices,
ClusterService clusterService,
List<ImmutableClusterStateHandler<?>> reservedStateHandlers
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand Down Expand Up @@ -511,6 +517,7 @@ public ActionModule(
);

restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService);
immutableStateController = new ImmutableClusterStateController(clusterService, reservedStateHandlers);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -920,4 +927,8 @@ public ActionFilters getActionFilters() {
public RestController getRestController() {
return restController;
}

public ImmutableClusterStateController getImmutableClusterStateController() {
return immutableStateController;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ImmutableStateMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -42,7 +43,9 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -172,9 +175,39 @@ protected Set<String> modifiedKeys(Request request) {
return Collections.emptySet();
}

// package private for testing
void validateForImmutableState(Request request, ClusterState state) {
Optional<String> handlerName = immutableStateHandlerName();
assert handlerName.isPresent();

Set<String> modified = modifiedKeys(request);
List<String> errors = new ArrayList<>();

for (ImmutableStateMetadata metadata : state.metadata().immutableStateMetadata().values()) {
Set<String> conflicts = metadata.conflicts(handlerName.get(), modified);
if (conflicts.isEmpty() == false) {
errors.add(format("[%s] set as read-only by [%s]", String.join(",", conflicts), metadata.namespace()));
}
}

if (errors.isEmpty() == false) {
throw new IllegalArgumentException(
format("Failed to process request [%s] with errors: %s", request, String.join(System.lineSeparator(), errors))
);
}
}

// package private for testing
boolean supportsImmutableState() {
return immutableStateHandlerName().isPresent();
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
if (supportsImmutableState()) {
validateForImmutableState(request, state);
}
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
Expand Down
14 changes: 0 additions & 14 deletions server/src/main/java/org/elasticsearch/common/util/Maps.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,18 +284,4 @@ static int capacity(int expectedSize) {
return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
}

/**
* Convenience method to convert the passed in input object as a map with String keys.
*
* @param input the input passed into the operator handler after parsing the content
* @return
*/
@SuppressWarnings("unchecked")
public static Map<String, ?> asMap(Object input) {
if (input instanceof Map<?, ?> source) {
return (Map<String, Object>) source;
}
throw new IllegalStateException("Unsupported input format");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

Expand All @@ -35,7 +37,6 @@ public interface ImmutableClusterStateHandler<T> {
* cluster state update and the cluster state is typically supplied as a combined content,
* unlike the REST handlers. This name must match a desired content key name in the combined
* cluster state update, e.g. "ilm" or "cluster_settings" (for persistent cluster settings update).
* </p>
*
* @return a String with the handler name, e.g "ilm".
*/
Expand All @@ -51,7 +52,6 @@ public interface ImmutableClusterStateHandler<T> {
* state in one go. For that reason, we supply a wrapper class to the cluster state called
* {@link TransformState}, which contains the current cluster state as well as any previous keys
* set by this handler on prior invocation.
* </p>
*
* @param source The parsed information specific to this handler from the combined cluster state content
* @param prevState The previous cluster state and keys set by this handler (if any)
Expand All @@ -70,7 +70,6 @@ public interface ImmutableClusterStateHandler<T> {
* to any immutable handler to declare other immutable state handlers it depends on. Given dependencies exist,
* the ImmutableClusterStateController will order those handlers such that the handlers that are dependent
* on are processed first.
* </p>
*
* @return a collection of immutable state handler names
*/
Expand All @@ -85,7 +84,6 @@ default Collection<String> dependencies() {
* All implementations of {@link ImmutableClusterStateHandler} should call the request validate method, by calling this default
* implementation. To aid in any special validation logic that may need to be implemented by the immutable cluster state handler
* we provide this convenience method.
* </p>
*
* @param request the master node request that we base this immutable state handler on
*/
Expand All @@ -95,4 +93,18 @@ default void validate(MasterNodeRequest<?> request) {
throw new IllegalStateException("Validation error", exception);
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splitting off the parsing part of handling immutable cluster state, so that we can reuse the immutable cluster state update controller to work with plain Java objects. This will come in handy when plugins/modules will want to save state on initialization.

/**
* The parse content method which is called during parsing of file based content.
*
* <p>
* The immutable state can be provided as XContent, which means that each handler needs
* to implement a method to convert an XContent to an object it can consume later in
* transform
*
* @param parser the XContent parser we are parsing from
* @return
* @throws IOException
*/
T fromXContent(XContentParser parser) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.immutablestate.ImmutableClusterStateHandler;
import org.elasticsearch.immutablestate.TransformState;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.Maps.asMap;

/**
* This Action is the immutable state save version of RestClusterUpdateSettingsAction
* <p>
* It is used by the ImmutableClusterStateController to update the persistent cluster settings.
* Since transient cluster settings are deprecated, this action doesn't support updating transient cluster settings.
*/
public class ImmutableClusterSettingsAction implements ImmutableClusterStateHandler<ClusterUpdateSettingsRequest> {
public class ImmutableClusterSettingsAction implements ImmutableClusterStateHandler<Map<String, Object>> {

public static final String NAME = "cluster_settings";

Expand All @@ -49,11 +49,12 @@ public String name() {
private ClusterUpdateSettingsRequest prepare(Object input, Set<String> previouslySet) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();

Map<String, ?> source = asMap(input);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor change here in the interface, so that I can split the XContent parsing from the state transformation.

Map<String, Object> persistentSettings = new HashMap<>();
Set<String> toDelete = new HashSet<>(previouslySet);

source.forEach((k, v) -> {
Map<String, Object> settings = (Map<String, Object>) input;

settings.forEach((k, v) -> {
persistentSettings.put(k, v);
toDelete.remove(k);
});
Expand Down Expand Up @@ -87,4 +88,9 @@ public TransformState transform(Object input, TransformState prevState) {

return new TransformState(state, currentKeys);
}

@Override
public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
return parser.map();
}
}
Loading