diff --git a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index 95e1d23fc208e..51d4fc3437c9d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -81,8 +81,10 @@ public void testThreadNames() throws Exception { // or the ones that are occasionally come up from ESSingleNodeTestCase if (threadName.contains("[node_s_0]") // TODO: this can't possibly be right! single node and integ test are unrelated! || threadName.contains("Keep-Alive-Timer") + || threadName.contains("readiness-service") || threadName.contains("JVMCI-native") // GraalVM Compiler Thread - || threadName.contains("readiness-service")) { + || threadName.contains("file-settings-watcher") + || threadName.contains("FileSystemWatchService")) { continue; } String nodePrefix = "(" diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index f0b4ccd3ff049..0ae63370a22d4 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -272,6 +272,7 @@ exports org.elasticsearch.monitor.os; exports org.elasticsearch.monitor.process; exports org.elasticsearch.node; + exports org.elasticsearch.operator; exports org.elasticsearch.persistent; exports org.elasticsearch.persistent.decider; exports org.elasticsearch.plugins; diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 1d0ceb6bea196..4eb7ee15f7e56 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -249,6 +249,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.TypeLiteral; @@ -265,12 +266,17 @@ import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.OperatorHandlerProvider; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; +import org.elasticsearch.operator.service.OperatorClusterStateController; import org.elasticsearch.persistent.CompletionPersistentTaskAction; import org.elasticsearch.persistent.RemovePersistentTaskAction; import org.elasticsearch.persistent.StartPersistentTaskAction; import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin.ActionHandler; +import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -445,6 +451,8 @@ public class ActionModule extends AbstractModule { private final RequestValidators mappingRequestValidators; private final RequestValidators indicesAliasesRequestRequestValidators; private final ThreadPool threadPool; + private final OperatorClusterStateController operatorController; + private final ClusterService clusterService; public ActionModule( Settings settings, @@ -457,7 +465,8 @@ public ActionModule( NodeClient nodeClient, CircuitBreakerService circuitBreakerService, UsageService usageService, - SystemIndices systemIndices + SystemIndices systemIndices, + ClusterService clusterService ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -466,6 +475,7 @@ public ActionModule( this.settingsFilter = settingsFilter; this.actionPlugins = actionPlugins; this.threadPool = threadPool; + this.clusterService = clusterService; actions = setupActions(actionPlugins); actionFilters = setupActionFilters(actionPlugins); autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, systemIndices); @@ -508,6 +518,7 @@ public ActionModule( ); restController = new RestController(headers, restInterceptor, nodeClient, circuitBreakerService, usageService); + operatorController = new OperatorClusterStateController(clusterService); } public Map> getActions() { @@ -882,6 +893,22 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestCatAction(catActions)); } + /** + * Initializes the operator action handlers for Elasticsearch and it's modules/plugins + * + * @param pluginsService needed to load all modules/plugins operator handlers through SPI + */ + public void initOperatorHandlers(PluginsService pluginsService) { + List> handlers = new ArrayList<>(); + + List pluginHandlers = pluginsService.loadServiceProviders(OperatorHandlerProvider.class); + + handlers.add(new OperatorClusterUpdateSettingsAction(clusterSettings)); + pluginHandlers.forEach(h -> handlers.addAll(h.handlers())); + + operatorController.initHandlers(handlers); + } + @Override protected void configure() { bind(ActionFilters.class).toInstance(actionFilters); @@ -914,4 +941,8 @@ public ActionFilters getActionFilters() { public RestController getRestController() { return restController; } + + public OperatorClusterStateController getOperatorController() { + return operatorController; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 4f19aad41bc5e..b106fd45e6dc2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -29,11 +29,13 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import static org.elasticsearch.common.settings.AbstractScopedSettings.ARCHIVED_SETTINGS_PREFIX; @@ -128,6 +130,17 @@ private static boolean checkClearedBlockAndArchivedSettings( return true; } + @Override + protected Optional operatorHandlerName() { + return Optional.of(OperatorClusterUpdateSettingsAction.NAME); + } + + @Override + protected Set modifiedKeys(ClusterUpdateSettingsRequest request) { + Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + return allSettings.keySet(); + } + private static final String UPDATE_TASK_SOURCE = "cluster_update_settings"; private static final String REROUTE_TASK_SOURCE = "reroute_after_cluster_update_settings"; @@ -243,6 +256,10 @@ public ClusterUpdateSettingsTask( this.request = request; } + public ClusterUpdateSettingsTask(final ClusterSettings clusterSettings, ClusterUpdateSettingsRequest request) { + this(clusterSettings, Priority.IMMEDIATE, request, null); + } + @Override public ClusterState execute(final ClusterState currentState) { final ClusterState clusterState = updater.updateSettings( diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index d92fb8f2956f9..cad5c5cd3d62f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.OperatorMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -42,6 +43,11 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import static org.elasticsearch.core.Strings.format; @@ -142,9 +148,62 @@ private ClusterBlockException checkBlockIfStateRecovered(Request request, Cluste } } + /** + * Override this method if the master node action also has an {@link org.elasticsearch.operator.OperatorHandler} + * interaction. We need to check if certain settings or entities are allowed to be modified by the master node + * action, depending on if they are set already in operator mode. + * + * @return an Optional of the operator handler name + */ + protected Optional operatorHandlerName() { + return Optional.empty(); + } + + /** + * Override this method to return the keys of the cluster state or cluster entities that are modified by + * the Request object. This method is used by the operator handler logic (see {@link org.elasticsearch.operator.OperatorHandler}) + * to verify if the keys don't conflict with an existing key set in operator mode. + * + * @param request the TransportMasterNode request + * @return set of String keys intended to be modified/set/deleted by this request + */ + protected Set modifiedKeys(Request request) { + return Collections.emptySet(); + } + + // package private for testing + void validateForOperatorState(Request request, ClusterState state) { + Optional handlerName = operatorHandlerName(); + assert handlerName.isPresent(); + + Set modified = modifiedKeys(request); + List errors = new ArrayList<>(); + + for (OperatorMetadata operator : state.metadata().operatorState().values()) { + Set conflicts = operator.conflicts(handlerName.get(), modified); + if (conflicts.isEmpty() == false) { + errors.add(format("[%s] set in operator mode by [%s]", String.join(",", conflicts), operator.namespace())); + } + } + + if (errors.isEmpty() == false) { + throw new IllegalArgumentException( + format("Failed to process request [%s] with errors: %s", request, String.join(System.lineSeparator(), errors)) + ); + } + } + + // package private for testing + boolean supportsOperatorMode() { + return operatorHandlerName().isPresent(); + } + @Override protected void doExecute(Task task, final Request request, ActionListener listener) { ClusterState state = clusterService.state(); + if (supportsOperatorMode()) { + validateForOperatorState(request, state); + } logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 940ea7e777afe..9849261178c01 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -207,6 +207,7 @@ default boolean isRestorable() { private final ImmutableOpenMap> aliasedIndices; private final ImmutableOpenMap templates; private final ImmutableOpenMap customs; + private final Map operatorState; private final transient int totalNumberOfShards; // Transient ? not serializable anyway? private final int totalOpenIndexShards; @@ -246,7 +247,8 @@ private Metadata( String[] visibleClosedIndices, SortedMap indicesLookup, Map mappingsByHash, - Version oldestIndexVersion + Version oldestIndexVersion, + Map operatorState ) { this.clusterUUID = clusterUUID; this.clusterUUIDCommitted = clusterUUIDCommitted; @@ -271,6 +273,7 @@ private Metadata( this.indicesLookup = indicesLookup; this.mappingsByHash = mappingsByHash; this.oldestIndexVersion = oldestIndexVersion; + this.operatorState = operatorState; } public Metadata withIncrementedVersion() { @@ -297,7 +300,8 @@ public Metadata withIncrementedVersion() { visibleClosedIndices, indicesLookup, mappingsByHash, - oldestIndexVersion + oldestIndexVersion, + operatorState ); } @@ -357,7 +361,8 @@ public Metadata withLifecycleState(final Index index, final LifecycleExecutionSt visibleClosedIndices, indicesLookup, mappingsByHash, - oldestIndexVersion + oldestIndexVersion, + operatorState ); } @@ -938,6 +943,14 @@ public Map customs() { return this.customs; } + public Map operatorState() { + return this.operatorState; + } + + public OperatorMetadata operatorState(String namespace) { + return this.operatorState.get(namespace); + } + /** * The collection of index deletions in the cluster. */ @@ -1079,6 +1092,7 @@ private static class MetadataDiff implements Diff { private final Diff> indices; private final Diff> templates; private final Diff> customs; + private final Diff> operatorState; MetadataDiff(Metadata before, Metadata after) { clusterUUID = after.clusterUUID; @@ -1091,12 +1105,15 @@ private static class MetadataDiff implements Diff { indices = DiffableUtils.diff(before.indices, after.indices, DiffableUtils.getStringKeySerializer()); templates = DiffableUtils.diff(before.templates, after.templates, DiffableUtils.getStringKeySerializer()); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + operatorState = DiffableUtils.diff(before.operatorState, after.operatorState, DiffableUtils.getStringKeySerializer()); } private static final DiffableUtils.DiffableValueReader INDEX_METADATA_DIFF_VALUE_READER = new DiffableUtils.DiffableValueReader<>(IndexMetadata::readFrom, IndexMetadata::readDiffFrom); private static final DiffableUtils.DiffableValueReader TEMPLATES_DIFF_VALUE_READER = new DiffableUtils.DiffableValueReader<>(IndexTemplateMetadata::readFrom, IndexTemplateMetadata::readDiffFrom); + private static final DiffableUtils.DiffableValueReader OPERATOR_DIFF_VALUE_READER = + new DiffableUtils.DiffableValueReader<>(OperatorMetadata::readFrom, OperatorMetadata::readDiffFrom); MetadataDiff(StreamInput in) throws IOException { clusterUUID = in.readString(); @@ -1113,6 +1130,11 @@ private static class MetadataDiff implements Diff { indices = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), INDEX_METADATA_DIFF_VALUE_READER); templates = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), TEMPLATES_DIFF_VALUE_READER); customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER); + if (in.getVersion().onOrAfter(Version.V_8_4_0)) { + operatorState = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), OPERATOR_DIFF_VALUE_READER); + } else { + operatorState = OperatorMetadata.EMPTY_DIFF; + } } @Override @@ -1129,6 +1151,9 @@ public void writeTo(StreamOutput out) throws IOException { indices.writeTo(out); templates.writeTo(out); customs.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_4_0)) { + operatorState.writeTo(out); + } } @Override @@ -1146,6 +1171,7 @@ public Metadata apply(Metadata part) { builder.indices(indices.apply(part.indices)); builder.templates(templates.apply(part.templates)); builder.customs(customs.apply(part.customs)); + builder.operatorState(operatorState.apply(part.operatorState)); return builder.build(); } } @@ -1187,7 +1213,12 @@ public static Metadata readFrom(StreamInput in) throws IOException { Custom customIndexMetadata = in.readNamedWriteable(Custom.class); builder.putCustom(customIndexMetadata.getWriteableName(), customIndexMetadata); } - + if (in.getVersion().onOrAfter(Version.V_8_4_0)) { + int operatorSize = in.readVInt(); + for (int i = 0; i < operatorSize; i++) { + builder.putOperatorState(OperatorMetadata.readFrom(in)); + } + } return builder.build(); } @@ -1214,6 +1245,9 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeCollection(templates.values()); VersionedNamedWriteable.writeVersionedWritables(out, customs); + if (out.getVersion().onOrAfter(Version.V_8_4_0)) { + out.writeCollection(operatorState.values()); + } } public static Builder builder() { @@ -1248,6 +1282,8 @@ public static class Builder { private SortedMap previousIndicesLookup; + private final Map operatorState; + // If this is set to false we can skip checking #mappingsByHash for unused entries in #build(). Used as an optimization to save // the rather expensive call to #purgeUnusedEntries when building from another instance and we know that no mappings can have // become unused because no indices were updated or removed from this builder in a way that would cause unused entries in @@ -1275,6 +1311,7 @@ public Builder() { this.previousIndicesLookup = metadata.indicesLookup; this.mappingsByHash = new HashMap<>(metadata.mappingsByHash); this.checkForUnusedMappings = false; + this.operatorState = new HashMap<>(metadata.operatorState); } private Builder(Map mappingsByHash) { @@ -1283,6 +1320,7 @@ private Builder(Map mappingsByHash) { aliasedIndices = ImmutableOpenMap.builder(); templates = ImmutableOpenMap.builder(); customs = ImmutableOpenMap.builder(); + operatorState = new HashMap<>(); indexGraveyard(IndexGraveyard.builder().build()); // create new empty index graveyard to initialize previousIndicesLookup = null; this.mappingsByHash = new HashMap<>(mappingsByHash); @@ -1635,6 +1673,20 @@ public Builder customs(Map customs) { return this; } + public OperatorMetadata operatorState(String namespace) { + return operatorState.get(namespace); + } + + public Builder operatorState(Map operatorState) { + this.operatorState.putAll(operatorState); + return this; + } + + public Builder putOperatorState(OperatorMetadata metadata) { + operatorState.put(metadata.namespace(), metadata); + return this; + } + public Builder indexGraveyard(final IndexGraveyard indexGraveyard) { putCustom(IndexGraveyard.TYPE, indexGraveyard); return this; @@ -1833,7 +1885,8 @@ public Metadata build() { visibleClosedIndicesArray, indicesLookup, Collections.unmodifiableMap(mappingsByHash), - Version.fromId(oldestIndexVersionId) + Version.fromId(oldestIndexVersionId), + Collections.unmodifiableMap(operatorState) ); } @@ -2136,6 +2189,12 @@ public static void toXContent(Metadata metadata, XContentBuilder builder, ToXCon } } + builder.startObject("operator"); + for (OperatorMetadata operatorMetadata : metadata.operatorState().values()) { + OperatorMetadata.Builder.toXContent(operatorMetadata, builder, params); + } + builder.endObject(); + builder.endObject(); } @@ -2179,6 +2238,10 @@ public static Metadata fromXContent(XContentParser parser) throws IOException { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { builder.put(IndexTemplateMetadata.Builder.fromXContent(parser, parser.currentName())); } + } else if ("operator".equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + builder.putOperatorState(OperatorMetadata.fromXContent(parser)); + } } else { try { Custom custom = parser.namedObject(Custom.class, currentFieldName, null); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorErrorMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorErrorMetadata.java new file mode 100644 index 0000000000000..c1a15859bc509 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorErrorMetadata.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.SimpleDiffable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Metadata class to hold error information about errors encountered + * while applying a cluster state update for a given namespace. + *

+ * This information is held by the OperatorMetadata class. + */ +public record OperatorErrorMetadata( + Long version, + org.elasticsearch.cluster.metadata.OperatorErrorMetadata.ErrorKind errorKind, + List errors +) implements SimpleDiffable, ToXContentFragment { + /** + * Contructs an operator metadata + * + * @param version the metadata version which failed to apply + * @param errorKind the kind of error we encountered while processing + * @param errors the list of errors encountered during parsing and validation of the metadata + */ + public OperatorErrorMetadata {} + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(version); + out.writeString(errorKind.getKindValue()); + out.writeCollection(errors, StreamOutput::writeString); + } + + public static OperatorErrorMetadata readFrom(StreamInput in) throws IOException { + Builder builder = new Builder().version(in.readLong()) + .errorKind(ErrorKind.of(in.readString())) + .errors(in.readList(StreamInput::readString)); + return builder.build(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return SimpleDiffable.readDiffFrom(OperatorErrorMetadata::readFrom, in); + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Builder class for the OperatorErrorMetadata + */ + public static class Builder { + private static final String ERRORS = "errors"; + private static final String VERSION = "version"; + private static final String ERROR_KIND = "error_kind"; + + private Long version; + private List errors; + private ErrorKind errorKind; + + public Builder() { + this.version = 0L; + this.errors = new ArrayList<>(); + } + + public Builder version(Long version) { + this.version = version; + return this; + } + + public Builder errors(List errors) { + this.errors = errors; + return this; + } + + public Builder errorKind(ErrorKind errorKind) { + this.errorKind = errorKind; + return this; + } + + public OperatorErrorMetadata build() { + return new OperatorErrorMetadata(version, errorKind, Collections.unmodifiableList(errors)); + } + + /** + * Serializes the error metadata to xContent + * + * @param metadata + * @param builder + * @param params + */ + public static void toXContent(OperatorErrorMetadata metadata, XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(VERSION, metadata.version); + builder.field(ERROR_KIND, metadata.errorKind.getKindValue()); + builder.stringListField(ERRORS, metadata.errors); + builder.endObject(); + } + + /** + * Reads the error metadata from xContent + * + * @param parser + * @return + * @throws IOException + */ + public static OperatorErrorMetadata fromXContent(XContentParser parser) throws IOException { + Builder builder = new Builder(); + + String currentFieldName = parser.currentName(); + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + if (ERRORS.equals(currentFieldName)) { + List errors = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + errors.add(parser.text()); + } + builder.errors(errors); + } + } else if (token.isValue()) { + if (VERSION.equals(currentFieldName)) { + builder.version(parser.longValue()); + } else if (ERROR_KIND.equals(currentFieldName)) { + builder.errorKind(ErrorKind.of(parser.text())); + } + } + } + return builder.build(); + } + } + + /** + * Enum for kinds of errors we might encounter while processing operator cluster state updates. + */ + public enum ErrorKind { + PARSING("parsing"), + VALIDATION("validation"), + TRANSIENT("transient"); + + private final String kind; + + ErrorKind(String kind) { + this.kind = kind; + } + + public String getKindValue() { + return kind; + } + + public static ErrorKind of(String kind) { + for (var report : values()) { + if (report.kind.equals(kind)) { + return report; + } + } + throw new IllegalArgumentException("kind not supported [" + kind + "]"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorHandlerMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorHandlerMetadata.java new file mode 100644 index 0000000000000..5792f075dd5e7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorHandlerMetadata.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.SimpleDiffable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.TreeSet; + +/** + * Metadata class to hold the operator set keys for each operator handler + */ +public record OperatorHandlerMetadata(String name, Set keys) + implements + SimpleDiffable, + ToXContentFragment { + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeCollection(keys, StreamOutput::writeString); + } + + public static OperatorHandlerMetadata readFrom(StreamInput in) throws IOException { + Builder builder = new Builder(in.readString()).keys(in.readSet(StreamInput::readString)); + return builder.build(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return SimpleDiffable.readDiffFrom(OperatorHandlerMetadata::readFrom, in); + } + + public static class Builder { + private static final String HANDLER_KEYS = "keys"; + + private final String name; + private Set keys; + + public Builder(String name) { + this.name = name; + this.keys = new HashSet<>(); + } + + public Builder keys(Set keys) { + this.keys = keys; + return this; + } + + public OperatorHandlerMetadata build() { + return new OperatorHandlerMetadata(name, Collections.unmodifiableSet(keys)); + } + + /** + * Serializes the metadata to xContent + * + * @param metadata + * @param builder + * @param params + */ + public static void toXContent(OperatorHandlerMetadata metadata, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.startObject(metadata.name()); + builder.stringListField(HANDLER_KEYS, new TreeSet<>(metadata.keys)); + builder.endObject(); + } + + /** + * Reads the metadata from xContent + * + * @param parser + * @return + * @throws IOException + */ + public static OperatorHandlerMetadata fromXContent(XContentParser parser) throws IOException { + Builder builder = new Builder(parser.currentName()); + + String currentFieldName = parser.currentName(); + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_ARRAY) { + if (HANDLER_KEYS.equals(currentFieldName)) { + Set handlerKeys = new HashSet<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + handlerKeys.add(parser.text()); + } + builder.keys(handlerKeys); + } + } + } + return builder.build(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorMetadata.java new file mode 100644 index 0000000000000..c87ece109e048 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/OperatorMetadata.java @@ -0,0 +1,244 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.SimpleDiffable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Metadata class that contains information about cluster settings/entities set + * in an operator mode. These types of settings are read only through the REST API, + * and cannot be modified by the end user. + */ +public record OperatorMetadata( + String namespace, + Long version, + Map handlers, + OperatorErrorMetadata errorMetadata +) implements SimpleDiffable, ToXContentFragment { + /** + * OperatorMetadata contains information about settings set in operator mode. + * These settings cannot be updated by the end user and are set outside of the + * REST layer, e.g. through file based settings or by plugin/modules. + * + * @param namespace The namespace of the setting creator, e.g. file_settings, security plugin, etc. + * @param version The update version, must increase with each update + * @param handlers Per state update handler information on key set in by this update. These keys are validated at REST time. + * @param errorMetadata If the update failed for some reason, this is where we store the error information metadata. + */ + public OperatorMetadata {} + + public Set conflicts(String handlerName, Set modified) { + OperatorHandlerMetadata handlerMetadata = handlers.get(handlerName); + if (handlerMetadata == null || handlerMetadata.keys().isEmpty()) { + return Collections.emptySet(); + } + + Set intersect = new HashSet<>(handlerMetadata.keys()); + intersect.retainAll(modified); + return Collections.unmodifiableSet(intersect); + } + + public static OperatorMetadata readFrom(StreamInput in) throws IOException { + Builder builder = new Builder(in.readString()).version(in.readLong()); + + int handlersSize = in.readVInt(); + for (int i = 0; i < handlersSize; i++) { + OperatorHandlerMetadata handler = OperatorHandlerMetadata.readFrom(in); + builder.putHandler(handler); + } + + builder.errorMetadata(in.readOptionalWriteable(OperatorErrorMetadata::readFrom)); + return builder.build(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(namespace); + out.writeLong(version); + out.writeCollection(handlers.values()); + out.writeOptionalWriteable(errorMetadata); + } + + public static Diff readDiffFrom(StreamInput in) throws IOException { + return SimpleDiffable.readDiffFrom(OperatorMetadata::readFrom, in); + } + + public static final DiffableUtils.MapDiff> EMPTY_DIFF = + new DiffableUtils.MapDiff<>(null, null, List.of(), List.of(), List.of()) { + @Override + public Map apply(Map part) { + return part; + } + }; + + public static Builder builder(String namespace) { + return new Builder(namespace); + } + + public static Builder builder(String namespace, OperatorMetadata metadata) { + return new Builder(namespace, metadata); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Builder.toXContent(this, builder, params); + return builder; + } + + public static OperatorMetadata fromXContent(final XContentParser parser) throws IOException { + parser.nextToken(); + return Builder.fromXContent(parser, parser.currentName()); + } + + /** + * Builder class for OperatorMetadata + */ + public static class Builder { + private static final String VERSION = "version"; + private static final String HANDLERS = "handlers"; + private static final String ERRORS_METADATA = "errors"; + + private final String namespace; + private Long version; + private Map handlers; + OperatorErrorMetadata errorMetadata; + + /** + * Empty builder for OperatorMetadata + * + * @param namespace The namespace for this metadata + */ + public Builder(String namespace) { + this.namespace = namespace; + this.version = 0L; + this.handlers = new HashMap<>(); + this.errorMetadata = null; + } + + /** + * Creates an operator metadata builder + * + * @param namespace the namespace for which we are storing metadata, e.g. file_settings + * @param metadata the previous metadata + */ + public Builder(String namespace, OperatorMetadata metadata) { + this(namespace); + if (metadata != null) { + this.version = metadata.version; + this.handlers = new HashMap<>(metadata.handlers); + this.errorMetadata = metadata.errorMetadata; + } + } + + public Builder version(Long version) { + this.version = version; + return this; + } + + public Builder errorMetadata(OperatorErrorMetadata errorMetadata) { + this.errorMetadata = errorMetadata; + return this; + } + + public Builder putHandler(OperatorHandlerMetadata handler) { + this.handlers.put(handler.name(), handler); + return this; + } + + public OperatorMetadata build() { + return new OperatorMetadata(namespace, version, Collections.unmodifiableMap(handlers), errorMetadata); + } + + /** + * Serializes the operator metadata to xContent + * + * @param operatorMetadata + * @param builder + * @param params + */ + public static void toXContent(OperatorMetadata operatorMetadata, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.startObject(operatorMetadata.namespace()); + builder.field(VERSION, operatorMetadata.version()); + builder.startObject(HANDLERS); + var sortedKeys = new TreeSet<>(operatorMetadata.handlers.keySet()); + for (var key : sortedKeys) { + OperatorHandlerMetadata.Builder.toXContent(operatorMetadata.handlers.get(key), builder, params); + } + builder.endObject(); + builder.field(ERRORS_METADATA, operatorMetadata.errorMetadata); + builder.endObject(); + } + + /** + * Reads the operator metadata from xContent + * + * @param parser + * @param namespace + * @return + * @throws IOException + */ + public static OperatorMetadata fromXContent(XContentParser parser, String namespace) throws IOException { + OperatorMetadata.Builder builder = new OperatorMetadata.Builder(namespace); + + String currentFieldName = skipNamespace(parser); + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (HANDLERS.equals(currentFieldName)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + builder.putHandler(OperatorHandlerMetadata.Builder.fromXContent(parser)); + } + } else if (ERRORS_METADATA.equals(currentFieldName)) { + builder.errorMetadata(OperatorErrorMetadata.Builder.fromXContent(parser)); + } else { + throw new ElasticsearchParseException("unknown key [{}] for operator metadata", currentFieldName); + } + } else if (token.isValue()) { + if (VERSION.equals(currentFieldName)) { + builder.version(parser.longValue()); + } + } + } + return builder.build(); + } + + private static String skipNamespace(XContentParser parser) throws IOException { + XContentParser.Token token = parser.nextToken(); + if (token == XContentParser.Token.START_OBJECT) { + token = parser.nextToken(); + if (token == XContentParser.Token.FIELD_NAME) { + return parser.currentName(); + } + } + + return null; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index c1ae9148b809f..44fa0ae52f972 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -91,6 +91,7 @@ import org.elasticsearch.monitor.process.ProcessService; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeRoleSettings; +import org.elasticsearch.operator.service.FileSettingsService; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.plugins.PluginsService; @@ -519,7 +520,8 @@ public void apply(Settings value, Settings current, Settings previous) { StableMasterHealthIndicatorService.NODE_HAS_MASTER_LOOKUP_TIMEFRAME_SETTING, MasterHistory.MAX_HISTORY_AGE_SETTING, ReadinessService.PORT, - HealthNodeTaskExecutor.ENABLED_SETTING + HealthNodeTaskExecutor.ENABLED_SETTING, + FileSettingsService.OPERATOR_DIRECTORY ); static List> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 6c6ccef772937..1d2df997b50c4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -133,6 +133,7 @@ import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.operator.service.FileSettingsService; import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksExecutorRegistry; @@ -697,7 +698,8 @@ protected Node( client, circuitBreakerService, usageService, - systemIndices + systemIndices, + clusterService ); modules.add(actionModule); @@ -993,6 +995,11 @@ protected Node( modules.add(b -> b.bind(ReadinessService.class).toInstance(new ReadinessService(clusterService, environment))); } + modules.add( + b -> b.bind(FileSettingsService.class) + .toInstance(new FileSettingsService(clusterService, actionModule.getOperatorController(), environment)) + ); + injector = modules.createInjector(); // We allocate copies of existing shards by looking for a viable copy of the shard in the cluster and assigning the shard there. @@ -1030,6 +1037,8 @@ protected Node( logger.debug("initializing HTTP handlers ..."); actionModule.initRestHandlers(() -> clusterService.state().nodesIfRecovered()); + logger.debug("initializing operator handlers ..."); + actionModule.initOperatorHandlers(pluginsService); logger.info("initialized"); success = true; @@ -1145,6 +1154,7 @@ public Node start() throws NodeValidationException { if (ReadinessService.enabled(environment)) { injector.getInstance(ReadinessService.class).start(); } + injector.getInstance(FileSettingsService.class).start(); injector.getInstance(MappingUpdatedAction.class).setClient(client); injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start(); @@ -1293,6 +1303,7 @@ private Node stop() { if (ReadinessService.enabled(environment)) { injector.getInstance(ReadinessService.class).stop(); } + injector.getInstance(FileSettingsService.class).stop(); injector.getInstance(ResourceWatcherService.class).close(); injector.getInstance(HttpServerTransport.class).stop(); @@ -1376,6 +1387,7 @@ public synchronized void close() throws IOException { if (ReadinessService.enabled(environment)) { toClose.add(injector.getInstance(ReadinessService.class)); } + toClose.add(injector.getInstance(FileSettingsService.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/server/src/main/java/org/elasticsearch/operator/OperatorHandler.java b/server/src/main/java/org/elasticsearch/operator/OperatorHandler.java new file mode 100644 index 0000000000000..08151bc2ba474 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/OperatorHandler.java @@ -0,0 +1,118 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator; + +import org.elasticsearch.ElasticsearchGenerationException; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +/** + * Updating cluster state in operator mode, for file based settings and modules/plugins, requires + * that we have a separate update handler interface to the REST handlers. This interface declares + * the basic contract for implementing cluster state update handlers in operator mode. + */ +public interface OperatorHandler { + String CONTENT = "content"; + + /** + * The operator handler name is a unique identifier that is matched to a section in a + * cluster state update content. The operator cluster state updates are done as a single + * cluster state update and the cluster state is typically supplied as a combined content, + * unlike the REST handlers. This name must match a desired content key name in the combined + * cluster state update, e.g. "ilm" or "cluster_settings" (for persistent cluster settings update). + * + * @return a String with the operator key name + */ + String name(); + + /** + * The transform method of the operator handler should apply the necessary changes to + * the cluster state as it normally would in a REST handler. One difference is that the + * transform method in an operator handler must perform all CRUD operations of the cluster + * state in one go. For that reason, we supply a wrapper class to the cluster state called + * {@link TransformState}, which contains the current cluster state as well as any previous keys + * set by this handler on prior invocation. + * + * @param source The parsed information specific to this handler from the combined cluster state content + * @param prevState The previous cluster state and keys set by this handler (if any) + * @return The modified state and the current keys set by this handler + * @throws Exception + */ + TransformState transform(Object source, TransformState prevState) throws Exception; + + /** + * Sometimes certain parts of the cluster state cannot be created/updated without previously + * setting other cluster state components, e.g. composable templates. Since the cluster state handlers + * are processed in random order by the OperatorClusterStateController, this method gives an opportunity + * to any operator handler to declare other operator handlers it depends on. Given dependencies exist, + * the OperatorClusterStateController will order those handlers such that the handlers that are dependent + * on are processed first. + * + * @return a collection of operator handler names + */ + default Collection dependencies() { + return Collections.emptyList(); + } + + /** + * All implementations of OperatorHandler should call the request validate method, by calling this default + * implementation. To aid in any special validation logic that may need to be implemented by the operator handler + * we provide this convenience method. + * + * @param request the master node request that we base this operator handler on + */ + default void validate(MasterNodeRequest request) { + ActionRequestValidationException exception = request.validate(); + if (exception != null) { + throw new IllegalStateException("Validation error", exception); + } + } + + /** + * Convenience method to convert the incoming passed in input to the transform method into a map. + * + * @param input the input passed into the operator handler after parsing the content + * @return + */ + @SuppressWarnings("unchecked") + default Map asMap(Object input) { + if (input instanceof Map source) { + return (Map) source; + } + throw new IllegalStateException("Unsupported " + name() + " request format"); + } + + /** + * Convenience method that creates a {@link XContentParser} from a content map so that it can be passed to + * existing REST based code for input parsing. + * + * @param config XContentParserConfiguration for this mapper + * @param source the operator content as a map + * @return + */ + default XContentParser mapToXContentParser(XContentParserConfiguration config, Map source) { + try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) { + builder.map(source); + return XContentFactory.xContent(builder.contentType()).createParser(config, Strings.toString(builder)); + } catch (IOException e) { + throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/OperatorHandlerProvider.java b/server/src/main/java/org/elasticsearch/operator/OperatorHandlerProvider.java new file mode 100644 index 0000000000000..0f168ea6e4d61 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/OperatorHandlerProvider.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator; + +import java.util.Collection; + +/** + * SPI service interface for supplying OperatorHandler implementations to Elasticsearch + * from plugins/modules. + */ +public interface OperatorHandlerProvider { + /** + * Returns a list of OperatorHandler implementations that a module/plugin supplies. + * @see OperatorHandler + * + * @return a list of ${@link OperatorHandler}s + */ + Collection> handlers(); +} diff --git a/server/src/main/java/org/elasticsearch/operator/TransformState.java b/server/src/main/java/org/elasticsearch/operator/TransformState.java new file mode 100644 index 0000000000000..a68ce7e1f2290 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/TransformState.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator; + +import org.elasticsearch.cluster.ClusterState; + +import java.util.Set; + +/** + * A {@link ClusterState} wrapper used by the OperatorClusterStateController to pass the + * current state as well as previous keys set by an {@link OperatorHandler} to each transform + * step of the cluster state update. + * + */ +public record TransformState(ClusterState state, Set keys) {} diff --git a/server/src/main/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsAction.java b/server/src/main/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsAction.java new file mode 100644 index 0000000000000..ecf2a4f605916 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.action; + +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; +import org.elasticsearch.client.internal.Requests; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.TransformState; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This Action is the Operator version of RestClusterUpdateSettingsAction + * + * It is used by the OperatorClusterStateController to update the persistent cluster settings. + * Since transient cluster settings are deprecated, this action doesn't support updating cluster settings. + */ +public class OperatorClusterUpdateSettingsAction implements OperatorHandler { + + public static final String NAME = "cluster_settings"; + + private final ClusterSettings clusterSettings; + + public OperatorClusterUpdateSettingsAction(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + } + + @Override + public String name() { + return NAME; + } + + @SuppressWarnings("unchecked") + private ClusterUpdateSettingsRequest prepare(Object input, Set previouslySet) { + final ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = Requests.clusterUpdateSettingsRequest(); + + Map source = asMap(input); + Map persistentSettings = new HashMap<>(); + Set toDelete = new HashSet<>(previouslySet); + + source.forEach((k, v) -> { + persistentSettings.put(k, v); + toDelete.remove(k); + }); + + toDelete.forEach(k -> persistentSettings.put(k, null)); + + clusterUpdateSettingsRequest.persistentSettings(persistentSettings); + return clusterUpdateSettingsRequest; + } + + @Override + public TransformState transform(Object input, TransformState prevState) { + ClusterUpdateSettingsRequest request = prepare(input, prevState.keys()); + + // allow empty requests, this is how we clean up settings + if (request.persistentSettings().isEmpty() == false) { + validate(request); + } + + ClusterState state = prevState.state(); + + TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask updateSettingsTask = + new TransportClusterUpdateSettingsAction.ClusterUpdateSettingsTask(clusterSettings, request); + + state = updateSettingsTask.execute(state); + Set currentKeys = request.persistentSettings() + .keySet() + .stream() + .filter(k -> request.persistentSettings().hasValue(k)) + .collect(Collectors.toSet()); + + return new TransformState(state, currentKeys); + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/service/FileSettingsService.java b/server/src/main/java/org/elasticsearch/operator/service/FileSettingsService.java new file mode 100644 index 0000000000000..40b8ba69606d4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/service/FileSettingsService.java @@ -0,0 +1,280 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.PathUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.CountDownLatch; + +public class FileSettingsService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(FileSettingsService.class); + + private static final String SETTINGS_FILE_NAME = "settings.json"; + private static final String NAMESPACE = "file_settings"; + + private final OperatorClusterStateController controller; + private final Environment environment; + + private WatchService watchService; // null; + private CountDownLatch watcherThreadLatch; + + private volatile FileUpdateState fileUpdateState = null; + + private volatile boolean active = false; + + public static final Setting OPERATOR_DIRECTORY = Setting.simpleString( + "path.config.operator_directory", + "operator", + Setting.Property.NodeScope + ); + + public FileSettingsService(ClusterService clusterService, OperatorClusterStateController controller, Environment environment) { + this.controller = controller; + this.environment = environment; + clusterService.addListener(this); + } + + // package private for testing + Path operatorSettingsDir() { + String dirPath = OPERATOR_DIRECTORY.get(environment.settings()); + return environment.configFile().toAbsolutePath().resolve(dirPath); + } + + // package private for testing + Path operatorSettingsFile() { + return operatorSettingsDir().resolve(SETTINGS_FILE_NAME); + } + + boolean watchedFileChanged(Path path) throws IOException { + if (Files.exists(path) == false) { + return false; + } + + FileUpdateState previousUpdateState = fileUpdateState; + + BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class); + fileUpdateState = new FileUpdateState(attr.lastModifiedTime().toMillis(), path.toRealPath().toString(), attr.fileKey()); + + return (previousUpdateState == null || previousUpdateState.equals(fileUpdateState) == false); + } + + @Override + protected void doStart() { + // We start the file watcher when we know we are master. + // We need this additional flag, since cluster state can change after we've shutdown the service + // causing the watcher to start again. + this.active = true; + } + + @Override + protected void doStop() { + this.active = false; + logger.debug("Stopping file settings service"); + stopWatcher(); + } + + @Override + protected void doClose() {} + + private boolean currentNodeMaster(ClusterState clusterState) { + return clusterState.nodes().getMasterNodeId().equals(clusterState.nodes().getLocalNodeId()); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState clusterState = event.state(); + setWatching(currentNodeMaster(clusterState)); + } + + private void setWatching(boolean watching) { + if (watching) { + startWatcher(); + } else { + stopWatcher(); + } + } + + // package private for testing + boolean watching() { + return this.watchService != null; + } + + synchronized void startWatcher() { + if (watching() || active == false) { + // already watching or inactive, nothing to do + return; + } + + logger.info("starting file settings watcher ..."); + + Path settingsDir = operatorSettingsDir(); + + try { + this.watchService = PathUtils.getDefaultFileSystem().newWatchService(); + if (Files.exists(settingsDir)) { + Path settingsFilePath = operatorSettingsFile(); + if (Files.exists(settingsFilePath)) { + logger.info("found initial operator settings file [{}], applying...", settingsFilePath); + processFileSettings(settingsFilePath, true); + } + enableSettingsWatcher(settingsDir); + } else { + logger.info("operator settings directory [{}] not found, will watch for its creation...", settingsDir); + enableSettingsWatcher(environment.configFile()); + } + } catch (Exception e) { + if (watchService != null) { + try { + this.watchService.close(); + } catch (Exception ignore) {} finally { + this.watchService = null; + } + } + + throw new IllegalStateException("unable to launch a new watch service", e); + } + + this.watcherThreadLatch = new CountDownLatch(1); + + new Thread(() -> { + try { + logger.info("file settings service up and running [tid={}]", Thread.currentThread().getId()); + + WatchKey key; + while ((key = watchService.take()) != null) { + /** + * Reading and interpreting watch service events can vary from platform to platform. E.g: + * MacOS symlink delete and set (rm -rf operator && ln -s /file_settings/ operator): + * ENTRY_MODIFY:operator + * ENTRY_CREATE:settings.json + * ENTRY_MODIFY:settings.json + * Linux in Docker symlink delete and set (rm -rf operator && ln -s /file_settings/ operator): + * ENTRY_CREATE:operator + * After we get an indication that something has changed, we check the timestamp, file id, + * real path of our desired file. + */ + if (Files.exists(settingsDir)) { + try { + Path path = operatorSettingsFile(); + + if (logger.isDebugEnabled()) { + key.pollEvents().stream().forEach(e -> logger.debug("{}:{}", e.kind().toString(), e.context().toString())); + } + + key.pollEvents(); + key.reset(); + + enableSettingsWatcher(settingsDir); + + if (watchedFileChanged(path)) { + processFileSettings(path, false); + } + } catch (Exception e) { + logger.warn("unable to watch or read operator settings file", e); + } + } else { + key.pollEvents(); + key.reset(); + } + } + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("encountered exception watching", e); + } + logger.info("shutting down watcher thread"); + } finally { + watcherThreadLatch.countDown(); + } + }, "elasticsearch[file-settings-watcher]").start(); + } + + synchronized void stopWatcher() { + logger.debug("stopping watcher ..."); + if (watching()) { + try { + watchService.close(); + if (watcherThreadLatch != null) { + watcherThreadLatch.await(); + } + } catch (IOException | InterruptedException e) { + logger.info("encountered exception while closing watch service", e); + } finally { + watchService = null; + logger.info("watcher service stopped"); + } + } else { + logger.debug("file settings service already stopped"); + } + } + + private void enableSettingsWatcher(Path settingsDir) throws IOException { + settingsDir.register( + watchService, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_DELETE + ); + } + + void processFileSettings(Path path, boolean onStartup) { + logger.info("processing path [{}] for [{}]", path, NAMESPACE); + try ( + XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, Files.newInputStream(path)) + ) { + controller.process(NAMESPACE, parser, (e) -> { + if (e != null) { + if (e instanceof OperatorClusterStateController.IncompatibleVersionException) { + logger.info(e.getMessage()); + } else { + // If we encountered an exception trying to apply the operator state at + // startup time, we throw an error to force Elasticsearch to exit. + if (onStartup) { + throw new OperatorConfigurationError("Error applying operator settings", e); + } else { + logger.error("Error processing operator settings json file", e); + } + } + } + }); + } catch (Exception e) { + logger.error("Error processing operator settings json file", e); + } + } + + record FileUpdateState(long timestamp, String path, Object fileKey) {} + + /** + * Error subclass that is thrown when we encounter a fatal error while applying + * the operator cluster state at Elasticsearch boot time. + */ + public static class OperatorConfigurationError extends Error { + public OperatorConfigurationError(String message, Throwable t) { + super(message, t); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/service/OperatorClusterStateController.java b/server/src/main/java/org/elasticsearch/operator/service/OperatorClusterStateController.java new file mode 100644 index 0000000000000..d3a10158d5f26 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/service/OperatorClusterStateController.java @@ -0,0 +1,271 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.metadata.OperatorErrorMetadata; +import org.elasticsearch.cluster.metadata.OperatorMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.elasticsearch.core.Strings.format; + +/** + * Controller class for applying file based settings to ClusterState. + * This class contains the logic about validation, ordering and applying of + * the cluster state specified in a file. + */ +public class OperatorClusterStateController { + private static final Logger logger = LogManager.getLogger(FileSettingsService.class); + + public static final String SETTINGS = "settings"; + public static final String METADATA = "metadata"; + + Map> handlers = null; + final ClusterService clusterService; + + public OperatorClusterStateController(ClusterService clusterService) { + this.clusterService = clusterService; + } + + /** + * Initializes the controller with the currently implemented state handlers + * + * @param handlerList the list of supported operator handlers + */ + public void initHandlers(List> handlerList) { + handlers = handlerList.stream().collect(Collectors.toMap(OperatorHandler::name, Function.identity())); + } + + static class SettingsFile { + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("metadata"); + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "operator_state", + a -> new SettingsFile((Map) a[0], (OperatorStateVersionMetadata) a[1]) + ); + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), STATE_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), OperatorStateVersionMetadata::parse, METADATA_FIELD); + } + + Map state; + OperatorStateVersionMetadata metadata; + + SettingsFile(Map state, OperatorStateVersionMetadata metadata) { + this.state = state; + this.metadata = metadata; + } + } + + /** + * Saves an operator cluster state for a given 'namespace' from XContentParser + * + * @param namespace the namespace under which we'll store the operator keys in the cluster state metadata + * @param parser the XContentParser to process + * @param errorListener a consumer called with IllegalStateException if the content has errors and the + * cluster state cannot be correctly applied, IncompatibleVersionException if the content is stale or + * incompatible with this node {@link Version}, null if successful. + */ + public void process(String namespace, XContentParser parser, Consumer errorListener) { + SettingsFile operatorStateFileContent; + + try { + operatorStateFileContent = SettingsFile.PARSER.apply(parser, null); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState(new OperatorErrorState(namespace, -1L, errors, OperatorErrorMetadata.ErrorKind.PARSING)); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + Map operatorState = operatorStateFileContent.state; + OperatorStateVersionMetadata stateVersionMetadata = operatorStateFileContent.metadata; + + LinkedHashSet orderedHandlers; + try { + orderedHandlers = orderedStateHandlers(operatorState.keySet()); + } catch (Exception e) { + List errors = List.of(e.getMessage()); + recordErrorState( + new OperatorErrorState(namespace, stateVersionMetadata.version(), errors, OperatorErrorMetadata.ErrorKind.PARSING) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + errorListener.accept(new IllegalStateException("Error processing state change request for " + namespace, e)); + return; + } + + ClusterState state = clusterService.state(); + OperatorMetadata existingMetadata = state.metadata().operatorState(namespace); + if (checkMetadataVersion(existingMetadata, stateVersionMetadata, errorListener) == false) { + return; + } + + // Do we need to retry this, or it retries automatically? + clusterService.submitStateUpdateTask( + "operator state [" + namespace + "]", + new OperatorUpdateStateTask( + namespace, + operatorStateFileContent, + handlers, + orderedHandlers, + (errorState) -> recordErrorState(errorState), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new cluster state for namespace [{}]", namespace); + errorListener.accept(null); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply operator cluster state", e); + errorListener.accept(e); + } + } + ), + ClusterStateTaskConfig.build(Priority.URGENT), + new OperatorUpdateStateTask.OperatorUpdateStateTaskExecutor(namespace, clusterService.getRerouteService()) + ); + } + + // package private for testing + static boolean checkMetadataVersion( + OperatorMetadata existingMetadata, + OperatorStateVersionMetadata stateVersionMetadata, + Consumer errorListener + ) { + if (Version.CURRENT.before(stateVersionMetadata.minCompatibleVersion())) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Cluster state version [%s] is not compatible with this Elasticsearch node", + stateVersionMetadata.minCompatibleVersion() + ) + ) + ); + return false; + } + + if (existingMetadata != null && existingMetadata.version() >= stateVersionMetadata.version()) { + errorListener.accept( + new IncompatibleVersionException( + format( + "Not updating cluster state because version [%s] is less or equal to the current metadata version [%s]", + stateVersionMetadata.version(), + existingMetadata.version() + ) + ) + ); + return false; + } + + return true; + } + + record OperatorErrorState(String namespace, Long version, List errors, OperatorErrorMetadata.ErrorKind errorKind) {} + + private void recordErrorState(OperatorErrorState state) { + clusterService.submitStateUpdateTask( + "operator state error for [ " + state.namespace + "]", + new OperatorUpdateErrorTask(state, new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) { + logger.info("Successfully applied new operator error state for namespace [{}]", state.namespace); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to apply operator error cluster state", e); + } + }), + ClusterStateTaskConfig.build(Priority.URGENT), + new OperatorUpdateErrorTask.OperatorUpdateErrorTaskExecutor() + ); + } + + // package private for testing + LinkedHashSet orderedStateHandlers(Set keys) { + LinkedHashSet orderedHandlers = new LinkedHashSet<>(); + LinkedHashSet dependencyStack = new LinkedHashSet<>(); + + for (String key : keys) { + addStateHandler(key, keys, orderedHandlers, dependencyStack); + } + + return orderedHandlers; + } + + private void addStateHandler(String key, Set keys, LinkedHashSet ordered, LinkedHashSet visited) { + if (visited.contains(key)) { + StringBuilder msg = new StringBuilder("Cycle found in settings dependencies: "); + visited.forEach(s -> { + msg.append(s); + msg.append(" -> "); + }); + msg.append(key); + throw new IllegalStateException(msg.toString()); + } + + if (ordered.contains(key)) { + // already added by another dependent handler + return; + } + + visited.add(key); + OperatorHandler handler = handlers.get(key); + + if (handler == null) { + throw new IllegalStateException("Unknown settings definition type: " + key); + } + + for (String dependency : handler.dependencies()) { + if (keys.contains(dependency) == false) { + throw new IllegalStateException("Missing settings dependency definition: " + key + " -> " + dependency); + } + addStateHandler(dependency, keys, ordered, visited); + } + + visited.remove(key); + ordered.add(key); + } + + /** + * {@link IncompatibleVersionException} is thrown when we try to update the cluster state + * without changing the update version id, or if we try to update cluster state on + * an incompatible Elasticsearch version in mixed cluster mode. + */ + public static class IncompatibleVersionException extends RuntimeException { + public IncompatibleVersionException(String message) { + super(message); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/service/OperatorStateVersionMetadata.java b/server/src/main/java/org/elasticsearch/operator/service/OperatorStateVersionMetadata.java new file mode 100644 index 0000000000000..c723e00a5f3a9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/service/OperatorStateVersionMetadata.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.elasticsearch.Version; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParser; + +/** + * File settings metadata class that holds information about + * versioning and Elasticsearch version compatibility + */ +public class OperatorStateVersionMetadata { + public static final ParseField VERSION = new ParseField("version"); + public static final ParseField COMPATIBILITY = new ParseField("compatibility"); + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "operator_settings_metadata", + a -> { + Long updateId = Long.parseLong((String) a[0]); + Version minCompatVersion = Version.fromString((String) a[1]); + + return new OperatorStateVersionMetadata(updateId, minCompatVersion); + } + ); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPATIBILITY); + } + + private Long version; + private Version compatibleWith; + + public OperatorStateVersionMetadata(Long version, Version compatibleWith) { + this.version = version; + this.compatibleWith = compatibleWith; + } + + public static OperatorStateVersionMetadata parse(XContentParser parser, Void v) { + return PARSER.apply(parser, v); + } + + public Long version() { + return version; + } + + public Version minCompatibleVersion() { + return compatibleWith; + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateErrorTask.java b/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateErrorTask.java new file mode 100644 index 0000000000000..2a52b194c3612 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateErrorTask.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.OperatorErrorMetadata; +import org.elasticsearch.cluster.metadata.OperatorMetadata; + +import java.util.List; + +/** + * Cluster state update task that sets the error state of the operator metadata. + * This is used when an operator cluster state update encounters error(s) while processing + * the file. + */ +public class OperatorUpdateErrorTask implements ClusterStateTaskListener { + + private final OperatorClusterStateController.OperatorErrorState errorState; + private final ActionListener listener; + + public OperatorUpdateErrorTask( + OperatorClusterStateController.OperatorErrorState errorState, + ActionListener listener + ) { + this.errorState = errorState; + this.listener = listener; + } + + private static final Logger logger = LogManager.getLogger(FileSettingsService.class); + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + ClusterState execute(ClusterState currentState) { + ClusterState.Builder stateBuilder = new ClusterState.Builder(currentState); + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + OperatorMetadata operatorMetadata = currentState.metadata().operatorState(errorState.namespace()); + OperatorMetadata.Builder operatorMetadataBuilder = OperatorMetadata.builder(errorState.namespace(), operatorMetadata); + operatorMetadataBuilder.errorMetadata( + OperatorErrorMetadata.builder() + .version(errorState.version()) + .errorKind(errorState.errorKind()) + .errors(errorState.errors()) + .build() + ); + metadataBuilder.putOperatorState(operatorMetadataBuilder.build()); + ClusterState newState = stateBuilder.metadata(metadataBuilder).build(); + + return newState; + } + + /** + * Operator update cluster state task executor + */ + public record OperatorUpdateErrorTaskExecutor() implements ClusterStateTaskExecutor { + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success( + () -> taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE)) + ); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + logger.info("Wrote new error state in operator metadata"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateStateTask.java b/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateStateTask.java new file mode 100644 index 0000000000000..e993290a67d41 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/operator/service/OperatorUpdateStateTask.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.OperatorErrorMetadata; +import org.elasticsearch.cluster.metadata.OperatorHandlerMetadata; +import org.elasticsearch.cluster.metadata.OperatorMetadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.TransformState; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import static org.elasticsearch.core.Strings.format; + +/** + * Generic operator cluster state update task + */ +public class OperatorUpdateStateTask implements ClusterStateTaskListener { + private static final Logger logger = LogManager.getLogger(FileSettingsService.class); + + private final String namespace; + private final OperatorClusterStateController.SettingsFile operatorStateFileContent; + private final Map> handlers; + private final Collection orderedHandlers; + private final Consumer recordErrorState; + private final ActionListener listener; + + public OperatorUpdateStateTask( + String namespace, + OperatorClusterStateController.SettingsFile operatorStateFileContent, + Map> handlers, + Collection orderedHandlers, + Consumer recordErrorState, + ActionListener listener + ) { + this.namespace = namespace; + this.operatorStateFileContent = operatorStateFileContent; + this.handlers = handlers; + this.orderedHandlers = orderedHandlers; + this.recordErrorState = recordErrorState; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + ActionListener listener() { + return listener; + } + + protected ClusterState execute(ClusterState state) { + OperatorMetadata existingMetadata = state.metadata().operatorState(namespace); + Map operatorState = operatorStateFileContent.state; + OperatorStateVersionMetadata stateVersionMetadata = operatorStateFileContent.metadata; + + OperatorMetadata.Builder operatorMetadataBuilder = new OperatorMetadata.Builder(namespace).version(stateVersionMetadata.version()); + List errors = new ArrayList<>(); + + for (var handlerKey : orderedHandlers) { + OperatorHandler handler = handlers.get(handlerKey); + try { + Set existingKeys = keysForHandler(existingMetadata, handlerKey); + TransformState transformState = handler.transform(operatorState.get(handlerKey), new TransformState(state, existingKeys)); + state = transformState.state(); + operatorMetadataBuilder.putHandler(new OperatorHandlerMetadata.Builder(handlerKey).keys(transformState.keys()).build()); + } catch (Exception e) { + errors.add(format("Error processing %s state change: %s", handler.name(), e.getMessage())); + } + } + + if (errors.isEmpty() == false) { + // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the + // version hasn't been updated. + if (existingMetadata != null + && existingMetadata.errorMetadata() != null + && existingMetadata.errorMetadata().version() >= stateVersionMetadata.version()) { + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new OperatorClusterStateController.IncompatibleVersionException( + format( + "Not updating error state because version [%s] is less or equal to the last operator error version [%s]", + stateVersionMetadata.version(), + existingMetadata.errorMetadata().version() + ) + ); + } + + recordErrorState.accept( + new OperatorClusterStateController.OperatorErrorState( + namespace, + stateVersionMetadata.version(), + errors, + OperatorErrorMetadata.ErrorKind.VALIDATION + ) + ); + logger.error("Error processing state change request for [{}] with the following errors [{}]", namespace, errors); + + throw new IllegalStateException("Error processing state change request for " + namespace); + } + + // remove the last error if we had previously encountered any + operatorMetadataBuilder.errorMetadata(null); + + ClusterState.Builder stateBuilder = new ClusterState.Builder(state); + Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).putOperatorState(operatorMetadataBuilder.build()); + + return stateBuilder.metadata(metadataBuilder).build(); + } + + private Set keysForHandler(OperatorMetadata operatorMetadata, String handlerKey) { + if (operatorMetadata == null || operatorMetadata.handlers().get(handlerKey) == null) { + return Collections.emptySet(); + } + + return operatorMetadata.handlers().get(handlerKey).keys(); + } + + /** + * Operator update cluster state task executor + * + * @param namespace of the state we are updating + * @param rerouteService instance of RerouteService so we can execute reroute after cluster state is published + */ + public record OperatorUpdateStateTaskExecutor(String namespace, RerouteService rerouteService) + implements + ClusterStateTaskExecutor { + + @Override + public ClusterState execute(ClusterState currentState, List> taskContexts) throws Exception { + for (final var taskContext : taskContexts) { + currentState = taskContext.getTask().execute(currentState); + taskContext.success(() -> taskContext.getTask().listener().onResponse(ActionResponse.Empty.INSTANCE)); + } + return currentState; + } + + @Override + public void clusterStatePublished(ClusterState newClusterState) { + rerouteService.reroute( + "reroute after applying operator cluster state for namespace [" + namespace + "]", + Priority.NORMAL, + ActionListener.wrap( + r -> logger.trace("reroute after applying operator cluster state for [{}] succeeded", namespace), + e -> logger.debug("reroute after applying operator cluster state failed", e) + ) + ); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java index d5af93af4a2c4..d91d047cfaf5c 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -26,6 +26,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.jdk.JarHell; import org.elasticsearch.node.ReportingService; +import org.elasticsearch.operator.OperatorHandlerProvider; import org.elasticsearch.plugins.spi.SPIClassIterator; import java.io.IOException; @@ -52,6 +53,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.ServiceLoader; import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; @@ -98,6 +100,8 @@ record LoadedPlugin(PluginDescriptor descriptor, Plugin instance, ClassLoader lo private final List plugins; private final PluginsAndModules info; + private static Class[] SPI_CLASSES = { OperatorHandlerProvider.class }; + public static final Setting> MANDATORY_SETTING = Setting.listSetting( "plugin.mandatory", Collections.emptyList(), @@ -281,7 +285,6 @@ private Map loadBundles(Set bundles) { // package-private for test visibility static void loadExtensions(Collection plugins) { - Map> extendingPluginsByName = plugins.stream() .flatMap(t -> t.descriptor().getExtendedPlugins().stream().map(extendedPlugin -> Tuple.tuple(extendedPlugin, t.instance()))) .collect(Collectors.groupingBy(Tuple::v1, Collectors.mapping(Tuple::v2, Collectors.toList()))); @@ -295,6 +298,17 @@ static void loadExtensions(Collection plugins) { } } + public List loadServiceProviders(Class provider) { + List result = new ArrayList<>(); + getClass().getModule().addUses(provider); + + for (LoadedPlugin pluginTuple : plugins) { + ServiceLoader.load(provider, pluginTuple.loader()).iterator().forEachRemaining(c -> result.add(c)); + } + + return Collections.unmodifiableList(result); + } + private static void loadExtensionsForPlugin(ExtensiblePlugin extensiblePlugin, List extendingPlugins) { ExtensiblePlugin.ExtensionLoader extensionLoader = new ExtensiblePlugin.ExtensionLoader() { @Override diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index a1993bd942f91..ebc869a3aba2c 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -116,6 +116,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, null, usageService, + null, null ); actionModule.initRestHandlers(null); @@ -172,6 +173,7 @@ public String getName() { null, null, usageService, + null, null ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); @@ -221,6 +223,7 @@ public List getRestHandlers( null, null, usageService, + null, null ); actionModule.initRestHandlers(null); @@ -265,6 +268,7 @@ public void test3rdPartyHandlerIsNotInstalled() { null, null, usageService, + null, null ) ); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java index 332233d2b7d66..b9ee982a1f41e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java @@ -142,7 +142,8 @@ public void testToXContent() throws IOException { }, "index-graveyard": { "tombstones": [] - } + }, + "operator":{} }, "routing_table": { "indices": {} @@ -246,7 +247,8 @@ public void testToXContent() throws IOException { }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator":{} } } }"""), XContentHelper.stripWhitespace(Strings.toString(builder))); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java index 042f7f150788a..ce77b451e5dc2 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsRequestTests.java @@ -8,9 +8,17 @@ package org.elasticsearch.action.admin.cluster.settings; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.XContentTestUtils; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParseException; import org.elasticsearch.xcontent.XContentParser; @@ -21,6 +29,8 @@ import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.mock; public class ClusterUpdateSettingsRequestTests extends ESTestCase { @@ -71,4 +81,43 @@ private static ClusterUpdateSettingsRequest createTestItem() { request.transientSettings(ClusterUpdateSettingsResponseTests.randomClusterSettings(0, 2)); return request; } + + public void testOperatorHandler() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + TransportClusterUpdateSettingsAction action = new TransportClusterUpdateSettingsAction( + mock(TransportService.class), + mock(ClusterService.class), + mock(ThreadPool.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class), + clusterSettings + ); + + assertEquals(OperatorClusterUpdateSettingsAction.NAME, action.operatorHandlerName().get()); + + String oneSettingJSON = """ + { + "persistent": { + "indices.recovery.max_bytes_per_sec": "25mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + }"""; + + try (XContentParser parser = createParser(XContentType.JSON.xContent(), oneSettingJSON)) { + ClusterUpdateSettingsRequest parsedRequest = ClusterUpdateSettingsRequest.fromXContent(parser); + assertThat( + action.modifiedKeys(parsedRequest), + containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster.remote.cluster_one.seeds") + ); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java index 9770b1c42dc0f..b19b95b074a8e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java @@ -14,12 +14,14 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlock; @@ -30,6 +32,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.OperatorHandlerMetadata; +import org.elasticsearch.cluster.metadata.OperatorMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -43,6 +47,7 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; @@ -65,6 +70,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -254,6 +260,63 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } } + class OperatorAction extends Action { + OperatorAction(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { + super(actionName, transportService, clusterService, threadPool, ThreadPool.Names.SAME); + } + + @Override + protected Optional operatorHandlerName() { + return Optional.of("test_operator"); + } + } + + class FakeClusterStateUpdateAction extends TransportMasterNodeAction { + FakeClusterStateUpdateAction( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + String executor + ) { + super( + actionName, + transportService, + clusterService, + threadPool, + new ActionFilters(new HashSet<>()), + ClusterUpdateSettingsRequest::new, + TestIndexNameExpressionResolver.newInstance(), + Response::new, + executor + ); + } + + @Override + protected void masterOperation( + Task task, + ClusterUpdateSettingsRequest request, + ClusterState state, + ActionListener listener + ) {} + + @Override + protected ClusterBlockException checkBlock(ClusterUpdateSettingsRequest request, ClusterState state) { + return null; + } + + @Override + protected Optional operatorHandlerName() { + return Optional.of(OperatorClusterUpdateSettingsAction.NAME); + } + + @Override + protected Set modifiedKeys(ClusterUpdateSettingsRequest request) { + Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + return allSettings.keySet(); + } + } + public void testLocalOperationWithoutBlocks() throws ExecutionException, InterruptedException { final boolean masterOperationFailure = randomBoolean(); @@ -686,7 +749,6 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(state, request) ); } - }; PlainActionFuture listener = new PlainActionFuture<>(); @@ -697,6 +759,58 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) assertThat(ex.getCause().getCause(), instanceOf(ClusterBlockException.class)); } + public void testRejectOperatorConflictClusterStateUpdate() { + OperatorHandlerMetadata hmOne = new OperatorHandlerMetadata.Builder(OperatorClusterUpdateSettingsAction.NAME).keys(Set.of("a", "b")) + .build(); + OperatorHandlerMetadata hmThree = new OperatorHandlerMetadata.Builder(OperatorClusterUpdateSettingsAction.NAME).keys( + Set.of("e", "f") + ).build(); + + OperatorMetadata omOne = OperatorMetadata.builder("namespace_one").putHandler(hmOne).build(); + OperatorMetadata omTwo = OperatorMetadata.builder("namespace_two").putHandler(hmThree).build(); + + Metadata metadata = Metadata.builder().putOperatorState(omOne).putOperatorState(omTwo).build(); + + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME); + + assertFalse(noHandler.supportsOperatorMode()); + + noHandler = new OperatorAction("internal:testOpAction", transportService, clusterService, threadPool); + + assertTrue(noHandler.supportsOperatorMode()); + + // nothing should happen here, since the request doesn't touch any of the operator key + noHandler.validateForOperatorState(new Request(), clusterState); + + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("a", "a value").build() + ).transientSettings(Settings.builder().put("e", "e value").build()); + + FakeClusterStateUpdateAction action = new FakeClusterStateUpdateAction( + "internal:testClusterSettings", + transportService, + clusterService, + threadPool, + ThreadPool.Names.SAME + ); + + assertTrue(action.supportsOperatorMode()); + + assertTrue( + expectThrows(IllegalArgumentException.class, () -> action.validateForOperatorState(request, clusterState)).getMessage() + .contains("with errors: [a] set in operator mode by [namespace_one]\n" + "[e] set in operator mode by [namespace_two]") + ); + + ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder().put("m", "m value").build() + ).transientSettings(Settings.builder().put("n", "n value").build()); + + // this should just work, no conflicts + action.validateForOperatorState(okRequest, clusterState); + } + private Runnable blockAllThreads(String executorName) throws Exception { final int numberOfThreads = threadPool.info(executorName).getMax(); final EsThreadPoolExecutor executor = (EsThreadPoolExecutor) threadPool.executor(executorName); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 5a93fa16359c0..05d417bef0077 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -286,7 +286,8 @@ public void testToXContent() throws IOException { }, "index-graveyard": { "tombstones": [] - } + }, + "operator" : { } }, "routing_table": { "indices": { @@ -489,7 +490,8 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } }, "routing_table" : { "indices" : { @@ -699,7 +701,8 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } }, "routing_table" : { "indices" : { @@ -840,7 +843,8 @@ public void testToXContentSameTypeName() throws IOException { }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } }, "routing_table" : { "indices" : { } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/OperatorMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/OperatorMetadataTests.java new file mode 100644 index 0000000000000..63464d0e4c0b2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/OperatorMetadataTests.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; + +/** + * Tests for the {@link OperatorMetadata}, {@link OperatorErrorMetadata}, {@link OperatorHandlerMetadata} classes + */ +public class OperatorMetadataTests extends ESTestCase { + + public void testEquals() { + final OperatorMetadata meta = createRandom(); + assertThat(meta, equalTo(OperatorMetadata.builder(meta.namespace(), meta).build())); + final OperatorMetadata.Builder newMeta = OperatorMetadata.builder(meta.namespace(), meta); + newMeta.putHandler(new OperatorHandlerMetadata("1", Collections.emptySet())); + assertThat(newMeta.build(), not(meta)); + } + + public void testSerialization() throws IOException { + final OperatorMetadata meta = createRandom(); + final BytesStreamOutput out = new BytesStreamOutput(); + meta.writeTo(out); + assertThat(OperatorMetadata.readFrom(out.bytes().streamInput()), equalTo(meta)); + } + + public void testXContent() throws IOException { + final OperatorMetadata meta = createRandom(); + final XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + meta.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder)); + parser.nextToken(); // the beginning of the object + assertThat(OperatorMetadata.fromXContent(parser), equalTo(meta)); + } + + private static OperatorMetadata createRandom() { + List handlers = randomList( + 0, + 10, + () -> new OperatorHandlerMetadata.Builder(randomAlphaOfLength(5)).keys(randomSet(1, 5, () -> randomAlphaOfLength(6))).build() + ); + + List errors = randomList( + 0, + 10, + () -> new OperatorErrorMetadata.Builder().version(1L) + .errorKind(randomFrom(OperatorErrorMetadata.ErrorKind.values())) + .errors(randomList(1, 5, () -> randomAlphaOfLength(10))) + .build() + ); + + OperatorMetadata.Builder builder = OperatorMetadata.builder(randomAlphaOfLength(7)); + + for (var handlerMeta : handlers) { + builder.putHandler(handlerMeta); + } + + for (var errorMeta : errors) { + builder.errorMetadata(errorMeta); + } + + return builder.build(); + } + +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java index 77ccb5bf93db5..7bd30d38190f3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetadataTests.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex; @@ -44,6 +45,23 @@ public class ToAndFromJsonMetadataTests extends ESTestCase { public void testSimpleJsonFromAndTo() throws IOException { IndexMetadata idx1 = createFirstBackingIndex("data-stream1").build(); IndexMetadata idx2 = createFirstBackingIndex("data-stream2").build(); + + OperatorHandlerMetadata hmOne = new OperatorHandlerMetadata.Builder("one").keys(Set.of("a", "b")).build(); + OperatorHandlerMetadata hmTwo = new OperatorHandlerMetadata.Builder("two").keys(Set.of("c", "d")).build(); + + OperatorErrorMetadata emOne = new OperatorErrorMetadata.Builder().version(1L) + .errorKind(OperatorErrorMetadata.ErrorKind.VALIDATION) + .errors(List.of("Test error 1", "Test error 2")) + .build(); + + OperatorMetadata operatorMetadata = OperatorMetadata.builder("namespace_one") + .errorMetadata(emOne) + .putHandler(hmOne) + .putHandler(hmTwo) + .build(); + + OperatorMetadata operatorMetadata1 = OperatorMetadata.builder("namespace_two").putHandler(hmTwo).build(); + Metadata metadata = Metadata.builder() .put( IndexTemplateMetadata.builder("foo") @@ -107,6 +125,8 @@ public void testSimpleJsonFromAndTo() throws IOException { .put(idx2, false) .put(DataStreamTestHelper.newInstance("data-stream1", List.of(idx1.getIndex()))) .put(DataStreamTestHelper.newInstance("data-stream2", List.of(idx2.getIndex()))) + .putOperatorState(operatorMetadata) + .putOperatorState(operatorMetadata1) .build(); XContentBuilder builder = JsonXContent.contentBuilder(); @@ -180,6 +200,10 @@ public void testSimpleJsonFromAndTo() throws IOException { assertThat(parsedMetadata.dataStreams().get("data-stream2").getName(), is("data-stream2")); assertThat(parsedMetadata.dataStreams().get("data-stream2").getTimeStampField().getName(), is("@timestamp")); assertThat(parsedMetadata.dataStreams().get("data-stream2").getIndices(), contains(idx2.getIndex())); + + // operator metadata + assertEquals(operatorMetadata, parsedMetadata.operatorState(operatorMetadata.namespace())); + assertEquals(operatorMetadata1, parsedMetadata.operatorState(operatorMetadata1.namespace())); } private static final String MAPPING_SOURCE1 = """ @@ -246,7 +270,8 @@ public void testToXContentGateway_FlatSettingTrue_ReduceMappingFalse() throws IO }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } } }""".formatted(Version.CURRENT.id, Version.CURRENT.id), Strings.toString(builder)); } @@ -341,7 +366,8 @@ public void testToXContentAPI_SameTypeName() throws IOException { }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } } }""".formatted(Version.CURRENT.id), Strings.toString(builder)); } @@ -405,7 +431,8 @@ public void testToXContentGateway_FlatSettingFalse_ReduceMappingTrue() throws IO }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } } }""".formatted(Version.CURRENT.id, Version.CURRENT.id), Strings.toString(builder)); } @@ -507,7 +534,8 @@ public void testToXContentAPI_FlatSettingTrue_ReduceMappingFalse() throws IOExce }, "index-graveyard" : { "tombstones" : [ ] - } + }, + "operator" : { } } }""".formatted(Version.CURRENT.id, Version.CURRENT.id), Strings.toString(builder)); } @@ -615,6 +643,181 @@ public void testToXContentAPI_FlatSettingFalse_ReduceMappingTrue() throws IOExce }, "index-graveyard" : { "tombstones" : [ ] + }, + "operator" : { } + } + }""".formatted(Version.CURRENT.id, Version.CURRENT.id), Strings.toString(builder)); + } + + public void testToXContentAPIOperatorMetadata() throws IOException { + Map mapParams = new HashMap<>() { + { + put(Metadata.CONTEXT_MODE_PARAM, CONTEXT_MODE_API); + put("flat_settings", "false"); + put("reduce_mappings", "true"); + } + }; + + Metadata metadata = buildMetadata(); + + OperatorHandlerMetadata hmOne = new OperatorHandlerMetadata.Builder("one").keys(Set.of("a", "b")).build(); + OperatorHandlerMetadata hmTwo = new OperatorHandlerMetadata.Builder("two").keys(Set.of("c", "d")).build(); + OperatorHandlerMetadata hmThree = new OperatorHandlerMetadata.Builder("three").keys(Set.of("e", "f")).build(); + + OperatorErrorMetadata emOne = new OperatorErrorMetadata.Builder().version(1L) + .errorKind(OperatorErrorMetadata.ErrorKind.VALIDATION) + .errors(List.of("Test error 1", "Test error 2")) + .build(); + + OperatorErrorMetadata emTwo = new OperatorErrorMetadata.Builder().version(2L) + .errorKind(OperatorErrorMetadata.ErrorKind.TRANSIENT) + .errors(List.of("Test error 3", "Test error 4")) + .build(); + + OperatorMetadata omOne = OperatorMetadata.builder("namespace_one").errorMetadata(emOne).putHandler(hmOne).putHandler(hmTwo).build(); + + OperatorMetadata omTwo = OperatorMetadata.builder("namespace_two").errorMetadata(emTwo).putHandler(hmThree).build(); + + metadata = Metadata.builder(metadata).putOperatorState(omOne).putOperatorState(omTwo).build(); + + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + builder.startObject(); + metadata.toXContent(builder, new ToXContent.MapParams(mapParams)); + builder.endObject(); + + assertEquals(""" + { + "metadata" : { + "cluster_uuid" : "clusterUUID", + "cluster_uuid_committed" : false, + "cluster_coordination" : { + "term" : 1, + "last_committed_config" : [ + "commitedConfigurationNodeId" + ], + "last_accepted_config" : [ + "acceptedConfigurationNodeId" + ], + "voting_config_exclusions" : [ + { + "node_id" : "exlucdedNodeId", + "node_name" : "excludedNodeName" + } + ] + }, + "templates" : { + "template" : { + "order" : 0, + "index_patterns" : [ + "pattern1", + "pattern2" + ], + "settings" : { + "index" : { + "version" : { + "created" : "%s" + } + } + }, + "mappings" : { }, + "aliases" : { } + } + }, + "indices" : { + "index" : { + "version" : 2, + "mapping_version" : 1, + "settings_version" : 1, + "aliases_version" : 1, + "routing_num_shards" : 1, + "state" : "open", + "settings" : { + "index" : { + "number_of_shards" : "1", + "number_of_replicas" : "2", + "version" : { + "created" : "%s" + } + } + }, + "mappings" : { + "type" : { + "type1" : { + "key" : "value" + } + } + }, + "aliases" : [ + "alias" + ], + "primary_terms" : { + "0" : 1 + }, + "in_sync_allocations" : { + "0" : [ + "allocationId" + ] + }, + "rollover_info" : { + "rolloveAlias" : { + "met_conditions" : { }, + "time" : 1 + } + }, + "system" : false, + "timestamp_range" : { + "shards" : [ ] + } + } + }, + "index-graveyard" : { + "tombstones" : [ ] + }, + "operator" : { + "namespace_one" : { + "version" : 0, + "handlers" : { + "one" : { + "keys" : [ + "a", + "b" + ] + }, + "two" : { + "keys" : [ + "c", + "d" + ] + } + }, + "errors" : { + "version" : 1, + "error_kind" : "validation", + "errors" : [ + "Test error 1", + "Test error 2" + ] + } + }, + "namespace_two" : { + "version" : 0, + "handlers" : { + "three" : { + "keys" : [ + "e", + "f" + ] + } + }, + "errors" : { + "version" : 2, + "error_kind" : "transient", + "errors" : [ + "Test error 3", + "Test error 4" + ] + } + } } } }""".formatted(Version.CURRENT.id, Version.CURRENT.id), Strings.toString(builder)); diff --git a/server/src/test/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsActionTests.java b/server/src/test/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsActionTests.java new file mode 100644 index 0000000000000..c2e29a10117fa --- /dev/null +++ b/server/src/test/java/org/elasticsearch/operator/action/OperatorClusterUpdateSettingsActionTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.action; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.operator.TransformState; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.util.Collections; + +import static org.hamcrest.Matchers.containsInAnyOrder; + +public class OperatorClusterUpdateSettingsActionTests extends ESTestCase { + + private TransformState processJSON(OperatorClusterUpdateSettingsAction action, TransformState prevState, String json) throws Exception { + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { + return action.transform(parser.map(), prevState); + } + } + + public void testValidation() throws Exception { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build(); + TransformState prevState = new TransformState(state, Collections.emptySet()); + OperatorClusterUpdateSettingsAction action = new OperatorClusterUpdateSettingsAction(clusterSettings); + + String badPolicyJSON = """ + { + "indices.recovery.min_bytes_per_sec": "50mb" + }"""; + + assertEquals( + "persistent setting [indices.recovery.min_bytes_per_sec], not recognized", + expectThrows(IllegalArgumentException.class, () -> processJSON(action, prevState, badPolicyJSON)).getMessage() + ); + } + + public void testSetUnsetSettings() throws Exception { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build(); + TransformState prevState = new TransformState(state, Collections.emptySet()); + OperatorClusterUpdateSettingsAction action = new OperatorClusterUpdateSettingsAction(clusterSettings); + + String emptyJSON = ""; + + TransformState updatedState = processJSON(action, prevState, emptyJSON); + assertEquals(0, updatedState.keys().size()); + assertEquals(prevState.state(), updatedState.state()); + + String settingsJSON = """ + { + "indices.recovery.max_bytes_per_sec": "50mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + }"""; + + prevState = updatedState; + updatedState = processJSON(action, prevState, settingsJSON); + assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec", "cluster.remote.cluster_one.seeds")); + assertEquals("50mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); + assertEquals("[127.0.0.1:9300]", updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds")); + + String oneSettingJSON = """ + { + "indices.recovery.max_bytes_per_sec": "25mb" + }"""; + + prevState = updatedState; + updatedState = processJSON(action, prevState, oneSettingJSON); + assertThat(updatedState.keys(), containsInAnyOrder("indices.recovery.max_bytes_per_sec")); + assertEquals("25mb", updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); + assertNull(updatedState.state().metadata().persistentSettings().get("cluster.remote.cluster_one.seeds")); + + prevState = updatedState; + updatedState = processJSON(action, prevState, emptyJSON); + assertEquals(0, updatedState.keys().size()); + assertNull(updatedState.state().metadata().persistentSettings().get("indices.recovery.max_bytes_per_sec")); + } +} diff --git a/server/src/test/java/org/elasticsearch/operator/service/FileSettingsServiceTests.java b/server/src/test/java/org/elasticsearch/operator/service/FileSettingsServiceTests.java new file mode 100644 index 0000000000000..84025b29efc89 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/operator/service/FileSettingsServiceTests.java @@ -0,0 +1,190 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; +import org.mockito.stubbing.Answer; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class FileSettingsServiceTests extends ESTestCase { + private Environment env; + private ClusterService clusterService; + private FileSettingsService fileSettingsService; + private ThreadPool threadpool; + + @Before + public void setUp() throws Exception { + super.setUp(); + + threadpool = new TestThreadPool("file_settings_service_tests"); + + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadpool + ); + env = newEnvironment(Settings.EMPTY); + + Files.createDirectories(env.configFile()); + + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + OperatorClusterStateController controller = new OperatorClusterStateController(clusterService); + controller.initHandlers(List.of(new OperatorClusterUpdateSettingsAction(clusterSettings))); + + fileSettingsService = new FileSettingsService(clusterService, controller, env); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + } + + public void testOperatorDirName() { + Path operatorPath = fileSettingsService.operatorSettingsDir(); + assertTrue(operatorPath.startsWith(env.configFile())); + assertTrue(operatorPath.endsWith("operator")); + + Path operatorSettingsFile = fileSettingsService.operatorSettingsFile(); + assertTrue(operatorSettingsFile.startsWith(operatorPath)); + assertTrue(operatorSettingsFile.endsWith("settings.json")); + } + + public void testWatchedFile() throws Exception { + Path tmpFile = createTempFile(); + Path tmpFile1 = createTempFile(); + Path otherFile = tmpFile.getParent().resolve("other.json"); + // we return false on non-existent paths, we don't remember state + assertFalse(fileSettingsService.watchedFileChanged(otherFile)); + + // we remember the previous state + assertTrue(fileSettingsService.watchedFileChanged(tmpFile)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile)); + + // we modify the timestamp of the file, it should trigger a change + Instant now = LocalDateTime.now(ZoneId.systemDefault()).toInstant(ZoneOffset.ofHours(0)); + Files.setLastModifiedTime(tmpFile, FileTime.from(now)); + + assertTrue(fileSettingsService.watchedFileChanged(tmpFile)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile)); + + // we change to another real file, it should be changed + assertTrue(fileSettingsService.watchedFileChanged(tmpFile1)); + assertFalse(fileSettingsService.watchedFileChanged(tmpFile1)); + } + + public void testStartStop() { + fileSettingsService.start(); + fileSettingsService.startWatcher(); + assertTrue(fileSettingsService.watching()); + fileSettingsService.stop(); + assertFalse(fileSettingsService.watching()); + fileSettingsService.close(); + } + + public void testCallsProcessing() throws Exception { + FileSettingsService service = spy(fileSettingsService); + CountDownLatch processFileLatch = new CountDownLatch(1); + + doAnswer((Answer) invocation -> { + processFileLatch.countDown(); + return null; + }).when(service).processFileSettings(any(), anyBoolean()); + + service.start(); + service.startWatcher(); + assertTrue(service.watching()); + + Files.createDirectories(service.operatorSettingsDir()); + + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + // we need to wait a bit, on MacOS it may take up to 10 seconds for the Java watcher service to notice the file, + // on Linux is instantaneous. + processFileLatch.await(30, TimeUnit.SECONDS); + + verify(service, times(1)).watchedFileChanged(any()); + + service.stop(); + assertFalse(service.watching()); + service.close(); + } + + @SuppressWarnings("unchecked") + public void testInitialFile() throws Exception { + OperatorClusterStateController controller = mock(OperatorClusterStateController.class); + + doAnswer((Answer) invocation -> { + ((Consumer) invocation.getArgument(2)).accept(new IllegalStateException("Some exception")); + return null; + }).when(controller).process(any(), any(), any()); + + FileSettingsService service = spy(new FileSettingsService(clusterService, controller, env)); + + Files.createDirectories(service.operatorSettingsDir()); + + // contents of the JSON don't matter, we just need a file to exist + Files.write(service.operatorSettingsFile(), "{}".getBytes(StandardCharsets.UTF_8)); + + service.start(); + assertEquals( + "Error applying operator settings", + expectThrows(FileSettingsService.OperatorConfigurationError.class, () -> service.startWatcher()).getMessage() + ); + + verify(service, times(1)).processFileSettings(any(), eq(true)); + + service.stop(); + + clearInvocations(service); + + // Let's check that if we didn't throw an error that everything works + doAnswer((Answer) invocation -> null).when(controller).process(any(), any(), any()); + + service.start(); + service.startWatcher(); + + verify(service, times(1)).processFileSettings(any(), eq(true)); + + service.stop(); + service.close(); + } +} diff --git a/server/src/test/java/org/elasticsearch/operator/service/OperatorClusterStateControllerTests.java b/server/src/test/java/org/elasticsearch/operator/service/OperatorClusterStateControllerTests.java new file mode 100644 index 0000000000000..30042257bbf40 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/operator/service/OperatorClusterStateControllerTests.java @@ -0,0 +1,486 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.operator.service; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.OperatorErrorMetadata; +import org.elasticsearch.cluster.metadata.OperatorHandlerMetadata; +import org.elasticsearch.cluster.metadata.OperatorMetadata; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.TransformState; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OperatorClusterStateControllerTests extends ESTestCase { + + public void testOperatorController() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + OperatorClusterStateController controller = new OperatorClusterStateController(clusterService); + controller.initHandlers(List.of(new OperatorClusterUpdateSettingsAction(clusterSettings))); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + + } + } + """; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb", + "cluster": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + } + } + } + } + } + } + """; + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } + + public void testUpdateStateTasks() throws Exception { + ClusterService clusterService = mock(ClusterService.class); + RerouteService rerouteService = mock(RerouteService.class); + + when(clusterService.getRerouteService()).thenReturn(rerouteService); + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + OperatorUpdateStateTask.OperatorUpdateStateTaskExecutor taskExecutor = new OperatorUpdateStateTask.OperatorUpdateStateTaskExecutor( + "test", + clusterService.getRerouteService() + ); + + AtomicBoolean successCalled = new AtomicBoolean(false); + + OperatorUpdateStateTask task = spy( + new OperatorUpdateStateTask( + "test", + null, + Collections.emptyMap(), + Collections.emptySet(), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + doReturn(state).when(task).execute(any()); + + ClusterStateTaskExecutor.TaskContext taskContext = new ClusterStateTaskExecutor.TaskContext<>() { + @Override + public OperatorUpdateStateTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + successCalled.set(true); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + ClusterState newState = taskExecutor.execute(state, List.of(taskContext)); + assertEquals(state, newState); + assertTrue(successCalled.get()); + verify(task, times(1)).execute(any()); + + taskExecutor.clusterStatePublished(state); + verify(rerouteService, times(1)).reroute(anyString(), any(), any()); + } + + public void testErrorStateTask() throws Exception { + ClusterState state = ClusterState.builder(new ClusterName("test")).build(); + + OperatorUpdateErrorTask task = spy( + new OperatorUpdateErrorTask( + new OperatorClusterStateController.OperatorErrorState( + "test", + 1L, + List.of("some parse error", "some io error"), + OperatorErrorMetadata.ErrorKind.PARSING + ), + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + OperatorUpdateErrorTask.OperatorUpdateErrorTaskExecutor.TaskContext taskContext = + new OperatorUpdateErrorTask.OperatorUpdateErrorTaskExecutor.TaskContext<>() { + @Override + public OperatorUpdateErrorTask getTask() { + return task; + } + + @Override + public void success(Runnable onPublicationSuccess) { + onPublicationSuccess.run(); + } + + @Override + public void success(Consumer publishedStateConsumer) {} + + @Override + public void success(Runnable onPublicationSuccess, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void success(Consumer publishedStateConsumer, ClusterStateAckListener clusterStateAckListener) {} + + @Override + public void onFailure(Exception failure) {} + }; + + OperatorUpdateErrorTask.OperatorUpdateErrorTaskExecutor executor = new OperatorUpdateErrorTask.OperatorUpdateErrorTaskExecutor(); + + ClusterState newState = executor.execute(state, List.of(taskContext)); + + verify(task, times(1)).execute(any()); + + OperatorMetadata operatorMetadata = newState.metadata().operatorState("test"); + assertNotNull(operatorMetadata); + assertNotNull(operatorMetadata.errorMetadata()); + assertEquals(1L, (long) operatorMetadata.errorMetadata().version()); + assertEquals(OperatorErrorMetadata.ErrorKind.PARSING, operatorMetadata.errorMetadata().errorKind()); + assertThat(operatorMetadata.errorMetadata().errors(), contains("some parse error", "some io error")); + } + + public void testUpdateTaskDuplicateError() { + OperatorHandler dummy = new OperatorHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + throw new Exception("anything"); + } + }; + + OperatorUpdateStateTask task = spy( + new OperatorUpdateStateTask( + "namespace_one", + new OperatorClusterStateController.SettingsFile( + Map.of("one", "two"), + new OperatorStateVersionMetadata(1L, Version.CURRENT) + ), + Map.of("one", dummy), + List.of(dummy.name()), + (errorState) -> {}, + new ActionListener<>() { + @Override + public void onResponse(ActionResponse.Empty empty) {} + + @Override + public void onFailure(Exception e) {} + } + ) + ); + + OperatorHandlerMetadata hmOne = new OperatorHandlerMetadata.Builder("one").keys(Set.of("a", "b")).build(); + + OperatorErrorMetadata emOne = new OperatorErrorMetadata.Builder().version(1L) + .errorKind(OperatorErrorMetadata.ErrorKind.VALIDATION) + .errors(List.of("Test error 1", "Test error 2")) + .build(); + + OperatorMetadata operatorMetadata = OperatorMetadata.builder("namespace_one") + .errorMetadata(emOne) + .version(1L) + .putHandler(hmOne) + .build(); + + Metadata metadata = Metadata.builder().putOperatorState(operatorMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Not updating error state because version [1] is less or equal to the last operator error version [1]", + expectThrows(OperatorClusterStateController.IncompatibleVersionException.class, () -> task.execute(state)).getMessage() + ); + + emOne = new OperatorErrorMetadata.Builder().version(0L) + .errorKind(OperatorErrorMetadata.ErrorKind.VALIDATION) + .errors(List.of("Test error 1", "Test error 2")) + .build(); + + // If we are writing with older error metadata, we should get proper IllegalStateException + operatorMetadata = OperatorMetadata.builder("namespace_one").errorMetadata(emOne).version(0L).putHandler(hmOne).build(); + + metadata = Metadata.builder().putOperatorState(operatorMetadata).build(); + ClusterState newState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + // We exit on duplicate errors before we update the cluster state error metadata + assertEquals( + "Error processing state change request for namespace_one", + expectThrows(IllegalStateException.class, () -> task.execute(newState)).getMessage() + ); + } + + public void testCheckMetadataVersion() { + OperatorMetadata operatorMetadata = OperatorMetadata.builder("test").version(123L).build(); + + assertTrue( + OperatorClusterStateController.checkMetadataVersion( + operatorMetadata, + new OperatorStateVersionMetadata(124L, Version.CURRENT), + (e) -> {} + ) + ); + + AtomicReference x = new AtomicReference<>(); + + assertFalse( + OperatorClusterStateController.checkMetadataVersion( + operatorMetadata, + new OperatorStateVersionMetadata(123L, Version.CURRENT), + (e) -> x.set(e) + ) + ); + + assertTrue(x.get() instanceof OperatorClusterStateController.IncompatibleVersionException); + assertTrue(x.get().getMessage().contains("is less or equal to the current metadata version")); + + assertFalse( + OperatorClusterStateController.checkMetadataVersion( + operatorMetadata, + new OperatorStateVersionMetadata(124L, Version.fromId(Version.CURRENT.id + 1)), + (e) -> x.set(e) + ) + ); + + assertEquals(OperatorClusterStateController.IncompatibleVersionException.class, x.get().getClass()); + assertTrue(x.get().getMessage().contains("is not compatible with this Elasticsearch node")); + } + + public void testHandlerOrdering() { + OperatorHandler oh1 = new OperatorHandler<>() { + @Override + public String name() { + return "one"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two", "three"); + } + }; + + OperatorHandler oh2 = new OperatorHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + }; + + OperatorHandler oh3 = new OperatorHandler<>() { + @Override + public String name() { + return "three"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("two"); + } + }; + + ClusterService clusterService = mock(ClusterService.class); + OperatorClusterStateController controller = new OperatorClusterStateController(clusterService); + + controller.initHandlers(List.of(oh1, oh2, oh3)); + Collection ordered = controller.orderedStateHandlers(Set.of("one", "two", "three")); + assertThat(ordered, contains("two", "three", "one")); + + // assure that we bail on unknown handler + assertEquals( + "Unknown settings definition type: four", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two", "three", "four"))) + .getMessage() + ); + + // assure that we bail on missing dependency link + assertEquals( + "Missing settings dependency definition: one -> three", + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage() + ); + + // Change the second handler so that we create cycle + oh2 = new OperatorHandler<>() { + @Override + public String name() { + return "two"; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return null; + } + + @Override + public Collection dependencies() { + return List.of("one"); + } + }; + + controller.initHandlers(List.of(oh1, oh2)); + assertThat( + expectThrows(IllegalStateException.class, () -> controller.orderedStateHandlers(Set.of("one", "two"))).getMessage(), + anyOf( + is("Cycle found in settings dependencies: one -> two -> one"), + is("Cycle found in settings dependencies: two -> one -> two") + ) + ); + } + + public void testDuplicateHandlerNames() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + OperatorClusterStateController controller = new OperatorClusterStateController(clusterService); + + assertTrue( + expectThrows( + IllegalStateException.class, + () -> controller.initHandlers(List.of(new OperatorClusterUpdateSettingsAction(clusterSettings), new TestHandler())) + ).getMessage().startsWith("Duplicate key cluster_settings") + ); + } + + class TestHandler implements OperatorHandler { + + @Override + public String name() { + return OperatorClusterUpdateSettingsAction.NAME; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + return prevState; + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 8526f60e033b3..e3322a7b3a846 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -991,6 +991,10 @@ public static Map randomMap(int minMapSize, int maxMapSize, Supplie return list; } + public static Set randomSet(int minSetSize, int maxSetSize, Supplier valueConstructor) { + return new HashSet<>(randomList(minSetSize, maxSetSize, valueConstructor)); + } + private static final String[] TIME_SUFFIXES = new String[] { "d", "h", "ms", "s", "m", "micros", "nanos" }; public static String randomTimeValue(int lower, int upper, String... suffixes) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/DeleteLifecycleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/DeleteLifecycleAction.java index 623c9797ffde1..2d59d07a55d0d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/DeleteLifecycleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/action/DeleteLifecycleAction.java @@ -18,6 +18,8 @@ import java.io.IOException; import java.util.Objects; +import static org.elasticsearch.core.Strings.format; + public class DeleteLifecycleAction extends ActionType { public static final DeleteLifecycleAction INSTANCE = new DeleteLifecycleAction(); public static final String NAME = "cluster:admin/ilm/delete"; @@ -75,6 +77,11 @@ public boolean equals(Object obj) { return Objects.equals(policyName, other.policyName); } + @Override + public String toString() { + return format("delete lifecycle policy [%s]", policyName); + } + } } diff --git a/x-pack/plugin/ilm/src/main/java/module-info.java b/x-pack/plugin/ilm/src/main/java/module-info.java new file mode 100644 index 0000000000000..af6a442e46f4f --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/module-info.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +module org.elasticsearch.ilm { + requires org.apache.lucene.core; + requires org.elasticsearch.server; + requires org.elasticsearch.base; + requires org.elasticsearch.xcore; + requires org.elasticsearch.xcontent; + requires org.apache.logging.log4j; + + exports org.elasticsearch.xpack.ilm.action to org.elasticsearch.server; + exports org.elasticsearch.xpack.ilm.operator.action to org.elasticsearch.server; + exports org.elasticsearch.xpack.ilm to org.elasticsearch.server; + exports org.elasticsearch.xpack.slm.action to org.elasticsearch.server; + exports org.elasticsearch.xpack.slm to org.elasticsearch.server; + + provides org.elasticsearch.operator.OperatorHandlerProvider with org.elasticsearch.xpack.ilm.operator.ILMOperatorHandlerProvider; +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index c18b0d632a8bc..fc908b71ee8d4 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -107,6 +107,8 @@ import org.elasticsearch.xpack.ilm.action.TransportStopILMAction; import org.elasticsearch.xpack.ilm.history.ILMHistoryStore; import org.elasticsearch.xpack.ilm.history.ILMHistoryTemplateRegistry; +import org.elasticsearch.xpack.ilm.operator.ILMOperatorHandlerProvider; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; import org.elasticsearch.xpack.slm.SLMInfoTransportAction; import org.elasticsearch.xpack.slm.SLMUsageTransportAction; import org.elasticsearch.xpack.slm.SlmHealthIndicatorService; @@ -158,6 +160,7 @@ public class IndexLifecycle extends Plugin implements ActionPlugin, HealthPlugin private final SetOnce snapshotHistoryStore = new SetOnce<>(); private final SetOnce ilmHealthIndicatorService = new SetOnce<>(); private final SetOnce slmHealthIndicatorService = new SetOnce<>(); + private final SetOnce ilmOperatorAction = new SetOnce<>(); private final Settings settings; public IndexLifecycle(Settings settings) { @@ -267,6 +270,9 @@ public Collection createComponents( components.addAll(Arrays.asList(snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get())); ilmHealthIndicatorService.set(new IlmHealthIndicatorService(clusterService)); slmHealthIndicatorService.set(new SlmHealthIndicatorService(clusterService)); + ilmOperatorAction.set(new OperatorLifecycleAction(xContentRegistry, client, XPackPlugin.getSharedLicenseState())); + + ILMOperatorHandlerProvider.handler(ilmOperatorAction.get()); return components; } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java index 1387093782e48..c66188e3fd6e7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java @@ -29,8 +29,11 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction; import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction.Request; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -70,6 +73,10 @@ public DeleteLifecyclePolicyTask(Request request, ActionListener operatorHandlerName() { + return Optional.of(OperatorLifecycleAction.NAME); + } + + @Override + protected Set modifiedKeys(Request request) { + return Set.of(request.getPolicyName()); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java index 7baecc4754672..d1d75d7333d85 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java @@ -41,10 +41,14 @@ import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request; import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -111,7 +115,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A submitUnbatchedTask( "put-lifecycle-" + request.getPolicy().getName(), - new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client) + new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client, true) ); } @@ -121,6 +125,7 @@ public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTas private final Map filteredHeaders; private final NamedXContentRegistry xContentRegistry; private final Client client; + private final boolean verboseLogging; public UpdateLifecyclePolicyTask( Request request, @@ -128,7 +133,8 @@ public UpdateLifecyclePolicyTask( XPackLicenseState licenseState, Map filteredHeaders, NamedXContentRegistry xContentRegistry, - Client client + Client client, + boolean verboseLogging ) { super(request, listener); this.request = request; @@ -136,6 +142,24 @@ public UpdateLifecyclePolicyTask( this.filteredHeaders = filteredHeaders; this.xContentRegistry = xContentRegistry; this.client = client; + this.verboseLogging = verboseLogging; + } + + /** + * Constructor used in operator mode. It disables verbose logging and has no filtered headers. + * + * @param request + * @param licenseState + * @param xContentRegistry + * @param client + */ + public UpdateLifecyclePolicyTask( + Request request, + XPackLicenseState licenseState, + NamedXContentRegistry xContentRegistry, + Client client + ) { + this(request, null, licenseState, new HashMap<>(), xContentRegistry, client, false); } @Override @@ -161,10 +185,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { Instant.now().toEpochMilli() ); LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata); - if (oldPolicy == null) { - logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); - } else { - logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); + if (verboseLogging) { + if (oldPolicy == null) { + logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName()); + } else { + logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName()); + } } IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode()); stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()); @@ -285,4 +311,14 @@ private static void validatePrerequisites(LifecyclePolicy policy, ClusterState s protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + @Override + protected Optional operatorHandlerName() { + return Optional.of(OperatorLifecycleAction.NAME); + } + + @Override + protected Set modifiedKeys(Request request) { + return Set.of(request.getPolicy().getName()); + } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/ILMOperatorHandlerProvider.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/ILMOperatorHandlerProvider.java new file mode 100644 index 0000000000000..5577401719cfd --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/ILMOperatorHandlerProvider.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ilm.operator; + +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.OperatorHandlerProvider; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * ILM Provider implementation for the OperatorHandlerProvider service interface + */ +public class ILMOperatorHandlerProvider implements OperatorHandlerProvider { + private static final Set> handlers = ConcurrentHashMap.newKeySet(); + + @Override + public Collection> handlers() { + return handlers; + } + + public static void handler(OperatorHandler handler) { + handlers.add(handler); + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/action/OperatorLifecycleAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/action/OperatorLifecycleAction.java new file mode 100644 index 0000000000000..d5df7d37595d6 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/operator/action/OperatorLifecycleAction.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ilm.operator.action; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.operator.OperatorHandler; +import org.elasticsearch.operator.TransformState; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; +import org.elasticsearch.xpack.core.template.LifecyclePolicyConfig; +import org.elasticsearch.xpack.ilm.action.TransportDeleteLifecycleAction; +import org.elasticsearch.xpack.ilm.action.TransportPutLifecycleAction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This {@link OperatorHandler} is responsible for CRUD operations on ILM policies in + * operator mode, e.g. file based settings. Internally it uses {@link TransportPutLifecycleAction} and + * {@link TransportDeleteLifecycleAction} to add, update and delete ILM policies. + */ +public class OperatorLifecycleAction implements OperatorHandler { + + private final NamedXContentRegistry xContentRegistry; + private final Client client; + private final XPackLicenseState licenseState; + + public static final String NAME = "ilm"; + + public OperatorLifecycleAction(NamedXContentRegistry xContentRegistry, Client client, XPackLicenseState licenseState) { + this.xContentRegistry = xContentRegistry; + this.client = client; + this.licenseState = licenseState; + } + + @Override + public String name() { + return NAME; + } + + @SuppressWarnings("unchecked") + public Collection prepare(Object input) throws IOException { + List result = new ArrayList<>(); + + Map source = asMap(input); + + for (String name : source.keySet()) { + Map content = (Map) source.get(name); + var config = XContentParserConfiguration.EMPTY.withRegistry(LifecyclePolicyConfig.DEFAULT_X_CONTENT_REGISTRY); + try (XContentParser parser = mapToXContentParser(config, content)) { + LifecyclePolicy policy = LifecyclePolicy.parse(parser, name); + PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy); + validate(request); + result.add(request); + } + } + + return result; + } + + @Override + public TransformState transform(Object source, TransformState prevState) throws Exception { + var requests = prepare(source); + + ClusterState state = prevState.state(); + + for (var request : requests) { + TransportPutLifecycleAction.UpdateLifecyclePolicyTask task = new TransportPutLifecycleAction.UpdateLifecyclePolicyTask( + request, + licenseState, + xContentRegistry, + client + ); + + state = task.execute(state); + } + + Set entities = requests.stream().map(r -> r.getPolicy().getName()).collect(Collectors.toSet()); + + Set toDelete = new HashSet<>(prevState.keys()); + toDelete.removeAll(entities); + + for (var policyToDelete : toDelete) { + TransportDeleteLifecycleAction.DeleteLifecyclePolicyTask task = new TransportDeleteLifecycleAction.DeleteLifecyclePolicyTask( + policyToDelete + ); + state = task.execute(state); + } + + return new TransformState(state, entities); + } +} diff --git a/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.operator.OperatorHandlerProvider b/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.operator.OperatorHandlerProvider new file mode 100644 index 0000000000000..55bfd83cbf428 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/resources/META-INF/services/org.elasticsearch.operator.OperatorHandlerProvider @@ -0,0 +1 @@ +org.elasticsearch.xpack.ilm.operator.ILMOperatorHandlerProvider diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleActionTests.java new file mode 100644 index 0000000000000..51df10fb7a190 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleActionTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ilm.action; + +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.mock; + +public class TransportDeleteLifecycleActionTests extends ESTestCase { + public void testOperatorHandler() { + TransportDeleteLifecycleAction putAction = new TransportDeleteLifecycleAction( + mock(TransportService.class), + mock(ClusterService.class), + mock(ThreadPool.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class) + ); + assertEquals(OperatorLifecycleAction.NAME, putAction.operatorHandlerName().get()); + + DeleteLifecycleAction.Request request = new DeleteLifecycleAction.Request("my_timeseries_lifecycle2"); + assertThat(putAction.modifiedKeys(request), containsInAnyOrder("my_timeseries_lifecycle2")); + } +} diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java index b2202f0e5126f..6731957cb43ab 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleActionTests.java @@ -7,13 +7,29 @@ package org.elasticsearch.xpack.ilm.action; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.ilm.LifecyclePolicy; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; +import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; import org.elasticsearch.xpack.ilm.LifecyclePolicyTestsUtils; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; import java.util.Map; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.mock; + public class TransportPutLifecycleActionTests extends ESTestCase { public void testIsNoop() { LifecyclePolicy policy1 = LifecyclePolicyTestsUtils.randomTimeseriesLifecyclePolicy("policy"); @@ -29,4 +45,37 @@ public void testIsNoop() { assertFalse(TransportPutLifecycleAction.isNoopUpdate(existing, policy1, headers2)); assertFalse(TransportPutLifecycleAction.isNoopUpdate(null, policy1, headers1)); } + + public void testOperatorHandler() throws Exception { + TransportPutLifecycleAction putAction = new TransportPutLifecycleAction( + mock(TransportService.class), + mock(ClusterService.class), + mock(ThreadPool.class), + mock(ActionFilters.class), + mock(IndexNameExpressionResolver.class), + mock(NamedXContentRegistry.class), + mock(XPackLicenseState.class), + mock(Client.class) + ); + assertEquals(OperatorLifecycleAction.NAME, putAction.operatorHandlerName().get()); + + String json = """ + { + "policy": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + } + } + } + }"""; + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { + PutLifecycleAction.Request request = PutLifecycleAction.Request.parseRequest("my_timeseries_lifecycle2", parser); + + assertThat(putAction.modifiedKeys(request), containsInAnyOrder("my_timeseries_lifecycle2")); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/operator/OperatorILMControllerTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/operator/OperatorILMControllerTests.java new file mode 100644 index 0000000000000..5472e61e719c1 --- /dev/null +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/action/operator/OperatorILMControllerTests.java @@ -0,0 +1,289 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ilm.action.operator; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.operator.TransformState; +import org.elasticsearch.operator.action.OperatorClusterUpdateSettingsAction; +import org.elasticsearch.operator.service.OperatorClusterStateController; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.XContentParseException; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.ilm.AllocateAction; +import org.elasticsearch.xpack.core.ilm.DeleteAction; +import org.elasticsearch.xpack.core.ilm.ForceMergeAction; +import org.elasticsearch.xpack.core.ilm.FreezeAction; +import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata; +import org.elasticsearch.xpack.core.ilm.LifecycleAction; +import org.elasticsearch.xpack.core.ilm.LifecycleType; +import org.elasticsearch.xpack.core.ilm.MigrateAction; +import org.elasticsearch.xpack.core.ilm.ReadOnlyAction; +import org.elasticsearch.xpack.core.ilm.RolloverAction; +import org.elasticsearch.xpack.core.ilm.RollupILMAction; +import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction; +import org.elasticsearch.xpack.core.ilm.SetPriorityAction; +import org.elasticsearch.xpack.core.ilm.ShrinkAction; +import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.ilm.UnfollowAction; +import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction; +import org.elasticsearch.xpack.ilm.operator.action.OperatorLifecycleAction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class OperatorILMControllerTests extends ESTestCase { + + protected NamedXContentRegistry xContentRegistry() { + List entries = new ArrayList<>(ClusterModule.getNamedXWriteables()); + entries.addAll( + Arrays.asList( + new NamedXContentRegistry.Entry( + LifecycleType.class, + new ParseField(TimeseriesLifecycleType.TYPE), + (p) -> TimeseriesLifecycleType.INSTANCE + ), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(AllocateAction.NAME), AllocateAction::parse), + new NamedXContentRegistry.Entry( + LifecycleAction.class, + new ParseField(WaitForSnapshotAction.NAME), + WaitForSnapshotAction::parse + ), + new NamedXContentRegistry.Entry( + LifecycleAction.class, + new ParseField(SearchableSnapshotAction.NAME), + SearchableSnapshotAction::parse + ), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReadOnlyAction.NAME), ReadOnlyAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse) + ) + ); + return new NamedXContentRegistry(entries); + } + + private TransformState processJSON(OperatorLifecycleAction action, TransformState prevState, String json) throws Exception { + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) { + return action.transform(parser.map(), prevState); + } + } + + public void testValidationFails() { + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + OperatorLifecycleAction action = new OperatorLifecycleAction(xContentRegistry(), client, mock(XPackLicenseState.class)); + TransformState prevState = new TransformState(state, Collections.emptySet()); + + String badPolicyJSON = """ + { + "my_timeseries_lifecycle": { + "phase": { + "warm": { + "min_age": "10s", + "actions": { + } + } + } + } + }"""; + + assertEquals( + "[1:2] [lifecycle_policy] unknown field [phase] did you mean [phases]?", + expectThrows(XContentParseException.class, () -> processJSON(action, prevState, badPolicyJSON)).getMessage() + ); + } + + public void testActionAddRemove() throws Exception { + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + + OperatorLifecycleAction action = new OperatorLifecycleAction(xContentRegistry(), client, mock(XPackLicenseState.class)); + + String emptyJSON = ""; + + TransformState prevState = new TransformState(state, Collections.emptySet()); + + TransformState updatedState = processJSON(action, prevState, emptyJSON); + assertEquals(0, updatedState.keys().size()); + assertEquals(prevState.state(), updatedState.state()); + + String twoPoliciesJSON = """ + { + "my_timeseries_lifecycle": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + } + } + }, + "my_timeseries_lifecycle1": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + } + }"""; + + prevState = updatedState; + updatedState = processJSON(action, prevState, twoPoliciesJSON); + assertThat(updatedState.keys(), containsInAnyOrder("my_timeseries_lifecycle", "my_timeseries_lifecycle1")); + IndexLifecycleMetadata ilmMetadata = updatedState.state() + .metadata() + .custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); + assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle", "my_timeseries_lifecycle1")); + + String onePolicyRemovedJSON = """ + { + "my_timeseries_lifecycle": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + } + } + } + }"""; + + prevState = updatedState; + updatedState = processJSON(action, prevState, onePolicyRemovedJSON); + assertThat(updatedState.keys(), containsInAnyOrder("my_timeseries_lifecycle")); + ilmMetadata = updatedState.state().metadata().custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); + assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle")); + + String onePolicyRenamedJSON = """ + { + "my_timeseries_lifecycle2": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + } + } + } + }"""; + + prevState = updatedState; + updatedState = processJSON(action, prevState, onePolicyRenamedJSON); + assertThat(updatedState.keys(), containsInAnyOrder("my_timeseries_lifecycle2")); + ilmMetadata = updatedState.state().metadata().custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY); + assertThat(ilmMetadata.getPolicyMetadatas().keySet(), containsInAnyOrder("my_timeseries_lifecycle2")); + } + + public void testOperatorController() throws IOException { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = mock(ClusterService.class); + final ClusterName clusterName = new ClusterName("elasticsearch"); + + ClusterState state = ClusterState.builder(clusterName).build(); + when(clusterService.state()).thenReturn(state); + + OperatorClusterStateController controller = new OperatorClusterStateController(clusterService); + controller.initHandlers(List.of(new OperatorClusterUpdateSettingsAction(clusterSettings))); + + String testJSON = """ + { + "metadata": { + "version": "1234", + "compatibility": "8.4.0" + }, + "state": { + "cluster_settings": { + "indices.recovery.max_bytes_per_sec": "50mb" + }, + "ilm": { + "my_timeseries_lifecycle": { + "phases": { + "warm": { + "min_age": "10s", + "actions": { + } + }, + "delete": { + "min_age": "30s", + "actions": { + } + } + } + } + } + } + }"""; + + AtomicReference x = new AtomicReference<>(); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> x.set(e)); + + assertTrue(x.get() instanceof IllegalStateException); + assertEquals("Error processing state change request for operator", x.get().getMessage()); + } + + Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + XPackLicenseState licenseState = mock(XPackLicenseState.class); + + controller.initHandlers( + List.of( + new OperatorClusterUpdateSettingsAction(clusterSettings), + new OperatorLifecycleAction(xContentRegistry(), client, licenseState) + ) + ); + + try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, testJSON)) { + controller.process("operator", parser, (e) -> { + if (e != null) { + fail("Should not fail"); + } + }); + } + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 65d4267f541ec..604a521da9a8e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -778,6 +778,7 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc null, null, usageService, + null, null ); actionModule.initRestHandlers(null);