Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Operator and plugin read-only settings #86224

Closed
wants to merge 58 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
74dc598
WIP.
Apr 21, 2022
44fc2d0
Fix compile error.
Apr 21, 2022
9244455
More WIP
Apr 25, 2022
764505e
More refactoring
Apr 25, 2022
31b5ae3
Make ILM do processing
Apr 26, 2022
7f4898e
Merge master
Apr 26, 2022
682dec4
Fix merge
Apr 26, 2022
5db45de
Add file watcher service
Apr 28, 2022
fc45962
Add additional exception handling
Apr 29, 2022
273e56d
Fix issue with watcher starting after service is stopped
Apr 29, 2022
2533448
Add delete paths
Apr 29, 2022
86ec3b4
One more thread to skip
Apr 30, 2022
38ebdad
Merge master
May 19, 2022
13160ea
Merge master
May 25, 2022
5dfc772
Merge branch 'master' into feature/bulk_settings
May 26, 2022
dc21c39
Refactor for modules
May 26, 2022
ae19af7
Refactor for modules
May 26, 2022
59b18ce
Add additional metadata to cluster state
May 30, 2022
5618019
Make file service work
May 31, 2022
5222fd1
Update test
May 31, 2022
8e02c4e
Merge master
May 31, 2022
1505524
Add error detection and save
Jun 2, 2022
f69562e
Fix tests
Jun 3, 2022
6a80673
Fix bwc
Jun 3, 2022
ba72b13
More bwc fixes
Jun 3, 2022
0b626e3
Undo unnecessary changes
Jun 3, 2022
4048a3c
switch to jdk Map
Jun 6, 2022
131dd07
Extend metadata with error kind
Jun 6, 2022
a7a7986
Add more tests
Jun 6, 2022
9e725e9
Add more error handling
Jun 6, 2022
3bd1468
Add verification for operator set keys
Jun 6, 2022
1f07274
Add checking for ILM policies
Jun 6, 2022
14a8828
Merge master
Jun 6, 2022
02b43a6
Fix compile error after merge
Jun 6, 2022
95d906b
Add more ILM action tests
Jun 7, 2022
931db49
Add ilm transport operator tests
Jun 7, 2022
aac8492
Revert not needed change
Jun 7, 2022
936376e
Add more unit tests
Jun 7, 2022
092af3e
Add controller tasks tests
Jun 7, 2022
002c5ed
Add handler ordering unit tests
Jun 7, 2022
45d5a69
Add FileSettingsService tests
Jun 8, 2022
6b6e99b
Document code
Jun 8, 2022
813398e
Add TransportMasterNodeActionTests unit tests
Jun 8, 2022
77dd3d0
Add more tests and fix issues
Jun 8, 2022
fd9a1a0
Apply PR suggestions
Jun 8, 2022
503b496
Make metadata hashing consistent
Jun 9, 2022
ebbcdfa
Merge branch 'master' into feature/bulk_settings
Jun 9, 2022
c89dccc
Support async error reporting in the controller
Jun 9, 2022
4180739
Merge branch 'master' into feature/bulk_settings
Jun 9, 2022
f7eee71
Merge branch 'master' into feature/bulk_settings
Jun 13, 2022
e4b5938
Apply martijnvg's suggestions
Jun 13, 2022
5630b52
Replace registration with SPI
Jun 14, 2022
eee70fb
fix javadoc
Jun 14, 2022
602b33c
Change to illegalargument exception
Jun 15, 2022
f61b5b7
Fail start on bad operator config
Jun 15, 2022
e81d779
Add version check on error metadata
Jun 16, 2022
4272392
Refactor ilm section to exclude policy
Jun 16, 2022
26e60a6
Merge master
Jun 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public void testThreadNames() throws Exception {
// or the ones that are occasionally come up from ESSingleNodeTestCase
if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated!
|| threadName.contains("Keep-Alive-Timer")
|| threadName.contains("readiness-service")
|| threadName.contains("JVMCI-native") // GraalVM Compiler Thread
|| threadName.contains("readiness-service")) {
|| threadName.contains("file-settings-watcher")
|| threadName.contains("FileSystemWatchService")) {
continue;
}
String nodePrefix = "("
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@
exports org.elasticsearch.monitor.os;
exports org.elasticsearch.monitor.process;
exports org.elasticsearch.node;
exports org.elasticsearch.operator;
exports org.elasticsearch.persistent;
exports org.elasticsearch.persistent.decider;
exports org.elasticsearch.plugins;
Expand Down
33 changes: 32 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.TypeLiteral;
Expand All @@ -265,12 +266,17 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata;
import org.elasticsearch.operator.OperatorHandler;
import org.elasticsearch.operator.OperatorHandlerProvider;
import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction;
import org.elasticsearch.operator.service.OperatorClusterStateController;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -445,6 +451,8 @@ public class ActionModule extends AbstractModule {
private final RequestValidators<PutMappingRequest> mappingRequestValidators;
private final RequestValidators<IndicesAliasesRequest> indicesAliasesRequestRequestValidators;
private final ThreadPool threadPool;
private final OperatorClusterStateController operatorController;
private final ClusterService clusterService;

public ActionModule(
Settings settings,
Expand All @@ -457,7 +465,8 @@ public ActionModule(
NodeClient nodeClient,
CircuitBreakerService circuitBreakerService,
UsageService usageService,
SystemIndices systemIndices
SystemIndices systemIndices,
ClusterService clusterService
) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
Expand All @@ -466,6 +475,7 @@ public ActionModule(
this.settingsFilter = settingsFilter;
this.actionPlugins = actionPlugins;
this.threadPool = threadPool;
this.clusterService = clusterService;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices);
Expand Down Expand Up @@ -508,6 +518,7 @@ public ActionModule(
);

restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService);
operatorController = new OperatorClusterStateController(clusterService);
}

