diff --git a/build.gradle b/build.gradle index f0e28409331b0..4246e0cdf515a 100644 --- a/build.gradle +++ b/build.gradle @@ -190,9 +190,9 @@ tasks.register("verifyVersions") { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true +boolean bwc_tests_enabled = false // place a PR link here when committing bwc changes: -String bwc_tests_disabled_issue = "" +String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/71667" /* * FIPS 140-2 behavior was fixed in 7.11.0. Before that there is no way to run elasticsearch in a * JVM that is properly configured to be in fips mode with BCFIPS. For now we need to disable diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java index 32f62aeeafa0b..db27600a5cc9c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java @@ -26,6 +26,7 @@ public final class DataStream { private final List indices; private final long generation; private final boolean hidden; + private final boolean system; ClusterHealthStatus dataStreamStatus; @Nullable String indexTemplate; @@ -36,7 +37,7 @@ public final class DataStream { public DataStream(String name, String timeStampField, List indices, long generation, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map metadata, - boolean hidden) { + boolean hidden, boolean system) { this.name = name; this.timeStampField = timeStampField; this.indices = indices; @@ -46,6 +47,7 @@ public DataStream(String name, String timeStampField, List indices, long this.ilmPolicyName = ilmPolicyName; this.metadata = metadata; this.hidden = hidden; + this.system = system; } public String getName() { @@ -84,6 +86,10 @@ public boolean isHidden() { return hidden; } + public boolean isSystem() { + return system; + } + public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); @@ -93,6 +99,7 @@ public boolean isHidden() { public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", @@ -107,9 +114,10 @@ public boolean isHidden() { String indexTemplate = (String) args[5]; String ilmPolicy = (String) args[6]; Map metadata = (Map) args[7]; - Boolean hidden = (Boolean) args[8]; - hidden = hidden != null && hidden; - return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden); + boolean hidden = args[8] != null && (boolean) args[8]; + boolean system = args[9] != null && (boolean) args[9]; + return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden, + system); }); static { @@ -122,6 +130,7 @@ public boolean isHidden() { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -138,6 +147,8 @@ public boolean equals(Object o) { timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && dataStreamStatus == that.dataStreamStatus && + hidden == that.hidden && + system == that.system && Objects.equals(indexTemplate, that.indexTemplate) && Objects.equals(ilmPolicyName, that.ilmPolicyName) && Objects.equals(metadata, that.metadata); @@ -145,6 +156,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata); + return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata, hidden, + system); } } diff --git a/docs/reference/data-streams/change-mappings-and-settings.asciidoc b/docs/reference/data-streams/change-mappings-and-settings.asciidoc index 7f596be6edfb6..0136b9d224be8 100644 --- a/docs/reference/data-streams/change-mappings-and-settings.asciidoc +++ b/docs/reference/data-streams/change-mappings-and-settings.asciidoc @@ -572,7 +572,8 @@ stream's oldest backing index. "generation": 2, "status": "GREEN", "template": "my-data-stream-template", - "hidden": false + "hidden": false, + "system": false } ] } diff --git a/docs/reference/indices/get-data-stream.asciidoc b/docs/reference/indices/get-data-stream.asciidoc index 3038c96ed50e7..2e71396e1838e 100644 --- a/docs/reference/indices/get-data-stream.asciidoc +++ b/docs/reference/indices/get-data-stream.asciidoc @@ -203,6 +203,11 @@ use the <>. `hidden`:: (Boolean) If `true`, the data stream is <>. + +`system`:: +(Boolean) +If `true`, the data stream is created and managed by an Elastic stack component +and cannot be modified through normal user interaction. ==== [[get-data-stream-api-example]] @@ -241,7 +246,8 @@ The API returns the following response: "status": "GREEN", "template": "my-index-template", "ilm_policy": "my-lifecycle-policy", - "hidden": false + "hidden": false, + "system": false }, { "name": "my-data-stream-two", @@ -261,7 +267,8 @@ The API returns the following response: "status": "YELLOW", "template": "my-index-template", "ilm_policy": "my-lifecycle-policy", - "hidden": false + "hidden": false, + "system": false } ] } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java index dc4203392160a..d1c284274eb5c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -24,6 +23,7 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -66,11 +66,9 @@ protected void masterOperation(Task task, GetAliasesRequest request, ClusterStat concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request); } final SystemIndexAccessLevel systemIndexAccessLevel = indexNameExpressionResolver.getSystemIndexAccessLevel(); - final String elasticProduct = - threadPool.getThreadContext().getHeader(IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); ImmutableOpenMap> aliases = state.metadata().findAliases(request, concreteIndices); listener.onResponse(new GetAliasesResponse(postProcess(request, concreteIndices, aliases, state, - systemIndexAccessLevel, elasticProduct, systemIndices))); + systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices))); } /** @@ -79,7 +77,7 @@ protected void masterOperation(Task task, GetAliasesRequest request, ClusterStat static ImmutableOpenMap> postProcess(GetAliasesRequest request, String[] concreteIndices, ImmutableOpenMap> aliases, ClusterState state, SystemIndexAccessLevel systemIndexAccessLevel, - String elasticProduct, SystemIndices systemIndices) { + ThreadContext threadContext, SystemIndices systemIndices) { boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0; ImmutableOpenMap.Builder> mapBuilder = ImmutableOpenMap.builder(aliases); for (String index : concreteIndices) { @@ -90,19 +88,19 @@ static ImmutableOpenMap> postProcess(GetAliasesReque } final ImmutableOpenMap> finalResponse = mapBuilder.build(); if (systemIndexAccessLevel != SystemIndexAccessLevel.ALL) { - checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, elasticProduct); + checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, threadContext); } return finalResponse; } private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndices systemIndices, ClusterState state, ImmutableOpenMap> aliasesMap, - SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) { + SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) { final Predicate systemIndexAccessAllowPredicate; if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { systemIndexAccessAllowPredicate = indexMetadata -> false; } else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) { - systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(elasticProduct); + systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(threadContext); } else { throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel); } @@ -122,23 +120,23 @@ private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndi "this request accesses system indices: {}, but in a future major version, direct access to system " + "indices will be prevented by default", systemIndicesNames); } else { - checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, elasticProduct); + checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, threadContext); } } private static void checkSystemAliasAccess(GetAliasesRequest request, SystemIndices systemIndices, - SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) { + SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) { final Predicate systemIndexAccessAllowPredicate; if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { systemIndexAccessAllowPredicate = name -> true; } else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) { - systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(elasticProduct).negate(); + systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext).negate(); } else { throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel); } final List systemAliases = Arrays.stream(request.aliases()) - .filter(systemIndices::isSystemIndex) + .filter(systemIndices::isSystemName) .filter(systemIndexAccessAllowPredicate) .collect(Collectors.toList()); if (systemAliases.isEmpty() == false) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index ead911c31df6d..e358327ee6a7e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.tasks.Task; @@ -111,11 +112,17 @@ protected void masterOperation(Task task, @Override public ClusterState execute(ClusterState currentState) throws Exception { + final SystemDataStreamDescriptor dataStreamDescriptor = + systemIndices.validateDataStreamAccess(request.index(), threadPool.getThreadContext()); + final boolean isSystemDataStream = dataStreamDescriptor != null; + final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); + final boolean isDataStream = isSystemIndex == false && + (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); - if (template != null && template.getDataStreamTemplate() != null) { + if (isDataStream) { // This expression only evaluates to true when the argument is non-null and false - if (Boolean.FALSE.equals(template.getAllowAutoCreate())) { + if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { throw new IndexNotFoundException( "composable template " + template.indexPatterns() + " forbids index auto creation" ); @@ -123,6 +130,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( request.index(), + dataStreamDescriptor, request.masterNodeTimeout(), request.timeout() ); @@ -132,21 +140,27 @@ public ClusterState execute(ClusterState currentState) throws Exception { } else { String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); indexNameRef.set(indexName); + if (isSystemIndex) { + if (indexName.equals(request.index()) == false) { + throw new IllegalStateException("system indices do not support date math expressions"); + } + } else { + // This will throw an exception if the index does not exist and creating it is prohibited + final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - // This will throw an exception if the index does not exist and creating it is prohibited - final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - - if (shouldAutoCreate == false) { - // The index already exists. - return currentState; + if (shouldAutoCreate == false) { + // The index already exists. + return currentState; + } } - final SystemIndexDescriptor mainDescriptor = systemIndices.findMatchingDescriptor(indexName); - final boolean isSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); + final SystemIndexDescriptor mainDescriptor = + isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null; + final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); final CreateIndexClusterStateUpdateRequest updateRequest; - if (isSystemIndex) { + if (isManagedSystemIndex) { final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(state.nodes().getSmallestNonClientNodeVersion()); if (descriptor == null) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 898c7b0b83b7b..f294929706fa2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import java.util.HashSet; import java.util.Set; @@ -33,6 +34,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private Index recoverFrom; private ResizeType resizeType; private boolean copySettings; + private SystemDataStreamDescriptor systemDataStreamDescriptor; private Settings settings = Settings.Builder.EMPTY_SETTINGS; @@ -93,6 +95,11 @@ public CreateIndexClusterStateUpdateRequest nameResolvedInstant(long nameResolve return this; } + public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDataStreamDescriptor systemDataStreamDescriptor) { + this.systemDataStreamDescriptor = systemDataStreamDescriptor; + return this; + } + public String cause() { return cause; } @@ -121,6 +128,10 @@ public Index recoverFrom() { return recoverFrom; } + public SystemDataStreamDescriptor systemDataStreamDescriptor() { + return systemDataStreamDescriptor; + } + /** * The name that was provided by the user. This might contain a date math expression. * @see IndexMetadata#SETTING_INDEX_PROVIDED_NAME @@ -176,6 +187,7 @@ public String toString() { ", aliases=" + aliases + ", blocks=" + blocks + ", waitForActiveShards=" + waitForActiveShards + + ", systemDataStreamDescriptor=" + systemDataStreamDescriptor + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index 2326c218e3991..63e5a35ce01ae 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -27,6 +27,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; @@ -55,14 +57,17 @@ public class MetadataRolloverService { private final MetadataCreateIndexService createIndexService; private final MetadataIndexAliasesService indexAliasesService; private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + @Inject public MetadataRolloverService(ThreadPool threadPool, MetadataCreateIndexService createIndexService, MetadataIndexAliasesService indexAliasesService, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, SystemIndices systemIndices) { this.threadPool = threadPool; this.createIndexService = createIndexService; this.indexAliasesService = indexAliasesService; this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; } public static class RolloverResult { @@ -209,7 +214,16 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra ); } - lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + final SystemDataStreamDescriptor systemDataStreamDescriptor; + if (dataStream.isSystem() == false) { + systemDataStreamDescriptor = null; + lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + } else { + systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName); + if (systemDataStreamDescriptor == null) { + throw new IllegalArgumentException("no system data stream descriptor found for data stream [" + dataStreamName + "]"); + } + } final DataStream ds = dataStream.getDataStream(); final IndexMetadata originalWriteIndex = dataStream.getWriteIndex(); @@ -219,8 +233,12 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra return new RolloverResult(rolledDataStream.getWriteIndex().getName(), originalWriteIndex.getIndex().getName(), currentState); } - CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest = - prepareDataStreamCreateIndexRequest(dataStreamName, rolledDataStream.getWriteIndex().getName(), createIndexRequest); + CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest( + dataStreamName, + rolledDataStream.getWriteIndex().getName(), + createIndexRequest, + systemDataStreamDescriptor + ); ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent, (builder, indexMetadata) -> builder.put(ds.rollover(currentState.metadata(), indexMetadata.getIndexUUID()))); @@ -252,10 +270,12 @@ static String generateRolloverIndexName(String sourceIndexName, IndexNameExpress static CreateIndexClusterStateUpdateRequest prepareDataStreamCreateIndexRequest(final String dataStreamName, final String targetIndexName, - CreateIndexRequest createIndexRequest) { - Settings settings = Settings.builder().put("index.hidden", true).build(); + CreateIndexRequest createIndexRequest, + final SystemDataStreamDescriptor descriptor) { + Settings settings = descriptor != null ? Settings.EMPTY : Settings.builder().put("index.hidden", true).build(); return prepareCreateIndexRequest(targetIndexName, targetIndexName, "rollover_data_stream", createIndexRequest, settings) - .dataStreamName(dataStreamName); + .dataStreamName(dataStreamName) + .systemDataStreamDescriptor(descriptor); } static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( diff --git a/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java b/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java index 6dc49eb76f03e..3a79ac54d9a25 100644 --- a/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java +++ b/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java @@ -59,7 +59,7 @@ public boolean shouldAutoCreate(String index, ClusterState state) { } // Always auto-create system indexes - if (systemIndices.isSystemIndex(index)) { + if (systemIndices.isSystemName(index)) { return true; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 80b52d3a26ff3..c3bb68607b2c8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -7,6 +7,7 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.Nullable; @@ -45,19 +46,25 @@ public final class DataStream extends AbstractDiffable implements To private final Map metadata; private final boolean hidden; private final boolean replicated; + private final boolean system; public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { - this(name, timeStampField, indices, generation, metadata, false, false); + this(name, timeStampField, indices, generation, metadata, false, false, false); } public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, boolean hidden, boolean replicated) { - this(name, timeStampField, indices, generation, metadata, hidden, replicated, System::currentTimeMillis); + this(name, timeStampField, indices, generation, metadata, hidden, replicated, false, System::currentTimeMillis); + } + + public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, + boolean hidden, boolean replicated, boolean system) { + this(name, timeStampField, indices, generation, metadata, hidden, replicated, system, System::currentTimeMillis); } // visible for testing DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, - boolean hidden, boolean replicated, LongSupplier timeProvider) { + boolean hidden, boolean replicated, boolean system, LongSupplier timeProvider) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); @@ -66,6 +73,7 @@ public DataStream(String name, TimestampField timeStampField, List indice this.hidden = hidden; this.replicated = replicated; this.timeProvider = timeProvider; + this.system = system; assert indices.size() > 0; } @@ -112,6 +120,10 @@ public boolean isReplicated() { return replicated; } + public boolean isSystem() { + return system; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -135,7 +147,7 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid) { newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), ++generation, currentTimeMillis); } while (clusterMetadata.getIndicesLookup().containsKey(newWriteIndexName)); backingIndices.add(new Index(newWriteIndexName, writeIndexUuid)); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } /** @@ -149,7 +161,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } /** @@ -174,11 +186,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } public DataStream promoteDataStream() { - return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, timeProvider); + return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider); } /** @@ -208,7 +220,8 @@ public DataStream snapshot(Collection indicesInSnapshot) { generation, metadata == null ? null : new HashMap<>(metadata), hidden, - replicated + replicated, + system ); } @@ -240,7 +253,9 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene public DataStream(StreamInput in) throws IOException { this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), - in.readMap(), in.readBoolean(), in.readBoolean()); + in.readMap(), in.readBoolean(), in.readBoolean(), + in.getVersion().onOrAfter(Version.V_7_13_0) && in.readBoolean() + ); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -256,6 +271,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(metadata); out.writeBoolean(hidden); out.writeBoolean(replicated); + if (out.getVersion().onOrAfter(Version.V_7_13_0)) { + out.writeBoolean(system); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -265,11 +283,13 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); public static final ParseField REPLICATED_FIELD = new ParseField("replicated"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], - (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6])); + (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], + args[7] != null && (boolean) args[7])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); @@ -279,6 +299,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -297,6 +318,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(HIDDEN_FIELD.getPreferredName(), hidden); builder.field(REPLICATED_FIELD.getPreferredName(), replicated); + builder.field(SYSTEM_FIELD.getPreferredName(), system); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 845068a007c4d..f285191854a56 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -317,8 +317,7 @@ public boolean isHidden() { @Override public boolean isSystem() { - // No such thing as system data streams (yet) - return false; + return dataStream.isSystem(); } public org.elasticsearch.cluster.metadata.DataStream getDataStream() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index badb07f10ada1..032013732a8c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.Booleans; +import org.elasticsearch.cluster.metadata.IndexAbstraction.Type; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -31,6 +31,7 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import java.time.Instant; import java.time.ZoneId; @@ -57,8 +58,6 @@ public class IndexNameExpressionResolver { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IndexNameExpressionResolver.class); public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds"; - public static final String SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_system_index_access_allowed"; - public static final String EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_external_system_index_access_origin"; public static final Version SYSTEM_INDEX_ENFORCEMENT_VERSION = Version.V_8_0_0; private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver(); @@ -79,7 +78,7 @@ public IndexNameExpressionResolver(ThreadContext threadContext, SystemIndices sy */ public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndexNames(context, request.indices()); } @@ -88,7 +87,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { */ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - SystemIndexAccessLevel.ALL); + name -> true); return concreteIndexNames(context, request.indices()); } @@ -98,7 +97,7 @@ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, Indi */ public Index[] concreteIndices(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, request.indices()); } @@ -116,28 +115,27 @@ public Index[] concreteIndices(ClusterState state, IndicesRequest request) { * indices options in the context don't allow such a case. */ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, String... indexExpressions) { - Context context = new Context(state, options, getSystemIndexAccessLevel()); + Context context = new Context(state, options, getSystemIndexAccessPredicate()); return concreteIndexNames(context, indexExpressions); } public String[] concreteIndexNames(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) { - Context context = new Context(state, options, false, false, includeDataStreams, getSystemIndexAccessLevel()); + Context context = new Context(state, options, false, false, includeDataStreams, getSystemIndexAccessPredicate()); return concreteIndexNames(context, indexExpressions); } public String[] concreteIndexNames(ClusterState state, IndicesOptions options, IndicesRequest request) { - Context context = new Context(state, options, false, false, request.includeDataStreams(), getSystemIndexAccessLevel()); + Context context = new Context(state, options, false, false, request.includeDataStreams(), getSystemIndexAccessPredicate()); return concreteIndexNames(context, request.indices()); } public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, IndicesOptions options, String... indexExpressions) { - Context context = new Context(state, options, SystemIndexAccessLevel.ALL); + Context context = new Context(state, options, name -> true); return concreteIndexNames(context, indexExpressions); } public List dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) { - // Allow system index access - they'll be filtered out below as there's no such thing (yet) as system data streams - Context context = new Context(state, options, false, false, true, true, SystemIndexAccessLevel.ALL); + Context context = new Context(state, options, false, false, true, true, getSystemIndexAccessPredicate()); if (indexExpressions == null || indexExpressions.length == 0) { indexExpressions = new String[]{"*"}; } @@ -170,7 +168,7 @@ public Index[] concreteIndices(ClusterState state, IndicesOptions options, Strin public Index[] concreteIndices(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) { Context context = new Context(state, options, false, false, includeDataStreams, - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, indexExpressions); } @@ -188,7 +186,7 @@ public Index[] concreteIndices(ClusterState state, IndicesOptions options, boole */ public Index[] concreteIndices(ClusterState state, IndicesRequest request, long startTime) { Context context = new Context(state, request.indicesOptions(), startTime, false, false, request.includeDataStreams(), false, - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, request.indices()); } @@ -315,31 +313,38 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } private void checkSystemIndexAccess(Context context, Metadata metadata, Set concreteIndices, String[] originalPatterns) { - final SystemIndexAccessLevel systemIndexAccessLevel = context.getSystemIndexAccessLevel(); - if (systemIndexAccessLevel != SystemIndexAccessLevel.ALL) { - final Predicate systemIndexAccessLevelPredicate; - if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { - // everything should be included in the deprecation message - systemIndexAccessLevelPredicate = indexMetadata -> true; + final Predicate systemIndexAccessPredicate = context.getSystemIndexAccessPredicate().negate(); + final List systemIndicesThatShouldNotBeAccessed = concreteIndices.stream() + .map(metadata::index) + .filter(IndexMetadata::isSystem) + .filter(idxMetadata -> systemIndexAccessPredicate.test(idxMetadata.getIndex().getName())) + .collect(Collectors.toList()); + + if (systemIndicesThatShouldNotBeAccessed.isEmpty()) { + return; + } + + final List resolvedSystemIndices = new ArrayList<>(); + final Set resolvedSystemDataStreams = new HashSet<>(); + final SortedMap indicesLookup = metadata.getIndicesLookup(); + for (IndexMetadata idxMetadata : systemIndicesThatShouldNotBeAccessed) { + IndexAbstraction abstraction = indicesLookup.get(idxMetadata.getIndex().getName()); + if (abstraction.getParentDataStream() != null) { + resolvedSystemDataStreams.add(abstraction.getParentDataStream().getName()); } else { - // everything other than allowed should be included in the deprecation message - systemIndexAccessLevelPredicate = systemIndices - .getProductSystemIndexMetadataPredicate(threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY)) - .negate(); - } - final List resolvedSystemIndices = concreteIndices.stream() - .map(metadata::index) - .filter(IndexMetadata::isSystem) - .filter(systemIndexAccessLevelPredicate) - .map(i -> i.getIndex().getName()) - .sorted() // reliable order for testing - .collect(Collectors.toList()); - if (resolvedSystemIndices.isEmpty() == false) { - deprecationLogger.deprecate(DeprecationCategory.API, "open_system_index_access", - "this request accesses system indices: {}, but in a future major version, direct access to system " + - "indices will be prevented by default", resolvedSystemIndices); + resolvedSystemIndices.add(idxMetadata.getIndex().getName()); } } + + if (resolvedSystemIndices.isEmpty() == false) { + Collections.sort(resolvedSystemIndices); + deprecationLogger.deprecate(DeprecationCategory.API, "open_system_index_access", + "this request accesses system indices: {}, but in a future major version, direct access to system " + + "indices will be prevented by default", resolvedSystemIndices); + } + if (resolvedSystemDataStreams.isEmpty() == false) { + throw systemIndices.dataStreamAccessException(threadContext, resolvedSystemDataStreams); + } } private static boolean shouldTrackConcreteIndex(Context context, IndicesOptions options, IndexMetadata index) { @@ -426,7 +431,7 @@ public Index concreteWriteIndex(ClusterState state, IndicesOptions options, Stri options.allowAliasesToMultipleIndices(), options.forbidClosedIndices(), options.ignoreAliases(), options.ignoreThrottled()); - Context context = new Context(state, combinedOptions, false, true, includeDataStreams, getSystemIndexAccessLevel()); + Context context = new Context(state, combinedOptions, false, true, includeDataStreams, getSystemIndexAccessPredicate()); Index[] indices = concreteIndices(context, index); if (allowNoIndices && indices.length == 0) { return null; @@ -443,7 +448,7 @@ public Index concreteWriteIndex(ClusterState state, IndicesOptions options, Stri * If the data stream, index or alias contains date math then that is resolved too. */ public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state) { - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessPredicate()); String resolvedAliasOrIndex = DateMathExpressionResolver.resolveExpression(indexAbstraction, context); return state.metadata().getIndicesLookup().containsKey(resolvedAliasOrIndex); } @@ -454,7 +459,7 @@ public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state) public String resolveDateMathExpression(String dateExpression) { // The data math expression resolver doesn't rely on cluster state or indices options, because // it just resolves the date math to an actual date. - return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, getSystemIndexAccessLevel())); + return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, getSystemIndexAccessPredicate())); } /** @@ -462,14 +467,14 @@ public String resolveDateMathExpression(String dateExpression) { * @return If the specified string is data math expression then this method returns the resolved expression. */ public String resolveDateMathExpression(String dateExpression, long time) { - return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, time, getSystemIndexAccessLevel())); + return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, time, getSystemIndexAccessPredicate())); } /** * Resolve an array of expressions to the set of indices and aliases that these expressions match. */ public Set resolveExpressions(ClusterState state, String... expressions) { - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false, true, getSystemIndexAccessPredicate()); List resolvedExpressions = Arrays.asList(expressions); for (ExpressionResolver expressionResolver : expressionResolvers) { resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions); @@ -563,7 +568,7 @@ public String[] indexAliases(ClusterState state, String index, Predicate> resolveSearchRouting(ClusterState state, @Nullable String routing, String... expressions) { List resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList(); - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessPredicate()); for (ExpressionResolver expressionResolver : expressionResolvers) { resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions); } @@ -714,33 +719,22 @@ boolean isPatternMatchingAllIndices(Metadata metadata, String[] indicesOrAliases return false; } - /** - * Determines what level of system index access should be allowed in the current context. - * - * @return {@link SystemIndexAccessLevel#ALL} if unrestricted system index access should be allowed, - * {@link SystemIndexAccessLevel#RESTRICTED} if a subset of system index access should be allowed, or - * {@link SystemIndexAccessLevel#NONE} if no system index access should be allowed. - */ public SystemIndexAccessLevel getSystemIndexAccessLevel() { - final String headerValue = threadContext.getHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); - final String productHeaderValue = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + return systemIndices.getSystemIndexAccessLevel(threadContext); + } - final boolean allowed = Booleans.parseBoolean(headerValue, true); - if (allowed) { - if (productHeaderValue != null) { - return SystemIndexAccessLevel.RESTRICTED; - } else { - return SystemIndexAccessLevel.ALL; - } + public Predicate getSystemIndexAccessPredicate() { + final SystemIndexAccessLevel systemIndexAccessLevel = getSystemIndexAccessLevel(); + final Predicate systemIndexAccessLevelPredicate; + if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { + systemIndexAccessLevelPredicate = s -> false; + } else if (systemIndexAccessLevel == SystemIndexAccessLevel.ALL) { + systemIndexAccessLevelPredicate = s -> true; } else { - return SystemIndexAccessLevel.NONE; + // everything other than allowed should be included in the deprecation message + systemIndexAccessLevelPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext); } - } - - public enum SystemIndexAccessLevel { - ALL, - NONE, - RESTRICTED + return systemIndexAccessLevelPredicate; } public static class Context { @@ -752,30 +746,30 @@ public static class Context { private final boolean resolveToWriteIndex; private final boolean includeDataStreams; private final boolean preserveDataStreams; - private final SystemIndexAccessLevel systemIndexAccessLevel; + private final Predicate systemIndexAccessPredicate; - Context(ClusterState state, IndicesOptions options, SystemIndexAccessLevel systemIndexAccessLevel) { - this(state, options, System.currentTimeMillis(), systemIndexAccessLevel); + Context(ClusterState state, IndicesOptions options, Predicate systemIndexAccessPredicate) { + this(state, options, System.currentTimeMillis(), systemIndexAccessPredicate); } Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, Predicate systemIndexAccessPredicate) { this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, includeDataStreams, false, - systemIndexAccessLevel); + systemIndexAccessPredicate); } Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, boolean preserveDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, boolean preserveDataStreams, Predicate systemIndexAccessPredicate) { this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, includeDataStreams, preserveDataStreams, - systemIndexAccessLevel); + systemIndexAccessPredicate); } - Context(ClusterState state, IndicesOptions options, long startTime, SystemIndexAccessLevel systemIndexAccessLevel) { - this(state, options, startTime, false, false, false, false, systemIndexAccessLevel); + Context(ClusterState state, IndicesOptions options, long startTime, Predicate systemIndexAccessPredicate) { + this(state, options, startTime, false, false, false, false, systemIndexAccessPredicate); } protected Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, boolean preserveDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, boolean preserveDataStreams, Predicate systemIndexAccessPredicate) { this.state = state; this.options = options; this.startTime = startTime; @@ -783,7 +777,7 @@ protected Context(ClusterState state, IndicesOptions options, long startTime, bo this.resolveToWriteIndex = resolveToWriteIndex; this.includeDataStreams = includeDataStreams; this.preserveDataStreams = preserveDataStreams; - this.systemIndexAccessLevel = systemIndexAccessLevel; + this.systemIndexAccessPredicate = systemIndexAccessPredicate; } public ClusterState getState() { @@ -826,8 +820,8 @@ public boolean isPreserveDataStreams() { /** * Used to determine system index access is allowed in this context (e.g. for this request). */ - public SystemIndexAccessLevel getSystemIndexAccessLevel() { - return systemIndexAccessLevel; + public Predicate getSystemIndexAccessPredicate() { + return systemIndexAccessPredicate; } } @@ -859,7 +853,20 @@ public List resolve(Context context, List expressions) { } if (isEmptyOrTrivialWildcard(expressions)) { - List resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata); + List resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata).stream() + .filter(expression -> { + IndexAbstraction abstraction = metadata.getIndicesLookup().get(expression); + if (abstraction != null && abstraction.isSystem()) { + if (abstraction.getType() == Type.DATA_STREAM || abstraction.getParentDataStream() != null) { + return context.systemIndexAccessPredicate.test(abstraction.getName()); + } else { + return true; + } + } else { + return true; + } + }) + .collect(Collectors.toList()); if (context.includeDataStreams()) { final IndexMetadata.State excludeState = excludeState(options); final Map dataStreamsAbstractions = metadata.getIndicesLookup().entrySet() @@ -1059,6 +1066,13 @@ private static Set expand(Context context, IndexMetadata.State excludeSt String aliasOrIndexName = entry.getKey(); IndexAbstraction indexAbstraction = entry.getValue(); + if (indexAbstraction.isSystem() && + (indexAbstraction.getType() == Type.DATA_STREAM || indexAbstraction.getParentDataStream() != null)) { + if (context.systemIndexAccessPredicate.test(indexAbstraction.getName()) == false) { + continue; + } + } + if (indexAbstraction.isHidden() == false || includeHidden || implicitHiddenMatch(aliasOrIndexName, expression)) { if (context.isPreserveAliases() && indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) { expand.add(aliasOrIndexName); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 05d6795339c2e..e138937fc26fe 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -91,23 +92,47 @@ public ClusterState createDataStream(CreateDataStreamClusterStateUpdateRequest r return createDataStream(metadataCreateIndexService, current, request); } - public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest { + public static final class CreateDataStreamClusterStateUpdateRequest + extends ClusterStateUpdateRequest { private final String name; + private final SystemDataStreamDescriptor descriptor; public CreateDataStreamClusterStateUpdateRequest(String name, TimeValue masterNodeTimeout, TimeValue timeout) { + this(name, null, masterNodeTimeout, timeout); + } + + public CreateDataStreamClusterStateUpdateRequest(String name, + SystemDataStreamDescriptor systemDataStreamDescriptor, + TimeValue masterNodeTimeout, + TimeValue timeout) { this.name = name; + this.descriptor = systemDataStreamDescriptor; masterNodeTimeout(masterNodeTimeout); ackTimeout(timeout); } + + public boolean isSystem() { + return descriptor != null; + } + + public SystemDataStreamDescriptor getSystemDataStreamDescriptor() { + return descriptor; + } } static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, ClusterState currentState, CreateDataStreamClusterStateUpdateRequest request) throws Exception { - return createDataStream(metadataCreateIndexService, currentState, request.name, List.of(), null); + return createDataStream( + metadataCreateIndexService, + currentState, + request.name, + List.of(), + null, + request.getSystemDataStreamDescriptor()); } /** @@ -124,11 +149,20 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn ClusterState currentState, String dataStreamName, List backingIndices, - IndexMetadata writeIndex) throws Exception + IndexMetadata writeIndex) throws Exception { + assert metadataCreateIndexService.getSystemIndices().isSystemDataStream(dataStreamName) == false : + "dataStream [" + dataStreamName + "] is system but no system descriptor was provided!"; + return createDataStream(metadataCreateIndexService, currentState, dataStreamName, backingIndices, writeIndex, null); + } + + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, + ClusterState currentState, + String dataStreamName, + List backingIndices, + IndexMetadata writeIndex, + SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception { - if (writeIndex == null) { - Objects.requireNonNull(metadataCreateIndexService); - } + Objects.requireNonNull(metadataCreateIndexService); Objects.requireNonNull(currentState); Objects.requireNonNull(backingIndices); if (currentState.metadata().dataStreams().containsKey(dataStreamName)) { @@ -146,14 +180,22 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn + DataStream.BACKING_INDEX_PREFIX + "'"); } - ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + final boolean isSystem = systemDataStreamDescriptor != null; + final ComposableIndexTemplate template = isSystem ? + systemDataStreamDescriptor.getComposableIndexTemplate() : + lookupTemplateForDataStream(dataStreamName, currentState.metadata()); if (writeIndex == null) { String firstBackingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); CreateIndexClusterStateUpdateRequest createIndexRequest = new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) .dataStreamName(dataStreamName) - .settings(Settings.builder().put("index.hidden", true).build()); + .systemDataStreamDescriptor(systemDataStreamDescriptor); + + if (isSystem == false) { + createIndexRequest.settings(Settings.builder().put("index.hidden", true).build()); + } + try { currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); } catch (ResourceAlreadyExistsException e) { @@ -172,9 +214,9 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName); List dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()); dsBackingIndices.add(writeIndex.getIndex()); - boolean hidden = template.getDataStreamTemplate().isHidden(); + boolean hidden = isSystem ? false : template.getDataStreamTemplate().isHidden(); DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L, - template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false); + template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false, isSystem); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName, writeIndex.getIndex().getName(), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 35565d2b971ef..b0c6ab0ebfbe5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -63,7 +63,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.ShardLimitValidator; -import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.threadpool.ThreadPool; @@ -78,6 +77,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -98,6 +98,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping; +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveSettings; /** * Service responsible for submitting create index requests @@ -192,11 +193,9 @@ public void validateIndexName(String index, ClusterState state) { public boolean validateDotIndex(String index, @Nullable Boolean isHidden) { boolean isSystem = false; if (index.charAt(0) == '.') { - SystemIndexDescriptor matchingDescriptor = systemIndices.findMatchingDescriptor(index); - if (matchingDescriptor != null) { - logger.trace("index [{}] is a system index because it matches index pattern [{}] with description [{}]", - index, matchingDescriptor.getIndexPattern(), matchingDescriptor.getDescription()); - isSystem = true; + isSystem = systemIndices.isSystemName(index); + if (isSystem) { + logger.trace("index [{}] is a system index", index); } else if (isHidden) { logger.trace("index [{}] is a hidden index", index); } else { @@ -209,6 +208,10 @@ public boolean validateDotIndex(String index, @Nullable Boolean isHidden) { return isSystem; } + public SystemIndices getSystemIndices() { + return systemIndices; + } + /** * Validate the name for an index or alias against some static rules. */ @@ -329,14 +332,20 @@ public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateInd // in which case templates don't apply, so create the index from the source metadata return applyCreateIndexRequestWithExistingMetadata(currentState, request, silent, sourceMetadata, metadataTransformer); } else { + // The backing index may have a different name or prefix than the data stream name. + final String name = request.dataStreamName() != null ? request.dataStreamName() : request.index(); + + // The index being created is for a system data stream, so the backing index will also be a system index + if (request.systemDataStreamDescriptor() != null) { + return applyCreateIndexRequestForSystemDataStream(currentState, request, silent, metadataTransformer); + } + // Hidden indices apply templates slightly differently (ignoring wildcard '*' // templates), so we need to check to see if the request is creating a hidden index // prior to resolving which templates it matches final Boolean isHiddenFromRequest = IndexMetadata.INDEX_HIDDEN_SETTING.exists(request.settings()) ? IndexMetadata.INDEX_HIDDEN_SETTING.get(request.settings()) : null; - // The backing index may have a different name or prefix than the data stream name. - final String name = request.dataStreamName() != null ? request.dataStreamName() : request.index(); // Check to see if a v2 template matched final String v2Template = MetadataIndexTemplateService.findV2Template(currentState.metadata(), name, isHiddenFromRequest == null ? false : isHiddenFromRequest); @@ -466,7 +475,7 @@ private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState c templates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()), xContentRegistry)); final Settings aggregatedIndexSettings = - aggregateIndexSettings(currentState, request, MetadataIndexTemplateService.resolveSettings(templates), + aggregateIndexSettings(currentState, request, resolveSettings(templates), null, settings, indexScopedSettings, shardLimitValidator, indexSettingProviders); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); @@ -500,7 +509,7 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu collectV2Mappings(request.mappings(), currentState, templateName, xContentRegistry, request.index()); final Settings aggregatedIndexSettings = aggregateIndexSettings(currentState, request, - MetadataIndexTemplateService.resolveSettings(currentState.metadata(), templateName), + resolveSettings(currentState.metadata(), templateName), null, settings, indexScopedSettings, shardLimitValidator, indexSettingProviders); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); @@ -515,15 +524,71 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu Collections.singletonList(templateName), metadataTransformer); } + private ClusterState applyCreateIndexRequestForSystemDataStream(final ClusterState currentState, + final CreateIndexClusterStateUpdateRequest request, + final boolean silent, + final BiConsumer metadataTransformer) + throws Exception { + Objects.requireNonNull(request.systemDataStreamDescriptor()); + logger.debug("applying create index request for system data stream [{}]", request.systemDataStreamDescriptor()); + + ComposableIndexTemplate template = request.systemDataStreamDescriptor().getComposableIndexTemplate(); + if (request.dataStreamName() == null && template.getDataStreamTemplate() != null) { + throw new IllegalArgumentException("cannot create index with name [" + request.index() + + "], because it matches with a system data stream"); + } + + final Map componentTemplates = request.systemDataStreamDescriptor().getComponentTemplates(); + final List> mappings = + collectSystemV2Mappings(template, componentTemplates, xContentRegistry, request.index()); + + final Settings aggregatedIndexSettings = aggregateIndexSettings( + currentState, + request, + resolveSettings(template, componentTemplates), + null, + settings, + indexScopedSettings, + shardLimitValidator, + indexSettingProviders + ); + final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); + final IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); + + return applyCreateIndexWithTemporaryService(currentState, request, silent, null, tmpImd, mappings, + indexService -> resolveAndValidateAliases(request.index(), request.aliases(), + MetadataIndexTemplateService.resolveAliases(template, componentTemplates, null), currentState.metadata(), + // the context is only used for validation so it's fine to pass fake values for the + // shard id and the current timestamp + aliasValidator, xContentRegistry, indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()), + indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())), + List.of(), metadataTransformer); + } + + private static List> collectSystemV2Mappings(final ComposableIndexTemplate composableIndexTemplate, + final Map componentTemplates, + final NamedXContentRegistry xContentRegistry, + final String indexName) throws Exception { + List templateMappings = + MetadataIndexTemplateService.collectMappings(composableIndexTemplate, componentTemplates, indexName, xContentRegistry); + return collectV2Mappings("{}", templateMappings, xContentRegistry); + } + public static List> collectV2Mappings(final String requestMappings, final ClusterState currentState, final String templateName, final NamedXContentRegistry xContentRegistry, final String indexName) throws Exception { - List> result = new ArrayList<>(); - List templateMappings = MetadataIndexTemplateService.collectMappings(currentState, templateName, indexName, xContentRegistry); + return collectV2Mappings(requestMappings, templateMappings, xContentRegistry); + } + + public static List> collectV2Mappings(final String requestMappings, + final List templateMappings, + final NamedXContentRegistry xContentRegistry) throws Exception { + List> result = new ArrayList<>(); + for (CompressedXContent templateMapping : templateMappings) { Map parsedTemplateMapping = MapperService.parseMapping(xContentRegistry, templateMapping.string()); result.add(parsedTemplateMapping); @@ -533,6 +598,7 @@ public static List> collectV2Mappings(final String requestMa result.add(parsedRequestMappings); return result; } + private ClusterState applyCreateIndexRequestWithExistingMetadata(final ClusterState currentState, final CreateIndexClusterStateUpdateRequest request, final boolean silent, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 2cde3491fedb0..9aaab63164c8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -432,7 +432,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public static void validateV2TemplateRequest(Metadata metadata, String name, ComposableIndexTemplate template) { if (template.indexPatterns().stream().anyMatch(Regex::isMatchAllPattern)) { - Settings mergedSettings = resolveSettings(metadata, template); + Settings mergedSettings = resolveSettings(template, metadata.componentTemplates()); if (IndexMetadata.INDEX_HIDDEN_SETTING.exists(mergedSettings)) { throw new InvalidIndexTemplateException(name, "global composable templates may not specify the setting " + IndexMetadata.INDEX_HIDDEN_SETTING.getKey()); @@ -998,6 +998,17 @@ public static List collectMappings(final ClusterState state, } final Map componentTemplates = state.metadata().componentTemplates(); + return collectMappings(template, componentTemplates, indexName, xContentRegistry); + } + + /** + * Collect the given v2 template into an ordered list of mappings. + */ + public static List collectMappings(final ComposableIndexTemplate template, + final Map componentTemplates, + final String indexName, + final NamedXContentRegistry xContentRegistry) throws Exception { + Objects.requireNonNull(template, "Composable index template must be provided"); List mappings = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) @@ -1057,11 +1068,15 @@ public static Settings resolveSettings(final Metadata metadata, final String tem if (template == null) { return Settings.EMPTY; } - return resolveSettings(metadata, template); + return resolveSettings(template, metadata.componentTemplates()); } - private static Settings resolveSettings(Metadata metadata, ComposableIndexTemplate template) { - final Map componentTemplates = metadata.componentTemplates(); + /** + * Resolve the provided v2 template and component templates into a collected {@link Settings} object + */ + public static Settings resolveSettings(ComposableIndexTemplate template, Map componentTemplates) { + Objects.requireNonNull(template, "attempted to resolve settings for a null template"); + Objects.requireNonNull(componentTemplates, "attempted to resolve settings with null component templates"); List componentSettings = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) @@ -1107,6 +1122,17 @@ public static List> resolveAliases(final Metadata met return List.of(); } final Map componentTemplates = metadata.componentTemplates(); + return resolveAliases(template, componentTemplates, templateName); + } + + /** + * Resolve the given v2 template and component templates into an ordered list of aliases + */ + static List> resolveAliases(final ComposableIndexTemplate template, + final Map componentTemplates, + @Nullable String templateName) { + Objects.requireNonNull(template, "attempted to resolve aliases for a null template"); + Objects.requireNonNull(componentTemplates, "attempted to resolve aliases with null component templates"); List> aliases = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java index ba443d9364fd6..8bba3cd8e6633 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java @@ -20,9 +20,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; @@ -43,18 +43,23 @@ public class MetadataMigrateToDataStreamService { private final ClusterService clusterService; private final ActiveShardsObserver activeShardsObserver; private final IndicesService indexServices; + private final ThreadContext threadContext; + private final MetadataCreateIndexService metadataCreateIndexService; - @Inject public MetadataMigrateToDataStreamService(ThreadPool threadPool, ClusterService clusterService, - IndicesService indexServices) { + IndicesService indexServices, + MetadataCreateIndexService metadataCreateIndexService) { this.clusterService = clusterService; this.indexServices = indexServices; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.threadContext = threadPool.getThreadContext(); + this.metadataCreateIndexService = metadataCreateIndexService; } public void migrateToDataStream(MigrateToDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { + metadataCreateIndexService.getSystemIndices().validateDataStreamAccess(request.aliasName, threadContext); AtomicReference writeIndexRef = new AtomicReference<>(); ActionListener listener = ActionListener.wrap( response -> { @@ -89,7 +94,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { throw new IllegalStateException(e); } }, - request); + request, + threadContext, + metadataCreateIndexService); writeIndexRef.set(clusterState.metadata().dataStreams().get(request.aliasName).getWriteIndex().getName()); return clusterState; } @@ -98,7 +105,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { static ClusterState migrateToDataStream(ClusterState currentState, Function mapperSupplier, - MigrateToDataStreamClusterStateUpdateRequest request) throws Exception { + MigrateToDataStreamClusterStateUpdateRequest request, + ThreadContext threadContext, + MetadataCreateIndexService metadataCreateIndexService) throws Exception { validateRequest(currentState, request); IndexAbstraction.Alias alias = (IndexAbstraction.Alias) currentState.metadata().getIndicesLookup().get(request.aliasName); @@ -117,7 +126,8 @@ static ClusterState migrateToDataStream(ClusterState currentState, .collect(Collectors.toList()); logger.info("submitting request to migrate alias [{}] to a data stream", request.aliasName); - return MetadataCreateDataStreamService.createDataStream(null, currentState, request.aliasName, backingIndices, writeIndex); + return MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, currentState, request.aliasName, + backingIndices, writeIndex); } // package-visible for testing diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index dfce857d69099..1e0144fe3e39d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -54,7 +54,9 @@ public void clusterChanged(ClusterChangedEvent event) { if (lastIndexMetadataMap != indexMetadataMap) { for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - if (systemIndices.isSystemIndex(cursor.value.getIndex()) != cursor.value.isSystem()) { + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || + systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + if (isSystem != cursor.value.isSystem()) { updateTaskPending = true; clusterService.submitStateUpdateTask("system_index_metadata_upgrade_service {system metadata change}", new SystemIndexMetadataUpdateTask()); @@ -74,7 +76,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { final List updatedMetadata = new ArrayList<>(); for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - if (systemIndices.isSystemIndex(cursor.value.getIndex()) != cursor.value.isSystem()) { + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || + systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + if (isSystem != cursor.value.isSystem()) { updatedMetadata.add(IndexMetadata.builder(cursor.value).system(cursor.value.isSystem() == false).build()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java new file mode 100644 index 0000000000000..108876f933a27 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.cluster.metadata.ComponentTemplate; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also + * protected by the system against user modification so that system features are not broken by inadvertent user operations. + */ +public class SystemDataStreamDescriptor { + + private final String dataStreamName; + private final String description; + private final Type type; + private final ComposableIndexTemplate composableIndexTemplate; + private final Map componentTemplates; + private final List allowedElasticProductOrigins; + + /** + * Creates a new descriptor for a system data descriptor + * @param dataStreamName the name of the data stream. Must not be {@code null} + * @param description a brief description of what the data stream is used for. Must not be {@code null} + * @param type the {@link Type} of the data stream which determines how the data stream can be accessed. Must not be {@code null} + * @param composableIndexTemplate the {@link ComposableIndexTemplate} that contains the mappings and settings for the data stream. + * Must not be {@code null} + * @param componentTemplates a map that contains {@link ComponentTemplate} instances corresponding to those references in the + * {@link ComposableIndexTemplate} + * @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the + * type is {@link Type#EXTERNAL}. Must not be {@code null} + */ + public SystemDataStreamDescriptor(String dataStreamName, String description, Type type, + ComposableIndexTemplate composableIndexTemplate, Map componentTemplates, + List allowedElasticProductOrigins) { + this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified"); + this.description = Objects.requireNonNull(description, "description must be specified"); + this.type = Objects.requireNonNull(type, "type must be specified"); + this.composableIndexTemplate = Objects.requireNonNull(composableIndexTemplate, "composableIndexTemplate must be provided"); + this.componentTemplates = componentTemplates == null ? Map.of() : Map.copyOf(componentTemplates); + this.allowedElasticProductOrigins = + Objects.requireNonNull(allowedElasticProductOrigins, "allowedElasticProductOrigins must not be null"); + if (type == Type.EXTERNAL && allowedElasticProductOrigins.isEmpty()) { + throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination"); + } + } + + public String getDataStreamName() { + return dataStreamName; + } + + public String getDescription() { + return description; + } + + public ComposableIndexTemplate getComposableIndexTemplate() { + return composableIndexTemplate; + } + + public boolean isExternal() { + return type == Type.EXTERNAL; + } + + public String getBackingIndexPattern() { + return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*"; + } + + public List getAllowedElasticProductOrigins() { + return allowedElasticProductOrigins; + } + + public Map getComponentTemplates() { + return componentTemplates; + } + + public enum Type { + INTERNAL, + EXTERNAL + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index d34532bf92b42..a5912ead4b240 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -249,14 +249,17 @@ public SystemIndexDescriptor(String indexPattern, String description, Type type, this.indexPattern = indexPattern; this.primaryIndex = primaryIndex; + this.aliasName = aliasName; final Automaton automaton = buildAutomaton(indexPattern, aliasName); this.indexPatternAutomaton = new CharacterRunAutomaton(automaton); + if (primaryIndex != null && indexPatternAutomaton.run(primaryIndex) == false) { + throw new IllegalArgumentException("primary index does not match the index pattern!"); + } this.description = description; this.mappings = mappings; this.settings = settings; - this.aliasName = aliasName; this.indexFormat = indexFormat; this.versionMetaKey = versionMetaKey; this.origin = origin; @@ -384,7 +387,7 @@ public List getAllowedElasticProductOrigins() { public Version getMappingVersion() { if (type.isManaged() == false) { - throw new IllegalStateException(toString() + " is not managed so there are no mappings or version"); + throw new IllegalStateException(this + " is not managed so there are no mappings or version"); } return mappingVersion; } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index f1a5c852c6bfb..80eb97ddf10f3 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -21,13 +21,18 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.snapshots.SnapshotsService; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -36,6 +41,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -50,11 +56,18 @@ * to reduce the locations within the code that need to deal with {@link SystemIndexDescriptor}s. */ public class SystemIndices { + public static final String SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_system_index_access_allowed"; + public static final String EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_external_system_index_access_origin"; + + private static final Automaton EMPTY = Automata.makeEmpty(); + private static final Map SERVER_SYSTEM_INDEX_DESCRIPTORS = Map.of( TASKS_FEATURE_NAME, new Feature(TASKS_FEATURE_NAME, "Manages task results", List.of(TASKS_DESCRIPTOR)) ); - private final CharacterRunAutomaton runAutomaton; + private final CharacterRunAutomaton systemIndexAutomaton; + private final CharacterRunAutomaton systemDataStreamIndicesAutomaton; + private final Predicate systemDataStreamAutomaton; private final Map featureDescriptors; private final Map productToSystemIndicesMatcher; @@ -67,11 +80,13 @@ public SystemIndices(Map pluginAndModulesDescriptors) { featureDescriptors = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors); checkForOverlappingPatterns(featureDescriptors); checkForDuplicateAliases(this.getSystemIndexDescriptors()); - this.runAutomaton = buildCharacterRunAutomaton(featureDescriptors); - this.productToSystemIndicesMatcher = getProductToSystemIndicesMap(this.getSystemIndexDescriptors()); + this.systemIndexAutomaton = buildIndexCharacterRunAutomaton(featureDescriptors); + this.systemDataStreamIndicesAutomaton = buildDataStreamBackingIndicesAutomaton(featureDescriptors); + this.systemDataStreamAutomaton = buildDataStreamNamePredicate(featureDescriptors); + this.productToSystemIndicesMatcher = getProductToSystemIndicesMap(featureDescriptors); } - private void checkForDuplicateAliases(Collection descriptors) { + private static void checkForDuplicateAliases(Collection descriptors) { final Map aliasCounts = new HashMap<>(); for (SystemIndexDescriptor descriptor : descriptors) { @@ -93,20 +108,46 @@ private void checkForDuplicateAliases(Collection descript } } - private static Map getProductToSystemIndicesMap(Collection descriptors) { - Map map = descriptors.stream() - .filter(SystemIndexDescriptor::isExternal) - .flatMap(descriptor -> descriptor.getAllowedElasticProductOrigins().stream().map(product -> new Tuple<>(product, descriptor))) - .collect(Collectors.toUnmodifiableMap(Tuple::v1, tuple -> { - SystemIndexDescriptor descriptor = tuple.v2(); - return SystemIndexDescriptor.buildAutomaton(descriptor.getIndexPattern(), descriptor.getAliasName()); - }, Operations::union)); + private static Map getProductToSystemIndicesMap(Map descriptors) { + Map productToSystemIndicesMap = new HashMap<>(); + for (Feature feature : descriptors.values()) { + feature.getIndexDescriptors().forEach(systemIndexDescriptor -> { + if (systemIndexDescriptor.isExternal()) { + systemIndexDescriptor.getAllowedElasticProductOrigins().forEach(origin -> + productToSystemIndicesMap.compute(origin, (key, value) -> { + Automaton automaton = SystemIndexDescriptor.buildAutomaton( + systemIndexDescriptor.getIndexPattern(), systemIndexDescriptor.getAliasName()); + return value == null ? automaton : Operations.union(value, automaton); + }) + ); + } + }); + feature.getDataStreamDescriptors().forEach(dataStreamDescriptor -> { + if (dataStreamDescriptor.isExternal()) { + dataStreamDescriptor.getAllowedElasticProductOrigins().forEach(origin -> + productToSystemIndicesMap.compute(origin, (key, value) -> { + Automaton automaton = SystemIndexDescriptor.buildAutomaton( + dataStreamDescriptor.getBackingIndexPattern(), dataStreamDescriptor.getDataStreamName()); + return value == null ? automaton : Operations.union(value, automaton); + }) + ); + } + }); + } - return map.entrySet().stream() + return productToSystemIndicesMap.entrySet().stream() .collect(Collectors.toUnmodifiableMap(Entry::getKey, entry -> new CharacterRunAutomaton(MinimizationOperations.minimize(entry.getValue(), Integer.MAX_VALUE)))); } + /** + * Checks whether the given name matches a reserved name or pattern that is intended for use by a system component. The name + * is checked against index names, aliases, data stream names, and the names of indices that back a system data stream. + */ + public boolean isSystemName(String name) { + return isSystemIndex(name) || isSystemDataStream(name) || isSystemIndexBackingDataStream(name); + } + /** * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s * @param index the {@link Index} object to check against loaded {@link SystemIndexDescriptor}s @@ -117,12 +158,28 @@ public boolean isSystemIndex(Index index) { } /** - * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s + * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s. + * This will also match alias names that belong to system indices. * @param indexName the index name to check against loaded {@link SystemIndexDescriptor}s * @return true if the index name matches a pattern from a {@link SystemIndexDescriptor} */ public boolean isSystemIndex(String indexName) { - return runAutomaton.run(indexName); + return systemIndexAutomaton.run(indexName); + } + + /** + * Determines whether the provided name matches that of a system data stream that has been defined by a + * {@link SystemDataStreamDescriptor} + */ + public boolean isSystemDataStream(String name) { + return systemDataStreamAutomaton.test(name); + } + + /** + * Determines whether the provided name matches that of an index that backs a system data stream. + */ + public boolean isSystemIndexBackingDataStream(String name) { + return systemDataStreamIndicesAutomaton.run(name); } /** @@ -156,12 +213,48 @@ public boolean isSystemIndex(String indexName) { } } + /** + * Finds a single matching {@link SystemDataStreamDescriptor}, if any, for the given DataStream name. + * @param name the name of the DataStream + * @return The matching {@link SystemDataStreamDescriptor} or {@code null} if no descriptor is found + * @throws IllegalStateException if multiple descriptors match the name + */ + public @Nullable SystemDataStreamDescriptor findMatchingDataStreamDescriptor(String name) { + final List matchingDescriptors = featureDescriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .filter(descriptor -> descriptor.getDataStreamName().equals(name)) + .collect(toUnmodifiableList()); + + if (matchingDescriptors.isEmpty()) { + return null; + } else if (matchingDescriptors.size() == 1) { + return matchingDescriptors.get(0); + } else { + // This should be prevented by failing on overlapping patterns at startup time, but is here just in case. + StringBuilder errorMessage = new StringBuilder() + .append("DataStream name [") + .append(name) + .append("] is claimed as a system data stream by multiple descriptors: [") + .append(matchingDescriptors.stream() + .map(descriptor -> "name: [" + descriptor.getDataStreamName() + + "], description: [" + descriptor.getDescription() + "]").collect(Collectors.joining("; "))); + // Throw AssertionError if assertions are enabled, or a regular exception otherwise: + assert false : errorMessage.toString(); + throw new IllegalStateException(errorMessage.toString()); + } + } + /** * Builds a predicate that tests if a system index should be accessible based on the provided product name - * @param product the name of the product that is attempting to access an external system index + * contained in headers. + * @param threadContext the threadContext containing headers used for system index access * @return Predicate to check external system index metadata with */ - public Predicate getProductSystemIndexMetadataPredicate(String product) { + public Predicate getProductSystemIndexMetadataPredicate(ThreadContext threadContext) { + final String product = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + if (product == null) { + return indexMetadata -> false; + } final CharacterRunAutomaton automaton = productToSystemIndicesMatcher.get(product); if (automaton == null) { return indexMetadata -> false; @@ -171,10 +264,15 @@ public Predicate getProductSystemIndexMetadataPredicate(String pr /** * Builds a predicate that tests if a system index name should be accessible based on the provided product name - * @param product the name of the product that is attempting to access an external system index + * contained in headers. + * @param threadContext the threadContext containing headers used for system index access * @return Predicate to check external system index names with */ - public Predicate getProductSystemIndexNamePredicate(String product) { + public Predicate getProductSystemIndexNamePredicate(ThreadContext threadContext) { + final String product = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + if (product == null) { + return name -> false; + } final CharacterRunAutomaton automaton = productToSystemIndicesMatcher.get(product); if (automaton == null) { return name -> false; @@ -186,36 +284,152 @@ public Map getFeatures() { return featureDescriptors; } - private static CharacterRunAutomaton buildCharacterRunAutomaton(Map descriptors) { + private static CharacterRunAutomaton buildIndexCharacterRunAutomaton(Map descriptors) { Optional automaton = descriptors.values().stream() - .flatMap(feature -> feature.getIndexDescriptors().stream()) + .map(SystemIndices::featureToIndexAutomaton) + .reduce(Operations::union); + return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(EMPTY), Integer.MAX_VALUE)); + } + + private static Automaton featureToIndexAutomaton(Feature feature) { + Optional systemIndexAutomaton = feature.getIndexDescriptors().stream() .map(descriptor -> SystemIndexDescriptor.buildAutomaton(descriptor.getIndexPattern(), descriptor.getAliasName())) .reduce(Operations::union); - return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(Automata.makeEmpty()), Integer.MAX_VALUE)); + + return systemIndexAutomaton.orElse(EMPTY); + } + + private static Predicate buildDataStreamNamePredicate(Map descriptors) { + Set systemDataStreamNames = descriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .map(SystemDataStreamDescriptor::getDataStreamName) + .collect(Collectors.toUnmodifiableSet()); + return systemDataStreamNames::contains; + } + + private static CharacterRunAutomaton buildDataStreamBackingIndicesAutomaton(Map descriptors) { + Optional automaton = descriptors.values().stream() + .map(SystemIndices::featureToDataStreamBackingIndicesAutomaton) + .reduce(Operations::union); + return new CharacterRunAutomaton(automaton.orElse(EMPTY)); + } + + private static Automaton featureToDataStreamBackingIndicesAutomaton(Feature feature) { + Optional systemDataStreamAutomaton = feature.getDataStreamDescriptors().stream() + .map(descriptor -> SystemIndexDescriptor.buildAutomaton( + descriptor.getBackingIndexPattern(), + null + )) + .reduce(Operations::union); + return systemDataStreamAutomaton.orElse(EMPTY); + } + + public SystemDataStreamDescriptor validateDataStreamAccess(String dataStreamName, ThreadContext threadContext) { + if (systemDataStreamAutomaton.test(dataStreamName)) { + SystemDataStreamDescriptor dataStreamDescriptor = featureDescriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .filter(descriptor -> descriptor.getDataStreamName().equals(dataStreamName)) + .findFirst() + .orElseThrow(() -> new IllegalStateException("system data stream descriptor not found for [" + dataStreamName + "]")); + if (dataStreamDescriptor.isExternal()) { + final SystemIndexAccessLevel accessLevel = getSystemIndexAccessLevel(threadContext); + if (accessLevel == SystemIndexAccessLevel.NONE) { + throw dataStreamAccessException(null, dataStreamName); + } else if (accessLevel == SystemIndexAccessLevel.RESTRICTED) { + if (getProductSystemIndexNamePredicate(threadContext).test(dataStreamName) == false) { + throw dataStreamAccessException( + threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY), + dataStreamName); + } else { + return dataStreamDescriptor; + } + } else { + assert accessLevel == SystemIndexAccessLevel.ALL; + return dataStreamDescriptor; + } + } else { + return dataStreamDescriptor; + } + } else { + return null; + } + } + + public IllegalArgumentException dataStreamAccessException(ThreadContext threadContext, Collection names) { + return dataStreamAccessException( + threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY), + names.toArray(Strings.EMPTY_ARRAY) + ); + } + + IllegalArgumentException dataStreamAccessException(@Nullable String product, String... dataStreamNames) { + if (product == null) { + return new IllegalArgumentException("Data stream(s) " + Arrays.toString(dataStreamNames) + + " use and access is reserved for system operations"); + } else { + return new IllegalArgumentException("Data stream(s) " + Arrays.toString(dataStreamNames) + " may not be accessed by product [" + + product + "]"); + } + } + + /** + * Determines what level of system index access should be allowed in the current context. + * + * @return {@link SystemIndexAccessLevel#ALL} if unrestricted system index access should be allowed, + * {@link SystemIndexAccessLevel#RESTRICTED} if a subset of system index access should be allowed, or + * {@link SystemIndexAccessLevel#NONE} if no system index access should be allowed. + */ + public SystemIndexAccessLevel getSystemIndexAccessLevel(ThreadContext threadContext) { + final String headerValue = threadContext.getHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + final String productHeaderValue = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + + final boolean allowed = Booleans.parseBoolean(headerValue, true); + if (allowed) { + if (productHeaderValue != null) { + return SystemIndexAccessLevel.RESTRICTED; + } else { + return SystemIndexAccessLevel.ALL; + } + } else { + return SystemIndexAccessLevel.NONE; + } + } + + public enum SystemIndexAccessLevel { + ALL, + NONE, + RESTRICTED } /** * Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed * descriptors overlap with any of the other patterns. If any do, throws an exception. * - * @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide. + * @param sourceToFeature A map of source (plugin) names to the SystemIndexDescriptors they provide. * @throws IllegalStateException Thrown if any of the index patterns overlaps with another. */ - static void checkForOverlappingPatterns(Map sourceToDescriptors) { - List> sourceDescriptorPair = sourceToDescriptors.entrySet().stream() + static void checkForOverlappingPatterns(Map sourceToFeature) { + List> sourceDescriptorPair = sourceToFeature.entrySet().stream() .flatMap(entry -> entry.getValue().getIndexDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor))) .sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message .collect(Collectors.toUnmodifiableList()); + List> sourceDataStreamDescriptorPair = sourceToFeature.entrySet().stream() + .filter(entry -> entry.getValue().getDataStreamDescriptors().isEmpty() == false) + .flatMap(entry -> + entry.getValue().getDataStreamDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor))) + .sorted( + Comparator.comparing(d -> d.v1() + ":" + d.v2().getDataStreamName())) // Consistent ordering -> consistent error message + .collect(Collectors.toUnmodifiableList()); // This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the // automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states // per pattern should be low as well. If these assumptions change, this might need to be reworked. sourceDescriptorPair.forEach(descriptorToCheck -> { List> descriptorsMatchingThisPattern = sourceDescriptorPair.stream() - .filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked - .filter(d -> overlaps(descriptorToCheck.v2(), d.v2())) - .collect(Collectors.toUnmodifiableList()); + .filter(d -> overlaps(descriptorToCheck.v2(), d.v2()) || + (d.v2().getAliasName() != null && descriptorToCheck.v2().matchesIndexPattern(d.v2().getAliasName()))) + .collect(toUnmodifiableList()); if (descriptorsMatchingThisPattern.isEmpty() == false) { throw new IllegalStateException("a system index descriptor [" + descriptorToCheck.v2() + "] from [" + descriptorToCheck.v1() + "] overlaps with other system index descriptors: [" + @@ -223,12 +437,28 @@ static void checkForOverlappingPatterns(Map sourceToDescriptors .map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]") .collect(Collectors.joining(", "))); } + + List> dataStreamsMatching = sourceDataStreamDescriptorPair.stream() + .filter(dsTuple -> descriptorToCheck.v2().matchesIndexPattern(dsTuple.v2().getDataStreamName()) || + overlaps(descriptorToCheck.v2().getIndexPattern(), dsTuple.v2().getBackingIndexPattern())) + .collect(toUnmodifiableList()); + if (dataStreamsMatching.isEmpty() == false) { + throw new IllegalStateException("a system index descriptor [" + descriptorToCheck.v2() + "] from [" + + descriptorToCheck.v1() + "] overlaps with one or more data stream descriptors: [" + + dataStreamsMatching.stream() + .map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]") + .collect(Collectors.joining(", "))); + } }); } private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) { - Automaton a1Automaton = SystemIndexDescriptor.buildAutomaton(a1.getIndexPattern(), null); - Automaton a2Automaton = SystemIndexDescriptor.buildAutomaton(a2.getIndexPattern(), null); + return overlaps(a1.getIndexPattern(), a2.getIndexPattern()); + } + + private static boolean overlaps(String pattern1, String pattern2) { + Automaton a1Automaton = SystemIndexDescriptor.buildAutomaton(pattern1, null); + Automaton a2Automaton = SystemIndexDescriptor.buildAutomaton(pattern2, null); return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false; } @@ -269,23 +499,27 @@ public static void validateFeatureName(String name, String plugin) { public static class Feature { private final String description; private final Collection indexDescriptors; + private final Collection dataStreamDescriptors; private final Collection associatedIndexPatterns; private final TriConsumer> cleanUpFunction; /** * Construct a Feature with a custom cleanup function * @param description Description of the feature - * @param indexDescriptors Patterns describing system indices for this feature + * @param indexDescriptors Collection of objects describing system indices for this feature + * @param dataStreamDescriptors Collection of objects describing system data streams for this feature * @param associatedIndexPatterns Patterns describing associated indices * @param cleanUpFunction A function that will clean up the feature's state */ public Feature( String description, Collection indexDescriptors, + Collection dataStreamDescriptors, Collection associatedIndexPatterns, TriConsumer> cleanUpFunction) { this.description = description; this.indexDescriptors = indexDescriptors; + this.dataStreamDescriptors = dataStreamDescriptors; this.associatedIndexPatterns = associatedIndexPatterns; this.cleanUpFunction = cleanUpFunction; } @@ -297,7 +531,21 @@ public Feature( * @param indexDescriptors Patterns describing system indices for this feature */ public Feature(String name, String description, Collection indexDescriptors) { - this(description, indexDescriptors, Collections.emptyList(), + this(description, indexDescriptors, Collections.emptyList(), Collections.emptyList(), + (clusterService, client, listener) -> + cleanUpFeature(indexDescriptors, Collections.emptyList(), name, clusterService, client, listener) + ); + } + /** + * Construct a Feature using the default clean-up function + * @param name Name of the feature, used in logging + * @param description Description of the feature + * @param indexDescriptors Patterns describing system indices for this feature + * @param dataStreamDescriptors Collection of objects describing system data streams for this feature + */ + public Feature(String name, String description, Collection indexDescriptors, + Collection dataStreamDescriptors) { + this(description, indexDescriptors, dataStreamDescriptors, Collections.emptyList(), (clusterService, client, listener) -> cleanUpFeature(indexDescriptors, Collections.emptyList(), name, clusterService, client, listener) ); @@ -311,6 +559,10 @@ public Collection getIndexDescriptors() { return indexDescriptors; } + public Collection getDataStreamDescriptors() { + return dataStreamDescriptors; + } + public Collection getAssociatedIndexPatterns() { return associatedIndexPatterns; } @@ -363,4 +615,12 @@ public void onFailure(Exception e) { }); } } + + public static Feature pluginToFeature(SystemIndexPlugin plugin, Settings settings) { + return new Feature(plugin.getFeatureDescription(), + plugin.getSystemIndexDescriptors(settings), + plugin.getSystemDataStreamDescriptors(), + plugin.getAssociatedIndexPatterns(), + plugin::cleanUpFeature); + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index c70ce55c42623..5faf42d6eb640 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -395,19 +395,34 @@ protected Node(final Environment initialEnvironment, final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); + SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class)); + List namedWriteables = Stream.of( + NetworkModule.getNamedWriteables().stream(), + IndicesModule.getNamedWriteables().stream(), + searchModule.getNamedWriteables().stream(), + pluginsService.filterPlugins(Plugin.class).stream() + .flatMap(p -> p.getNamedWriteables().stream()), + ClusterModule.getNamedWriteables().stream()) + .flatMap(Function.identity()).collect(Collectors.toList()); + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); + NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + searchModule.getNamedXContents().stream(), + pluginsService.filterPlugins(Plugin.class).stream() + .flatMap(p -> p.getNamedXContent().stream()), + ClusterModule.getNamedXWriteables().stream()) + .flatMap(Function.identity()).collect(toList()), + getCompatibleNamedXContents() + ); final Map featuresMap = pluginsService .filterPlugins(SystemIndexPlugin.class) .stream() .peek(plugin -> SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName())) .collect(Collectors.toUnmodifiableMap( - plugin -> plugin.getFeatureName(), - plugin -> new SystemIndices.Feature( - plugin.getFeatureDescription(), - plugin.getSystemIndexDescriptors(settings), - plugin.getAssociatedIndexPatterns(), - plugin::cleanUpFeature - )) - ); + SystemIndexPlugin::getFeatureName, + plugin -> SystemIndices.pluginToFeature(plugin, settings) + )); final SystemIndices systemIndices = new SystemIndices(featuresMap); ModulesBuilder modules = new ModulesBuilder(); @@ -423,7 +438,6 @@ protected Node(final Environment initialEnvironment, IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); - SearchModule searchModule = new SearchModule(settings, pluginsService.filterPlugins(SearchPlugin.class)); List pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class) .stream() .map(plugin -> plugin.getCircuitBreaker(settings)) @@ -443,25 +457,6 @@ protected Node(final Environment initialEnvironment, PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); modules.add(settingsModule); - List namedWriteables = Stream.of( - NetworkModule.getNamedWriteables().stream(), - IndicesModule.getNamedWriteables().stream(), - searchModule.getNamedWriteables().stream(), - pluginsService.filterPlugins(Plugin.class).stream() - .flatMap(p -> p.getNamedWriteables().stream()), - ClusterModule.getNamedWriteables().stream()) - .flatMap(Function.identity()).collect(Collectors.toList()); - final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); - NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of( - NetworkModule.getNamedXContents().stream(), - IndicesModule.getNamedXContents().stream(), - searchModule.getNamedXContents().stream(), - pluginsService.filterPlugins(Plugin.class).stream() - .flatMap(p -> p.getNamedXContent().stream()), - ClusterModule.getNamedXWriteables().stream()) - .flatMap(Function.identity()).collect(toList()), - getCompatibleNamedXContents() - ); final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(), diff --git a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java index 9724b7eeb182c..26b1fae45843e 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; @@ -35,6 +36,10 @@ default Collection getSystemIndexDescriptors(Settings set return Collections.emptyList(); } + default Collection getSystemDataStreamDescriptors() { + return Collections.emptyList(); + } + /** * @return The name of the feature, as used for specifying feature states in snapshot creation and restoration. */ diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index ae788e9101c2a..2cd94fcfad893 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -44,8 +44,8 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 39a4e440c06c3..f2089fca3d572 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -749,7 +749,7 @@ public void onFailure(Exception e) { } private boolean isSystemIndex(IndexMetadata indexMetadata) { - return indexMetadata.isSystem() || systemIndices.isSystemIndex(indexMetadata.getIndex()); + return indexMetadata.isSystem() || systemIndices.isSystemName(indexMetadata.getIndex().getName()); } private Map getDataStreamsToRestore(Repository repository, SnapshotId snapshotId, SnapshotInfo snapshotInfo, diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java index 7630859173d65..f8565133bb571 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java @@ -11,12 +11,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -130,7 +132,7 @@ public void testDeprecationWarningNotEmittedWhenSystemAccessAllowed() { assertEquals(state.metadata().findAliases(request, concreteIndices), aliases); ImmutableOpenMap> result = TransportGetAliasesAction.postProcess(request, concreteIndices, aliases, state, - SystemIndexAccessLevel.ALL, "", EmptySystemIndices.INSTANCE); + SystemIndexAccessLevel.ALL, new ThreadContext(Settings.EMPTY), EmptySystemIndices.INSTANCE); assertThat(result.size(), equalTo(1)); assertThat(result.get(".b").size(), equalTo(1)); } @@ -150,7 +152,7 @@ public void testDeprecationWarningNotEmittedWhenOnlyNonsystemIndexRequested() { assertEquals(state.metadata().findAliases(request, concreteIndices), aliases); ImmutableOpenMap> result = TransportGetAliasesAction.postProcess(request, concreteIndices, aliases, state, - SystemIndexAccessLevel.NONE, "", EmptySystemIndices.INSTANCE); + SystemIndexAccessLevel.NONE, new ThreadContext(Settings.EMPTY), EmptySystemIndices.INSTANCE); assertThat(result.size(), equalTo(1)); assertThat(result.get("c").size(), equalTo(1)); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 51498ac29321b..e663762669e21 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -335,7 +335,7 @@ public void testCreateIndexRequestForDataStream() { .build(); rolloverRequest.getCreateIndexRequest().settings(settings); final CreateIndexClusterStateUpdateRequest createIndexRequest = MetadataRolloverService.prepareDataStreamCreateIndexRequest( - dataStream.getName(), newWriteIndexName, rolloverRequest.getCreateIndexRequest()); + dataStream.getName(), newWriteIndexName, rolloverRequest.getCreateIndexRequest(), null); for (String settingKey : settings.keySet()) { assertThat(settings.get(settingKey), equalTo(createIndexRequest.settings().get(settingKey))); } @@ -503,7 +503,7 @@ public void testRolloverClusterState() throws Exception { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -607,7 +607,7 @@ protected String contentType() { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -709,7 +709,7 @@ clusterService, indicesService, allocationService, new AliasValidator(), shardLi MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -764,7 +764,7 @@ public void testValidation() throws Exception { IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class); when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg()); MetadataRolloverService rolloverService = new MetadataRolloverService(null, createIndexService, metadataIndexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); String newIndexName = useDataStream == false && randomBoolean() ? "logs-index-9" : null; @@ -814,7 +814,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 69a62059ed385..7a0d688d94039 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.warmer.WarmerStats; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -248,7 +249,7 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr when(mockCreateIndexService.applyCreateIndexRequest(any(), any(), anyBoolean())).thenReturn(stateBefore); when(mdIndexAliasesService.applyAliasActions(any(), any())).thenReturn(stateBefore); MetadataRolloverService rolloverService = new MetadataRolloverService(mockThreadPool, mockCreateIndexService, - mdIndexAliasesService, mockIndexNameExpressionResolver); + mdIndexAliasesService, mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(mockTransportService, mockClusterService, mockThreadPool, mockActionFilters, mockIndexNameExpressionResolver, rolloverService, mockClient); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java index 09045500a877d..b594eb31fe844 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.Context; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.DateMathExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.test.ESTestCase; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -25,7 +24,6 @@ import java.util.Collections; import java.util.List; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.joda.time.DateTimeZone.UTC; @@ -35,7 +33,7 @@ public class DateMathExpressionResolverTests extends ESTestCase { private final DateMathExpressionResolver expressionResolver = new DateMathExpressionResolver(); private final Context context = new Context( ClusterState.builder(new ClusterName("_name")).build(), IndicesOptions.strictExpand(), - SystemIndexAccessLevel.NONE + name -> false ); public void testNormal() throws Exception { @@ -138,7 +136,7 @@ public void testExpression_CustomTimeZoneInIndexName() throws Exception { // rounding to today 00:00 now = DateTime.now(UTC).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0); } - Context context = new Context(this.context.getState(), this.context.getOptions(), now.getMillis(), NONE); + Context context = new Context(this.context.getState(), this.context.getOptions(), now.getMillis(), name -> false); List results = expressionResolver.resolve(context, Arrays.asList("<.marvel-{now/d{yyyy.MM.dd|" + timeZone.getID() + "}}>")); assertThat(results.size(), equalTo(1)); logger.info("timezone: [{}], now [{}], name: [{}]", timeZone, now, results.get(0)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index c7f62c4d44fbb..e536affef78bb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESTestCase; - import java.time.Instant; import java.time.LocalDate; import java.time.ZoneOffset; @@ -49,15 +48,15 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.elasticsearch.common.util.set.Sets.newHashSet; +import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.arrayWithSize; @@ -72,6 +71,9 @@ import static org.hamcrest.Matchers.notNullValue; public class IndexNameExpressionResolverTests extends ESTestCase { + + private static final Predicate NONE = name -> false; + private IndexNameExpressionResolver indexNameExpressionResolver; private ThreadContext threadContext; private long epochMillis; diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index b32ff87fcc221..bbb90028b01d6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -12,9 +12,14 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate; import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemDataStreamDescriptor.Type; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.Feature; import org.elasticsearch.test.ESTestCase; import java.util.List; @@ -25,7 +30,9 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -45,12 +52,38 @@ public void testCreateDataStream() throws Exception { .build(); CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO); - ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); + ClusterState newState = + MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), equalTo("true")); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false)); + } + + public void testCreateSystemDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); + final String dataStreamName = ".system-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder().build()) + .build(); + CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest( + dataStreamName, systemDataStreamDescriptor(), TimeValue.ZERO, TimeValue.ZERO); + ClusterState newState = + MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); + assertThat(newState.metadata().dataStreams().size(), equalTo(1)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(true)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), + nullValue()); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(true)); } public void testCreateDuplicateDataStream() throws Exception { @@ -146,6 +179,7 @@ public static ClusterState createDataStream(final String dataStreamName) throws private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception { MetadataCreateIndexService s = mock(MetadataCreateIndexService.class); + when(s.getSystemIndices()).thenReturn(getSystemIndices()); when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean())) .thenAnswer(mockInvocation -> { ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; @@ -158,6 +192,7 @@ private static MetadataCreateIndexService getMetadataCreateIndexService() throws .put(request.settings()) .build()) .putMapping(generateMapping("@timestamp")) + .system(getSystemIndices().isSystemName(request.index())) .numberOfShards(1) .numberOfReplicas(1) .build(), false); @@ -166,4 +201,26 @@ private static MetadataCreateIndexService getMetadataCreateIndexService() throws return s; } + + private static SystemIndices getSystemIndices() { + Map map = Map.of("system", new Feature( + "systemFeature", + "system feature description", + List.of(), + List.of(systemDataStreamDescriptor()) + )); + + return new SystemIndices(map); + } + + private static SystemDataStreamDescriptor systemDataStreamDescriptor() { + return new SystemDataStreamDescriptor( + ".system-data-stream", + "test system datastream", + Type.EXTERNAL, + new ComposableIndexTemplate(List.of(".system-data-stream"), null, null, null, null, null, new DataStreamTemplate()), + Map.of(), + List.of("stack") + ); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java index 772b42d094799..fd76706fd591d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java @@ -14,8 +14,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.plugins.Plugin; import java.io.IOException; @@ -29,6 +31,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCase { @@ -207,7 +211,10 @@ public void testCreateDataStreamWithSuppliedWriteIndex() throws Exception { ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO)); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService() + ); IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); assertThat(ds, notNullValue()); assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); @@ -249,7 +256,10 @@ public void testCreateDataStreamHidesBackingIndicesAndRemovesAlias() throws Exce ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO)); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService() + ); IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); assertThat(ds, notNullValue()); assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); @@ -298,7 +308,9 @@ public void testCreateDataStreamWithoutSuppliedWriteIndex() throws Exception { this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO))); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService())); assertThat(e.getMessage(), containsString("alias [" + dataStreamName + "] must specify a write index")); } @@ -310,6 +322,12 @@ private MapperService getMapperService(IndexMetadata im) { } } + private MetadataCreateIndexService getMetadataCreateIndexService() { + MetadataCreateIndexService service = mock(MetadataCreateIndexService.class); + when(service.getSystemIndices()).thenReturn(EmptySystemIndices.INSTANCE); + return service; + } + @Override protected Collection getPlugins() { return List.of(new MetadataIndexTemplateServiceTests.DummyPlugin()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java index d3825b99c7929..5d1bfcdb12640 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java @@ -20,15 +20,18 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class WildcardExpressionResolverTests extends ESTestCase { + + private static final Predicate NONE = name -> false; + public void testConvertWildcardsJustIndicesTests() { Metadata.Builder mdBuilder = Metadata.builder() .put(indexBuilder("testXXX")) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index a5a00fe15bd8b..2a67f0e739df6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -11,9 +11,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -137,7 +134,7 @@ public static DataStream randomInstance(LongSupplier timeProvider) { metadata = Map.of("key", "value"); } return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, - randomBoolean(), randomBoolean(), timeProvider); + randomBoolean(), randomBoolean(), false, timeProvider); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java index 12e69f1425ecd..c25db6213d4f6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java @@ -119,6 +119,7 @@ public static class DataStreamInfo extends AbstractDiffable impl public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); DataStream dataStream; ClusterHealthStatus dataStreamStatus; @@ -181,6 +182,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); } builder.field(HIDDEN_FIELD.getPreferredName(), dataStream.isHidden()); + builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem()); builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java index 1ab0fa5577560..95934d780ff62 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -140,7 +139,7 @@ public ResolverContext() { } public ResolverContext(long startTime) { - super(null, null, startTime, false, false, false, false, SystemIndexAccessLevel.NONE); + super(null, null, startTime, false, false, false, false, name -> false); } @Override diff --git a/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json b/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json new file mode 100644 index 0000000000000..c0af3369413d5 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json @@ -0,0 +1,21 @@ +{ + "phases": { + "hot": { + "min_age": "0ms", + "actions": { + "rollover": { + "max_size": "300gb", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": { + "delete_searchable_snapshot": true + } + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/fleet-actions-results.json b/x-pack/plugin/core/src/main/resources/fleet-actions-results.json new file mode 100644 index 0000000000000..e4a4acce782b3 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/fleet-actions-results.json @@ -0,0 +1,54 @@ +{ + "index_patterns": [ + ".fleet-actions-results" + ], + "data_stream": {}, + "template": { + "settings": { + "index.lifecycle.name": ".fleet-actions-results-ilm-policy" + }, + "mappings": { + "_meta": { + "version": "${fleet.version}" + }, + "dynamic": false, + "properties": { + "action_id": { + "type": "keyword" + }, + "agent_id": { + "type": "keyword" + }, + "action_data": { + "enabled": false, + "type": "object" + }, + "data": { + "enabled": false, + "type": "object" + }, + "error": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "@timestamp": { + "type": "date" + }, + "started_at": { + "type": "date" + }, + "completed_at": { + "type": "date" + } + } + } + }, + "composed_of": [], + "priority": 200, + "version": 1 +} diff --git a/x-pack/plugin/data-streams/build.gradle b/x-pack/plugin/data-streams/build.gradle index d8f7c499baa06..30f8ffc03c5a5 100644 --- a/x-pack/plugin/data-streams/build.gradle +++ b/x-pack/plugin/data-streams/build.gradle @@ -11,6 +11,16 @@ archivesBaseName = 'x-pack-data-streams' dependencies { compileOnly project(path: xpackModule('core')) testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(path: ':modules:transport-netty4') // for http in SystemDataStreamIT + testImplementation project(path: ':plugins:transport-nio') // for http in SystemDataStreamIT } -addQaCheckDependencies() \ No newline at end of file +addQaCheckDependencies() + +tasks.named("internalClusterTest").configure { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java new file mode 100644 index 0000000000000..3de6a623e0faa --- /dev/null +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java @@ -0,0 +1,384 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.datastreams; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse.ResetFeatureStateStatus; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.IndicesOptions.Option; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemDataStreamDescriptor.Type; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.transport.nio.NioTransportPlugin; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; +import org.junit.After; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class SystemDataStreamIT extends ESIntegTestCase { + + private static String nodeHttpTypeKey; + + @BeforeClass + public static void setUpTransport() { + if (randomBoolean()) { + nodeHttpTypeKey = NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME; + } else { + nodeHttpTypeKey = Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME; + } + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(NioTransportPlugin.class); + plugins.add(DataStreamsPlugin.class); + plugins.add(TestSystemDataStreamPlugin.class); + plugins.add(Netty4Plugin.class); + return plugins; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(NetworkModule.HTTP_TYPE_KEY, nodeHttpTypeKey) + .build(); + } + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @SuppressWarnings("unchecked") + public void testSystemDataStreamCRUD() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + + // no product header + ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong header + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + // list - no header needed + Request listAllRequest = new Request("GET", "/_data_stream"); + Response listAllResponse = restClient.performRequest(listAllRequest); + assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200)); + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(listAllResponse.getEntity()), + false + ); + List dataStreams = (List) responseMap.get("data_streams"); + assertThat(dataStreams.size(), is(1)); + + Request listRequest = new Request("GET", "/_data_stream/.test-data-stream"); + Response listResponse = restClient.performRequest(listRequest); + assertThat(listResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), EntityUtils.toString(listResponse.getEntity()), false); + dataStreams = (List) responseMap.get("data_streams"); + assertThat(dataStreams.size(), is(1)); + + // delete + Request deleteRequest = new Request("DELETE", "/_data_stream/.test-data-stream"); + + // no header + re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // incorrect header + deleteRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + deleteRequest.setOptions(putRequest.getOptions()); + Response deleteResponse = restClient.performRequest(deleteRequest); + assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200)); + } + } + + public void testDataStreamStats() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + Request statsRequest = new Request("GET", "/_data_stream/_stats"); + Response response = restClient.performRequest(statsRequest); + assertThat(response.getStatusLine().getStatusCode(), is(200)); + + Map map = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(response.getEntity()), + false + ); + assertThat(map.get("data_stream_count"), equalTo(1)); + } + } + + @SuppressWarnings("unchecked") + public void testSystemDataStreamReadWrite() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + // write + Request index = new Request("POST", "/.test-data-stream/_doc"); + index.setJsonEntity("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }"); + index.addParameter("refresh", "true"); + + // no product specified + ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(index)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong header + index.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(index)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + index.setOptions(putRequest.getOptions()); + Response response = restClient.performRequest(index); + assertEquals(201, response.getStatusLine().getStatusCode()); + + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(response.getEntity()), + false + ); + String indexName = (String) responseMap.get("_index"); + String id = (String) responseMap.get("_id"); + + // get + Request get = new Request("GET", "/" + indexName + "/_doc/" + id); + + // no product specified + re = expectThrows(ResponseException.class, () -> restClient.performRequest(get)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong product + get.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(get)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + get.setOptions(putRequest.getOptions()); + Response getResponse = restClient.performRequest(get); + assertThat(getResponse.getStatusLine().getStatusCode(), is(200)); + + // search all + Request search = new Request("GET", "/_search"); + search.setJsonEntity("{ \"query\": { \"match_all\": {} } }"); + + // no header + Response searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + Map hits = (Map) responseMap.get("hits"); + List hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(0)); + + // wrong header + search.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(0)); + + // correct + search.setOptions(putRequest.getOptions()); + searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(1)); + + // search the datastream + Request searchIdx = new Request("GET", "/.test-data-stream/_search"); + searchIdx.setJsonEntity("{ \"query\": { \"match_all\": {} } }"); + + // no header + re = expectThrows(ResponseException.class, () -> restClient.performRequest(searchIdx)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // incorrect header + searchIdx.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(searchIdx)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + searchIdx.setOptions(putRequest.getOptions()); + searchResponse = restClient.performRequest(searchIdx); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(1)); + } + } + + @After + public void cleanup() { + try { + PlainActionFuture stateStatusPlainActionFuture = new PlainActionFuture<>(); + new TestSystemDataStreamPlugin().cleanUpFeature( + internalCluster().clusterService(), + internalCluster().client(), + stateStatusPlainActionFuture + ); + stateStatusPlainActionFuture.actionGet(); + } catch (ResourceNotFoundException e) { + // ignore + } + } + + public static final class TestSystemDataStreamPlugin extends Plugin implements SystemIndexPlugin { + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return List.of( + new SystemDataStreamDescriptor( + ".test-data-stream", + "system data stream test", + Type.EXTERNAL, + new ComposableIndexTemplate( + List.of(".test-data-stream"), + new Template(Settings.EMPTY, mappings, null), + null, + null, + null, + null, + new DataStreamTemplate() + ), + Map.of(), + List.of("product") + ) + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public String getFeatureName() { + return SystemDataStreamIT.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "Integration testing of system data streams"; + } + + @Override + public void cleanUpFeature(ClusterService clusterService, Client client, ActionListener listener) { + Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); + final DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request( + dataStreamDescriptors.stream() + .map(SystemDataStreamDescriptor::getDataStreamName) + .collect(Collectors.toList()) + .toArray(Strings.EMPTY_ARRAY) + ); + EnumSet