Skip to content

Commit

Permalink
Add OperatorHandler interface (#87767)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Jun 27, 2022
1 parent 58e194c commit c2d1b22
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@
exports org.elasticsearch.monitor.os;
exports org.elasticsearch.monitor.process;
exports org.elasticsearch.node;
exports org.elasticsearch.operator;
exports org.elasticsearch.persistent;
exports org.elasticsearch.persistent.decider;
exports org.elasticsearch.plugins;
Expand Down
15 changes: 15 additions & 0 deletions server/src/main/java/org/elasticsearch/common/util/Maps.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,19 @@ static int capacity(int expectedSize) {
assert expectedSize >= 0;
return expectedSize < 2 ? expectedSize + 1 : (int) (expectedSize / 0.75 + 1.0);
}

/**
* Convenience method to convert the passed in input object as a map with String keys.
*
* @param input the input passed into the operator handler after parsing the content
* @return
*/
@SuppressWarnings("unchecked")
public static Map<String, ?> asMap(Object input) {
if (input instanceof Map<?, ?> source) {
return (Map<String, Object>) source;
}
throw new IllegalStateException("Unsupported input format");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -600,4 +601,21 @@ public static void writeTo(StreamOutput out, XContentType xContentType) throws I
out.writeVInt(xContentType.ordinal());
}
}

/**
* Convenience method that creates a {@link XContentParser} from a content map so that it can be passed to
* existing REST based code for input parsing.
*
* @param config XContentParserConfiguration for this mapper
* @param source the operator content as a map
* @return
*/
public static XContentParser mapToXContentParser(XContentParserConfiguration config, Map<String, ?> source) {
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
builder.map(source);
return XContentFactory.xContent(builder.contentType()).createParser(config, Strings.toString(builder));
} catch (IOException e) {
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;

import java.util.Collection;
import java.util.Collections;

/**
* OperatorHandler base interface used for implementing operator state actions.
*
* <p>
* Updating cluster state in operator mode, for file based settings and modules/plugins, requires
* that we have a separate update handler interface that is different than REST handlers. This interface declares
* the basic contract for implementing cluster state update handlers in operator mode.
* </p>
*/
public interface OperatorHandler<T> {
String CONTENT = "content";

/**
* Unique identifier for the handler.
*
* <p>
* The operator handler name is a unique identifier that is matched to a section in a
* cluster state update content. The operator cluster state updates are done as a single
* cluster state update and the cluster state is typically supplied as a combined content,
* unlike the REST handlers. This name must match a desired content key name in the combined
* cluster state update, e.g. "ilm" or "cluster_settings" (for persistent cluster settings update).
* </p>
*
* @return a String with the operator key name, e.g "ilm".
*/
String name();

/**
* The transformation method implemented by the handler.
*
* <p>
* The transform method of the operator handler should apply the necessary changes to
* the cluster state as it normally would in a REST handler. One difference is that the
* transform method in an operator handler must perform all CRUD operations of the cluster
* state in one go. For that reason, we supply a wrapper class to the cluster state called
* {@link TransformState}, which contains the current cluster state as well as any previous keys
* set by this handler on prior invocation.
* </p>
*
* @param source The parsed information specific to this handler from the combined cluster state content
* @param prevState The previous cluster state and keys set by this handler (if any)
* @return The modified state and the current keys set by this handler
* @throws Exception
*/
TransformState transform(Object source, TransformState prevState) throws Exception;

/**
* List of dependent handler names for this handler.
*
* <p>
* Sometimes certain parts of the cluster state cannot be created/updated without previously
* setting other cluster state components, e.g. composable templates. Since the cluster state handlers
* are processed in random order by the OperatorClusterStateController, this method gives an opportunity
* to any operator handler to declare other operator handlers it depends on. Given dependencies exist,
* the OperatorClusterStateController will order those handlers such that the handlers that are dependent
* on are processed first.
* </p>
*
* @return a collection of operator handler names
*/
default Collection<String> dependencies() {
return Collections.emptyList();
}

/**
* Generic validation helper method that throws consistent exception for all handlers.
*
* <p>
* All implementations of OperatorHandler should call the request validate method, by calling this default
* implementation. To aid in any special validation logic that may need to be implemented by the operator handler
* we provide this convenience method.
* </p>
*
* @param request the master node request that we base this operator handler on
*/
default void validate(MasterNodeRequest<?> request) {
ActionRequestValidationException exception = request.validate();
if (exception != null) {
throw new IllegalStateException("Validation error", exception);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator;

import java.util.Collection;

/**
* SPI service interface for supplying OperatorHandler implementations to Elasticsearch
* from plugins/modules.
*/
public interface OperatorHandlerProvider {
/**
* Returns a list of OperatorHandler implementations that a module/plugin supplies.
* @see OperatorHandler
*
* @return a list of ${@link OperatorHandler}s
*/
Collection<OperatorHandler<?>> handlers();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator;

import org.elasticsearch.cluster.ClusterState;

import java.util.Set;

/**
* A {@link ClusterState} wrapper used by the OperatorClusterStateController to pass the
* current state as well as previous keys set by an {@link OperatorHandler} to each transform
* step of the cluster state update.
*
*/
public record TransformState(ClusterState state, Set<String> keys) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.operator;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.indices.settings.InternalOrPrivateSettingsPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;

import static org.hamcrest.Matchers.containsInAnyOrder;

public class OperatorHandlerTests extends ESTestCase {
public void testValidation() {
OperatorHandler<ValidRequest> handler = new OperatorHandler<>() {
@Override
public String name() {
return "handler";
}

@Override
public TransformState transform(Object source, TransformState prevState) throws Exception {
return prevState;
}
};

handler.validate(new ValidRequest());
assertEquals(
"Validation error",
expectThrows(IllegalStateException.class, () -> handler.validate(new InvalidRequest())).getMessage()
);
}

public void testAsMapAndFromMap() throws IOException {
String someJSON = """
{
"persistent": {
"indices.recovery.max_bytes_per_sec": "25mb",
"cluster": {
"remote": {
"cluster_one": {
"seeds": [
"127.0.0.1:9300"
]
}
}
}
}
}""";

OperatorHandler<ValidRequest> persistentHandler = new OperatorHandler<>() {
@Override
public String name() {
return "persistent";
}

@Override
public TransformState transform(Object source, TransformState prevState) throws Exception {
return prevState;
}
};

try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, someJSON)) {
Map<String, Object> originalMap = parser.map();

Map<String, ?> internalHandlerMap = Maps.asMap(originalMap.get(persistentHandler.name()));
assertThat(internalHandlerMap.keySet(), containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster"));
assertEquals(
"Unsupported input format",
expectThrows(IllegalStateException.class, () -> Maps.asMap(Integer.valueOf(123))).getMessage()
);

try (XContentParser newParser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, originalMap)) {
Map<String, Object> newMap = newParser.map();

assertThat(newMap.keySet(), containsInAnyOrder("persistent"));
}
}
}

static class ValidRequest extends MasterNodeRequest<InternalOrPrivateSettingsPlugin.UpdateInternalOrPrivateAction.Request> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}

static class InvalidRequest extends ValidRequest {
@Override
public ActionRequestValidationException validate() {
return new ActionRequestValidationException();
}
}
}

0 comments on commit c2d1b22

Please sign in to comment.