public Map<String, ActionHandler<?, ?>> getActions() {
Expand Down Expand Up @@ -882,6 +893,22 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestCatAction(catActions));
}

/**
* Initializes the operator action handlers for Elasticsearch and it's modules/plugins
*
* @param pluginsService needed to load all modules/plugins operator handlers through SPI
*/
public void initOperatorHandlers(PluginsService pluginsService) {
List<OperatorHandler<?>> handlers = new ArrayList<>();

List<? extends OperatorHandlerProvider> pluginHandlers = pluginsService.loadServiceProviders(OperatorHandlerProvider.class);

handlers.add(new OperatorClusterUpdateSettingsAction(clusterSettings));
pluginHandlers.forEach(h -> handlers.addAll(h.handlers()));

operatorController.initHandlers(handlers);
}

@Override
protected void configure() {
bind(ActionFilters.class).toInstance(actionFilters);
Expand Down Expand Up @@ -914,4 +941,8 @@ public ActionFilters getActionFilters() {
public RestController getRestController() {
return restController;
}

public OperatorClusterStateController getOperatorController() {
return operatorController;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
Expand Down Expand Up @@ -128,6 +130,17 @@ private static boolean checkClearedBlockAndArchivedSettings(
return true;
}

@Override
protected Optional<String> operatorHandlerName() {
return Optional.of(OperatorClusterUpdateSettingsAction.NAME);
}

@Override
protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return allSettings.keySet();
}

private static final String UPDATE_TASK_SOURCE = "cluster_update_settings";
private static final String REROUTE_TASK_SOURCE = "reroute_after_cluster_update_settings";

Expand Down Expand Up @@ -243,6 +256,10 @@ public ClusterUpdateSettingsTask(
this.request = request;
}

public ClusterUpdateSettingsTask(final ClusterSettings clusterSettings, ClusterUpdateSettingsRequest request) {
this(clusterSettings, Priority.IMMEDIATE, request, null);
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.OperatorMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -42,6 +43,11 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -142,9 +148,62 @@ private ClusterBlockException checkBlockIfStateRecovered(Request request, Cluste
}
}

/**
* Override this method if the master node action also has an {@link org.elasticsearch.operator.OperatorHandler}
* interaction. We need to check if certain settings or entities are allowed to be modified by the master node
* action, depending on if they are set already in operator mode.
*
* @return an Optional of the operator handler name
*/
protected Optional<String> operatorHandlerName() {
return Optional.empty();
}

/**
* Override this method to return the keys of the cluster state or cluster entities that are modified by
* the Request object. This method is used by the operator handler logic (see {@link org.elasticsearch.operator.OperatorHandler})
* to verify if the keys don't conflict with an existing key set in operator mode.
*
* @param request the TransportMasterNode request
* @return set of String keys intended to be modified/set/deleted by this request
*/
protected Set<String> modifiedKeys(Request request) {
return Collections.emptySet();
}

// package private for testing
void validateForOperatorState(Request request, ClusterState state) {
Optional<String> handlerName = operatorHandlerName();
assert handlerName.isPresent();

Set<String> modified = modifiedKeys(request);
List<String> errors = new ArrayList<>();

for (OperatorMetadata operator : state.metadata().operatorState().values()) {
Set<String> conflicts = operator.conflicts(handlerName.get(), modified);
if (conflicts.isEmpty() == false) {
errors.add(format("[%s] set in operator mode by [%s]", String.join(",", conflicts), operator.namespace()));
}
}

if (errors.isEmpty() == false) {
throw new IllegalArgumentException(
format("Failed to process request [%s] with errors: %s", request, String.join(System.lineSeparator(), errors))
);
}
}

// package private for testing
boolean supportsOperatorMode() {
return operatorHandlerName().isPresent();
}

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
if (supportsOperatorMode()) {
validateForOperatorState(request, state);
}
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
Expand Down
Loading