Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement few operator handlers #88097

Merged
merged 8 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,6 @@
org.elasticsearch.cluster.coordination.NodeToolCliProvider,
org.elasticsearch.index.shard.ShardToolCliProvider;
provides org.apache.logging.log4j.util.PropertySource with org.elasticsearch.common.logging.ESSystemPropertiesPropertySource;

uses org.elasticsearch.operator.OperatorHandlerProvider;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX;
Expand Down Expand Up @@ -128,6 +130,17 @@ private static boolean checkClearedBlockAndArchivedSettings(
return true;
}

@Override
protected Optional<String> operatorHandlerName() {
return Optional.of(OperatorClusterUpdateSettingsAction.NAME);
}

@Override
protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
return allSettings.keySet();
}

private static final String UPDATE_TASK_SOURCE = "cluster_update_settings";
private static final String REROUTE_TASK_SOURCE = "reroute_after_cluster_update_settings";

Expand Down Expand Up @@ -243,6 +256,11 @@ public ClusterUpdateSettingsTask(
this.request = request;
}

// Used by the operator handler
public ClusterUpdateSettingsTask(final ClusterSettings clusterSettings, ClusterUpdateSettingsRequest request) {
this(clusterSettings, Priority.IMMEDIATE, request, null);
}

@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState clusterState = updater.updateSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

import static org.elasticsearch.core.Strings.format;
Expand Down Expand Up @@ -142,6 +145,33 @@ private ClusterBlockException checkBlockIfStateRecovered(Request request, Cluste
}
}

/**
* Override this method if the master node action also has an {@link org.elasticsearch.operator.OperatorHandler}
* interaction.
* <p>
* We need to check if certain settings or entities are allowed to be modified by the master node
* action, depending on if they are set already in operator mode.
*
* @return an Optional of the operator handler name
*/
protected Optional<String> operatorHandlerName() {
return Optional.empty();
}

/**
* Override this method to return the keys of the cluster state or cluster entities that are modified by
* the Request object.
* <p>
* This method is used by the operator handler logic (see {@link org.elasticsearch.operator.OperatorHandler})
* to verify if the keys don't conflict with an existing key set in operator mode.
*
* @param request the TransportMasterNode request
* @return set of String keys intended to be modified/set/deleted by this request
*/
protected Set<String> modifiedKeys(Request request) {
return Collections.emptySet();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's an example of how these two methods operatorHandlerName and modifiedKeys could be used in doExecute below to verify that an operation is allowed to execute:

https://github.com/elastic/elasticsearch/pull/86224/files#diff-e7c445981d60f65376f04699ec430437d9b1c78fac6b5ef582f39f84e43de9ceR175

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the validateForOperatorState(...) validation method in the referenced link,
I do think the logic looks good, but I do think this also needs to be validated as part
of a cluster state update. Right now the validation there only occurs on the action/transport
level and there is no guarantee that what is validated at that point in time is also
true when the update if really performed. I think it is a good pre-check, that is likely
to catch almost all cases when resources are updates that are being managed operator state.

For example in order to guarantee that no operator state managed ilm policies are modified
via API. The UpdateLifecyclePolicyTask/DeleteLifecyclePolicyTask should do this validation
check as well as part of their respective execute(...) method.

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator.action;

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.operator.OperatorHandler;
import org.elasticsearch.operator.TransformState;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.common.util.Maps.asMap;

/**
* This Action is the Operator version of RestClusterUpdateSettingsAction
* <p>
* It is used by the OperatorClusterStateController to update the persistent cluster settings.
* Since transient cluster settings are deprecated, this action doesn't support updating cluster settings.
*/
public class OperatorClusterUpdateSettingsAction implements OperatorHandler<ClusterUpdateSettingsRequest> {

public static final String NAME = "cluster_settings";

private final ClusterSettings clusterSettings;

public OperatorClusterUpdateSettingsAction(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
}

@Override
public String name() {
return NAME;
}

@SuppressWarnings("unchecked")
private ClusterUpdateSettingsRequest prepare(Object input, Set<String> previouslySet) {
final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest();

Map<String, ?> source = asMap(input);
Map<String, Object> persistentSettings = new HashMap<>();
Set<String> toDelete = new HashSet<>(previouslySet);

source.forEach((k, v) -> {
persistentSettings.put(k, v);
toDelete.remove(k);
});

toDelete.forEach(k -> persistentSettings.put(k, null));

clusterUpdateSettingsRequest.persistentSettings(persistentSettings);
return clusterUpdateSettingsRequest;
}

@Override
public TransformState transform(Object input, TransformState prevState) {
ClusterUpdateSettingsRequest request = prepare(input, prevState.keys());

// allow empty requests, this is how we clean up settings
if (request.persistentSettings().isEmpty() == false) {
validate(request);
}

ClusterState state = prevState.state();

TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask updateSettingsTask =
new TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask(clusterSettings, request);

state = updateSettingsTask.execute(state);
Set<String> currentKeys = request.persistentSettings()
.keySet()
.stream()
.filter(k -> request.persistentSettings().hasValue(k))
.collect(Collectors.toSet());

return new TransformState(state, currentKeys);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator.action;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.operator.TransformState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.util.Collections;

import static org.hamcrest.Matchers.containsInAnyOrder;

public class OperatorClusterUpdateSettingsActionTests extends ESTestCase {

private TransformState processJSON(OperatorClusterUpdateSettingsAction action, TransformState prevState, String json) throws Exception {
try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
return action.transform(parser.map(), prevState);
}
}

public void testValidation() throws Exception {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build();
TransformState prevState = new TransformState(state, Collections.emptySet());
OperatorClusterUpdateSettingsAction action = new OperatorClusterUpdateSettingsAction(clusterSettings);

String badPolicyJSON = """
{
"indices.recovery.min_bytes_per_sec": "50mb"
}""";

assertEquals(
"persistent setting [indices.recovery.min_bytes_per_sec], not recognized",
expectThrows(IllegalArgumentException.class, () -> processJSON(action, prevState, badPolicyJSON)).getMessage()
);
}

public void testSetUnsetSettings() throws Exception {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build();
TransformState prevState = new TransformState(state, Collections.emptySet());
OperatorClusterUpdateSettingsAction action = new OperatorClusterUpdateSettingsAction(clusterSettings);

String emptyJSON = "";

TransformState updatedState = processJSON(action, prevState, emptyJSON);
assertEquals(0, updatedState.keys().size());
assertEquals(prevState.state(), updatedState.state());

String settingsJSON = """
{
"indices.recovery.max_bytes_per_sec": "50mb",
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
}
}
}
}""";

prevState = updatedState;
updatedState = processJSON(action, prevState, settingsJSON);
assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster.remote.cluster_one.seeds"));
assertEquals("50mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec"));
assertEquals("[127.0.0.1:9300]", updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds"));

String oneSettingJSON = """
{
"indices.recovery.max_bytes_per_sec": "25mb"
}""";

prevState = updatedState;
updatedState = processJSON(action, prevState, oneSettingJSON);
assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec"));
assertEquals("25mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec"));
assertNull(updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds"));

prevState = updatedState;
updatedState = processJSON(action, prevState, emptyJSON);
assertEquals(0, updatedState.keys().size());
assertNull(updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.core.Strings.format;

public class DeleteLifecycleAction extends ActionType<AcknowledgedResponse> {
public static final DeleteLifecycleAction INSTANCE = new DeleteLifecycleAction();
public static final String NAME = "cluster:admin/ilm/delete";
Expand Down Expand Up @@ -75,6 +77,11 @@ public boolean equals(Object obj) {
return Objects.equals(policyName, other.policyName);
}

@Override
public String toString() {
return format("delete lifecycle policy [%s]", policyName);
}

}

}
2 changes: 2 additions & 0 deletions x-pack/plugin/ilm/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@
exports org.elasticsearch.xpack.ilm to org.elasticsearch.server;
exports org.elasticsearch.xpack.slm.action to org.elasticsearch.server;
exports org.elasticsearch.xpack.slm to org.elasticsearch.server;

provides org.elasticsearch.operator.OperatorHandlerProvider with org.elasticsearch.xpack.ilm.operator.ILMOperatorHandlerProvider;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction.Request;
import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction;

import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

Expand Down Expand Up @@ -70,6 +73,11 @@ public DeleteLifecyclePolicyTask(Request request, ActionListener<AcknowledgedRes
this.request = request;
}

// used by the operator handler
public DeleteLifecyclePolicyTask(String policyName) {
this(new Request(policyName), null);
}

@Override
public ClusterState execute(ClusterState currentState) {
String policyToDelete = request.getPolicyName();
Expand Down Expand Up @@ -107,4 +115,14 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected Optional<String> operatorHandlerName() {
return Optional.of(OperatorLifecycleAction.NAME);
}

@Override
protected Set<String> modifiedKeys(Request request) {
return Set.of(request.getPolicyName());
}
}
Loading