From 5ab309227258eee09d1c2732011a4fa6aef4bf16 Mon Sep 17 00:00:00 2001 From: Jay Modi Date: Tue, 20 Apr 2021 15:17:56 -0600 Subject: [PATCH] Add Fleet action results system data stream (#71966) This commit adds support for system data streams and also the first use of a system data stream with the fleet action results data stream. A system data stream is one that is used to store system data that users should not interact with directly. Elasticsearch will manage these data streams. REST API access is available for external system data streams so that other stack components can store system data within a system data stream. System data streams will not use the system index read and write threadpools. Backport of #71667 --- .../client/indices/DataStream.java | 22 +- .../change-mappings-and-settings.asciidoc | 3 +- .../indices/get-data-stream.asciidoc | 11 +- .../alias/get/TransportGetAliasesAction.java | 22 +- .../indices/create/AutoCreateAction.java | 36 +- .../CreateIndexClusterStateUpdateRequest.java | 12 + .../rollover/MetadataRolloverService.java | 34 +- .../action/support/AutoCreateIndex.java | 2 +- .../cluster/metadata/DataStream.java | 41 +- .../cluster/metadata/IndexAbstraction.java | 3 +- .../metadata/IndexNameExpressionResolver.java | 172 ++++---- .../MetadataCreateDataStreamService.java | 64 ++- .../metadata/MetadataCreateIndexService.java | 97 ++++- .../MetadataIndexTemplateService.java | 34 +- .../MetadataMigrateToDataStreamService.java | 23 +- .../SystemIndexMetadataUpgradeService.java | 8 +- .../indices/SystemDataStreamDescriptor.java | 94 +++++ .../indices/SystemIndexDescriptor.java | 7 +- .../elasticsearch/indices/SystemIndices.java | 322 +++++++++++++-- .../java/org/elasticsearch/node/Node.java | 53 ++- .../plugins/SystemIndexPlugin.java | 5 + .../elasticsearch/rest/RestController.java | 4 +- .../snapshots/RestoreService.java | 2 +- .../get/TransportGetAliasesActionTests.java | 8 +- .../MetadataRolloverServiceTests.java | 12 +- .../TransportRolloverActionTests.java | 3 +- .../DateMathExpressionResolverTests.java | 6 +- .../IndexNameExpressionResolverTests.java | 10 +- .../MetadataCreateDataStreamServiceTests.java | 61 ++- ...tadataMigrateToDataStreamServiceTests.java | 24 +- .../WildcardExpressionResolverTests.java | 5 +- .../metadata/DataStreamTestHelper.java | 5 +- .../core/action/GetDataStreamAction.java | 2 + .../core/ilm/GenerateSnapshotNameStep.java | 3 +- .../fleet-actions-results-ilm-policy.json | 21 + .../main/resources/fleet-actions-results.json | 54 +++ x-pack/plugin/data-streams/build.gradle | 12 +- .../datastreams/SystemDataStreamIT.java | 387 ++++++++++++++++++ .../CreateDataStreamTransportAction.java | 12 +- .../DeleteDataStreamTransportAction.java | 29 +- .../action/GetDataStreamsTransportAction.java | 39 +- .../MigrateToDataStreamTransportAction.java | 12 +- .../PromoteDataStreamTransportAction.java | 8 +- .../rest/RestDataStreamsStatsAction.java | 5 + .../rest/RestGetDataStreamsAction.java | 5 + .../DeleteDataStreamTransportActionTests.java | 17 +- .../xpack/fleet/FleetSystemIndicesIT.java | 28 +- .../org/elasticsearch/xpack/fleet/Fleet.java | 122 +++++- .../xpack/fleet/FleetTemplateRegistry.java | 68 +++ .../elasticsearch/xpack/fleet/FleetTests.java | 13 +- 50 files changed, 1752 insertions(+), 290 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java create mode 100644 x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json create mode 100644 x-pack/plugin/core/src/main/resources/fleet-actions-results.json create mode 100644 x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java create mode 100644 x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/FleetTemplateRegistry.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java index 32f62aeeafa0b..db27600a5cc9c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/indices/DataStream.java @@ -26,6 +26,7 @@ public final class DataStream { private final List indices; private final long generation; private final boolean hidden; + private final boolean system; ClusterHealthStatus dataStreamStatus; @Nullable String indexTemplate; @@ -36,7 +37,7 @@ public final class DataStream { public DataStream(String name, String timeStampField, List indices, long generation, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map metadata, - boolean hidden) { + boolean hidden, boolean system) { this.name = name; this.timeStampField = timeStampField; this.indices = indices; @@ -46,6 +47,7 @@ public DataStream(String name, String timeStampField, List indices, long this.ilmPolicyName = ilmPolicyName; this.metadata = metadata; this.hidden = hidden; + this.system = system; } public String getName() { @@ -84,6 +86,10 @@ public boolean isHidden() { return hidden; } + public boolean isSystem() { + return system; + } + public static final ParseField NAME_FIELD = new ParseField("name"); public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field"); public static final ParseField INDICES_FIELD = new ParseField("indices"); @@ -93,6 +99,7 @@ public boolean isHidden() { public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", @@ -107,9 +114,10 @@ public boolean isHidden() { String indexTemplate = (String) args[5]; String ilmPolicy = (String) args[6]; Map metadata = (Map) args[7]; - Boolean hidden = (Boolean) args[8]; - hidden = hidden != null && hidden; - return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden); + boolean hidden = args[8] != null && (boolean) args[8]; + boolean system = args[9] != null && (boolean) args[9]; + return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden, + system); }); static { @@ -122,6 +130,7 @@ public boolean isHidden() { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD); PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -138,6 +147,8 @@ public boolean equals(Object o) { timeStampField.equals(that.timeStampField) && indices.equals(that.indices) && dataStreamStatus == that.dataStreamStatus && + hidden == that.hidden && + system == that.system && Objects.equals(indexTemplate, that.indexTemplate) && Objects.equals(ilmPolicyName, that.ilmPolicyName) && Objects.equals(metadata, that.metadata); @@ -145,6 +156,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata); + return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata, hidden, + system); } } diff --git a/docs/reference/data-streams/change-mappings-and-settings.asciidoc b/docs/reference/data-streams/change-mappings-and-settings.asciidoc index 7f596be6edfb6..0136b9d224be8 100644 --- a/docs/reference/data-streams/change-mappings-and-settings.asciidoc +++ b/docs/reference/data-streams/change-mappings-and-settings.asciidoc @@ -572,7 +572,8 @@ stream's oldest backing index. "generation": 2, "status": "GREEN", "template": "my-data-stream-template", - "hidden": false + "hidden": false, + "system": false } ] } diff --git a/docs/reference/indices/get-data-stream.asciidoc b/docs/reference/indices/get-data-stream.asciidoc index 3038c96ed50e7..2e71396e1838e 100644 --- a/docs/reference/indices/get-data-stream.asciidoc +++ b/docs/reference/indices/get-data-stream.asciidoc @@ -203,6 +203,11 @@ use the <>. `hidden`:: (Boolean) If `true`, the data stream is <>. + +`system`:: +(Boolean) +If `true`, the data stream is created and managed by an Elastic stack component +and cannot be modified through normal user interaction. ==== [[get-data-stream-api-example]] @@ -241,7 +246,8 @@ The API returns the following response: "status": "GREEN", "template": "my-index-template", "ilm_policy": "my-lifecycle-policy", - "hidden": false + "hidden": false, + "system": false }, { "name": "my-data-stream-two", @@ -261,7 +267,8 @@ The API returns the following response: "status": "YELLOW", "template": "my-index-template", "ilm_policy": "my-lifecycle-policy", - "hidden": false + "hidden": false, + "system": false } ] } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java index 34fc97b779484..216241fbde980 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; @@ -24,6 +23,7 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -65,11 +65,9 @@ protected void masterOperation(GetAliasesRequest request, ClusterState state, Ac concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request); } final SystemIndexAccessLevel systemIndexAccessLevel = indexNameExpressionResolver.getSystemIndexAccessLevel(); - final String elasticProduct = - threadPool.getThreadContext().getHeader(IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); ImmutableOpenMap> aliases = state.metadata().findAliases(request, concreteIndices); listener.onResponse(new GetAliasesResponse(postProcess(request, concreteIndices, aliases, state, - systemIndexAccessLevel, elasticProduct, systemIndices))); + systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices))); } /** @@ -78,7 +76,7 @@ protected void masterOperation(GetAliasesRequest request, ClusterState state, Ac static ImmutableOpenMap> postProcess(GetAliasesRequest request, String[] concreteIndices, ImmutableOpenMap> aliases, ClusterState state, SystemIndexAccessLevel systemIndexAccessLevel, - String elasticProduct, SystemIndices systemIndices) { + ThreadContext threadContext, SystemIndices systemIndices) { boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0; ImmutableOpenMap.Builder> mapBuilder = ImmutableOpenMap.builder(aliases); for (String index : concreteIndices) { @@ -89,19 +87,19 @@ static ImmutableOpenMap> postProcess(GetAliasesReque } final ImmutableOpenMap> finalResponse = mapBuilder.build(); if (systemIndexAccessLevel != SystemIndexAccessLevel.ALL) { - checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, elasticProduct); + checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, threadContext); } return finalResponse; } private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndices systemIndices, ClusterState state, ImmutableOpenMap> aliasesMap, - SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) { + SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) { final Predicate systemIndexAccessAllowPredicate; if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { systemIndexAccessAllowPredicate = indexMetadata -> false; } else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) { - systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(elasticProduct); + systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(threadContext); } else { throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel); } @@ -121,23 +119,23 @@ private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndi "this request accesses system indices: {}, but in a future major version, direct access to system " + "indices will be prevented by default", systemIndicesNames); } else { - checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, elasticProduct); + checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, threadContext); } } private static void checkSystemAliasAccess(GetAliasesRequest request, SystemIndices systemIndices, - SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) { + SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) { final Predicate systemIndexAccessAllowPredicate; if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { systemIndexAccessAllowPredicate = name -> true; } else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) { - systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(elasticProduct).negate(); + systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext).negate(); } else { throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel); } final List systemAliases = Arrays.stream(request.aliases()) - .filter(systemIndices::isSystemIndex) + .filter(systemIndices::isSystemName) .filter(systemIndexAccessAllowPredicate) .collect(Collectors.toList()); if (systemAliases.isEmpty() == false) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java index 9f4456f5edea6..a5d79b5b1ceb9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.threadpool.ThreadPool; @@ -109,11 +110,17 @@ protected void masterOperation(CreateIndexRequest request, @Override public ClusterState execute(ClusterState currentState) throws Exception { + final SystemDataStreamDescriptor dataStreamDescriptor = + systemIndices.validateDataStreamAccess(request.index(), threadPool.getThreadContext()); + final boolean isSystemDataStream = dataStreamDescriptor != null; + final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index()); final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata()); + final boolean isDataStream = isSystemIndex == false && + (isSystemDataStream || (template != null && template.getDataStreamTemplate() != null)); - if (template != null && template.getDataStreamTemplate() != null) { + if (isDataStream) { // This expression only evaluates to true when the argument is non-null and false - if (Boolean.FALSE.equals(template.getAllowAutoCreate())) { + if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) { throw new IndexNotFoundException( "composable template " + template.indexPatterns() + " forbids index auto creation" ); @@ -121,6 +128,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest( request.index(), + dataStreamDescriptor, request.masterNodeTimeout(), request.timeout() ); @@ -130,21 +138,27 @@ public ClusterState execute(ClusterState currentState) throws Exception { } else { String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); indexNameRef.set(indexName); + if (isSystemIndex) { + if (indexName.equals(request.index()) == false) { + throw new IllegalStateException("system indices do not support date math expressions"); + } + } else { + // This will throw an exception if the index does not exist and creating it is prohibited + final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - // This will throw an exception if the index does not exist and creating it is prohibited - final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState); - - if (shouldAutoCreate == false) { - // The index already exists. - return currentState; + if (shouldAutoCreate == false) { + // The index already exists. + return currentState; + } } - final SystemIndexDescriptor mainDescriptor = systemIndices.findMatchingDescriptor(indexName); - final boolean isSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); + final SystemIndexDescriptor mainDescriptor = + isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null; + final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged(); final CreateIndexClusterStateUpdateRequest updateRequest; - if (isSystemIndex) { + if (isManagedSystemIndex) { final SystemIndexDescriptor descriptor = mainDescriptor.getDescriptorCompatibleWith(state.nodes().getSmallestNonClientNodeVersion()); if (descriptor == null) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index d8b01c05ec0d6..01a0900d3253e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import java.util.HashMap; import java.util.HashSet; @@ -35,6 +36,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private Index recoverFrom; private ResizeType resizeType; private boolean copySettings; + private SystemDataStreamDescriptor systemDataStreamDescriptor; private Settings settings = Settings.Builder.EMPTY_SETTINGS; @@ -95,6 +97,11 @@ public CreateIndexClusterStateUpdateRequest nameResolvedInstant(long nameResolve return this; } + public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDataStreamDescriptor systemDataStreamDescriptor) { + this.systemDataStreamDescriptor = systemDataStreamDescriptor; + return this; + } + public String cause() { return cause; } @@ -123,6 +130,10 @@ public Index recoverFrom() { return recoverFrom; } + public SystemDataStreamDescriptor systemDataStreamDescriptor() { + return systemDataStreamDescriptor; + } + /** * The name that was provided by the user. This might contain a date math expression. * @see IndexMetadata#SETTING_INDEX_PROVIDED_NAME @@ -178,6 +189,7 @@ public String toString() { ", aliases=" + aliases + ", blocks=" + blocks + ", waitForActiveShards=" + waitForActiveShards + + ", systemDataStreamDescriptor=" + systemDataStreamDescriptor + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index e33168181007e..eab943ec0d1a3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -28,6 +28,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; @@ -57,14 +59,17 @@ public class MetadataRolloverService { private final MetadataCreateIndexService createIndexService; private final MetadataIndexAliasesService indexAliasesService; private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + @Inject public MetadataRolloverService(ThreadPool threadPool, MetadataCreateIndexService createIndexService, MetadataIndexAliasesService indexAliasesService, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, SystemIndices systemIndices) { this.threadPool = threadPool; this.createIndexService = createIndexService; this.indexAliasesService = indexAliasesService; this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; } public static class RolloverResult { @@ -212,7 +217,16 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra ); } - lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + final SystemDataStreamDescriptor systemDataStreamDescriptor; + if (dataStream.isSystem() == false) { + systemDataStreamDescriptor = null; + lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + } else { + systemDataStreamDescriptor = systemIndices.findMatchingDataStreamDescriptor(dataStreamName); + if (systemDataStreamDescriptor == null) { + throw new IllegalArgumentException("no system data stream descriptor found for data stream [" + dataStreamName + "]"); + } + } final Version minNodeVersion = currentState.nodes().getMinNodeVersion(); final DataStream ds = dataStream.getDataStream(); @@ -223,8 +237,12 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra return new RolloverResult(rolledDataStream.getWriteIndex().getName(), originalWriteIndex.getIndex().getName(), currentState); } - CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest = - prepareDataStreamCreateIndexRequest(dataStreamName, rolledDataStream.getWriteIndex().getName(), createIndexRequest); + CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest( + dataStreamName, + rolledDataStream.getWriteIndex().getName(), + createIndexRequest, + systemDataStreamDescriptor + ); ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent, (builder, indexMetadata) -> builder.put(ds.rollover(currentState.metadata(), indexMetadata.getIndexUUID(), minNodeVersion))); @@ -256,10 +274,12 @@ static String generateRolloverIndexName(String sourceIndexName, IndexNameExpress static CreateIndexClusterStateUpdateRequest prepareDataStreamCreateIndexRequest(final String dataStreamName, final String targetIndexName, - CreateIndexRequest createIndexRequest) { - Settings settings = Settings.builder().put("index.hidden", true).build(); + CreateIndexRequest createIndexRequest, + final SystemDataStreamDescriptor descriptor) { + Settings settings = descriptor != null ? Settings.EMPTY : Settings.builder().put("index.hidden", true).build(); return prepareCreateIndexRequest(targetIndexName, targetIndexName, "rollover_data_stream", createIndexRequest, settings) - .dataStreamName(dataStreamName); + .dataStreamName(dataStreamName) + .systemDataStreamDescriptor(descriptor); } static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( diff --git a/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java b/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java index 20150b37fcfe2..cb24aa5d8685c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java +++ b/server/src/main/java/org/elasticsearch/action/support/AutoCreateIndex.java @@ -62,7 +62,7 @@ public boolean shouldAutoCreate(String index, ClusterState state) { } // Always auto-create system indexes - if (systemIndices.isSystemIndex(index)) { + if (systemIndices.isSystemName(index)) { return true; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 724d755b30baa..2dcbd3ed6e84f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -51,19 +51,25 @@ public final class DataStream extends AbstractDiffable implements To private final Map metadata; private final boolean hidden; private final boolean replicated; + private final boolean system; public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata) { - this(name, timeStampField, indices, generation, metadata, false, false); + this(name, timeStampField, indices, generation, metadata, false, false, false); } public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, boolean hidden, boolean replicated) { - this(name, timeStampField, indices, generation, metadata, hidden, replicated, System::currentTimeMillis); + this(name, timeStampField, indices, generation, metadata, hidden, replicated, false, System::currentTimeMillis); + } + + public DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, + boolean hidden, boolean replicated, boolean system) { + this(name, timeStampField, indices, generation, metadata, hidden, replicated, system, System::currentTimeMillis); } // visible for testing DataStream(String name, TimestampField timeStampField, List indices, long generation, Map metadata, - boolean hidden, boolean replicated, LongSupplier timeProvider) { + boolean hidden, boolean replicated, boolean system, LongSupplier timeProvider) { this.name = name; this.timeStampField = timeStampField; this.indices = Collections.unmodifiableList(indices); @@ -72,6 +78,7 @@ public DataStream(String name, TimestampField timeStampField, List indice this.hidden = hidden; this.replicated = replicated; this.timeProvider = timeProvider; + this.system = system; assert indices.size() > 0; } @@ -118,6 +125,10 @@ public boolean isReplicated() { return replicated; } + public boolean isSystem() { + return system; + } + /** * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. @@ -142,7 +153,7 @@ public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid, Vers newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), ++generation, currentTimeMillis, minNodeVersion); } while (clusterMetadata.getIndicesLookup().containsKey(newWriteIndexName)); backingIndices.add(new Index(newWriteIndexName, writeIndexUuid)); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } /** @@ -156,7 +167,7 @@ public DataStream removeBackingIndex(Index index) { List backingIndices = new ArrayList<>(indices); backingIndices.remove(index); assert backingIndices.size() == indices.size() - 1; - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } /** @@ -181,11 +192,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki "it is the write index", existingBackingIndex.getName(), name)); } backingIndices.set(backingIndexPosition, newBackingIndex); - return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated); + return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated, system); } public DataStream promoteDataStream() { - return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, timeProvider); + return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, system, timeProvider); } /** @@ -215,7 +226,8 @@ public DataStream snapshot(Collection indicesInSnapshot) { generation, metadata == null ? null : new HashMap<>(metadata), hidden, - replicated + replicated, + system ); } @@ -269,7 +281,9 @@ public DataStream(StreamInput in) throws IOException { this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(), in.getVersion().onOrAfter(NEW_FEATURES_VERSION) ? in.readMap(): null, in.getVersion().onOrAfter(NEW_FEATURES_VERSION) && in.readBoolean(), - in.getVersion().onOrAfter(NEW_FEATURES_VERSION) && in.readBoolean()); + in.getVersion().onOrAfter(NEW_FEATURES_VERSION) && in.readBoolean(), + in.getVersion().onOrAfter(Version.V_7_13_0) && in.readBoolean() + ); } public static Diff readDiffFrom(StreamInput in) throws IOException { @@ -286,6 +300,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(metadata); out.writeBoolean(hidden); out.writeBoolean(replicated); + if (out.getVersion().onOrAfter(Version.V_7_13_0)) { + out.writeBoolean(system); + } } } @@ -296,11 +313,13 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField METADATA_FIELD = new ParseField("_meta"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); public static final ParseField REPLICATED_FIELD = new ParseField("replicated"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> new DataStream((String) args[0], (TimestampField) args[1], (List) args[2], (Long) args[3], - (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6])); + (Map) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6], + args[7] != null && (boolean) args[7])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); @@ -310,6 +329,7 @@ public void writeTo(StreamOutput out) throws IOException { PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD); PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD); } public static DataStream fromXContent(XContentParser parser) throws IOException { @@ -328,6 +348,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field(HIDDEN_FIELD.getPreferredName(), hidden); builder.field(REPLICATED_FIELD.getPreferredName(), replicated); + builder.field(SYSTEM_FIELD.getPreferredName(), system); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index ddc05a6a8ad9f..f1f71b62375b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -296,8 +296,7 @@ public boolean isHidden() { @Override public boolean isSystem() { - // No such thing as system data streams (yet) - return false; + return dataStream.isSystem(); } public org.elasticsearch.cluster.metadata.DataStream getDataStream() { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java index 5540b46c6db5e..73acca17b1b5c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java @@ -13,7 +13,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.Booleans; +import org.elasticsearch.cluster.metadata.IndexAbstraction.Type; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -31,6 +31,7 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import java.time.Instant; import java.time.ZoneId; @@ -57,8 +58,6 @@ public class IndexNameExpressionResolver { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(IndexNameExpressionResolver.class); public static final String EXCLUDED_DATA_STREAMS_KEY = "es.excluded_ds"; - public static final String SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_system_index_access_allowed"; - public static final String EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_external_system_index_access_origin"; public static final Version SYSTEM_INDEX_ENFORCEMENT_VERSION = Version.V_7_10_0; private final DateMathExpressionResolver dateMathExpressionResolver = new DateMathExpressionResolver(); @@ -80,7 +79,7 @@ public IndexNameExpressionResolver(ThreadContext threadContext, SystemIndices sy */ public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndexNames(context, request.indices()); } @@ -89,7 +88,7 @@ public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { */ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - SystemIndexAccessLevel.ALL); + name -> true); return concreteIndexNames(context, request.indices()); } @@ -99,7 +98,7 @@ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, Indi */ public Index[] concreteIndices(ClusterState state, IndicesRequest request) { Context context = new Context(state, request.indicesOptions(), false, false, request.includeDataStreams(), - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, request.indices()); } @@ -117,28 +116,27 @@ public Index[] concreteIndices(ClusterState state, IndicesRequest request) { * indices options in the context don't allow such a case. */ public String[] concreteIndexNames(ClusterState state, IndicesOptions options, String... indexExpressions) { - Context context = new Context(state, options, getSystemIndexAccessLevel()); + Context context = new Context(state, options, getSystemIndexAccessPredicate()); return concreteIndexNames(context, indexExpressions); } public String[] concreteIndexNames(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) { - Context context = new Context(state, options, false, false, includeDataStreams, getSystemIndexAccessLevel()); + Context context = new Context(state, options, false, false, includeDataStreams, getSystemIndexAccessPredicate()); return concreteIndexNames(context, indexExpressions); } public String[] concreteIndexNames(ClusterState state, IndicesOptions options, IndicesRequest request) { - Context context = new Context(state, options, false, false, request.includeDataStreams(), getSystemIndexAccessLevel()); + Context context = new Context(state, options, false, false, request.includeDataStreams(), getSystemIndexAccessPredicate()); return concreteIndexNames(context, request.indices()); } public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, IndicesOptions options, String... indexExpressions) { - Context context = new Context(state, options, SystemIndexAccessLevel.ALL); + Context context = new Context(state, options, name -> true); return concreteIndexNames(context, indexExpressions); } public List dataStreamNames(ClusterState state, IndicesOptions options, String... indexExpressions) { - // Allow system index access - they'll be filtered out below as there's no such thing (yet) as system data streams - Context context = new Context(state, options, false, false, true, true, SystemIndexAccessLevel.ALL); + Context context = new Context(state, options, false, false, true, true, getSystemIndexAccessPredicate()); if (indexExpressions == null || indexExpressions.length == 0) { indexExpressions = new String[]{"*"}; } @@ -171,7 +169,7 @@ public Index[] concreteIndices(ClusterState state, IndicesOptions options, Strin public Index[] concreteIndices(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) { Context context = new Context(state, options, false, false, includeDataStreams, - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, indexExpressions); } @@ -189,7 +187,7 @@ public Index[] concreteIndices(ClusterState state, IndicesOptions options, boole */ public Index[] concreteIndices(ClusterState state, IndicesRequest request, long startTime) { Context context = new Context(state, request.indicesOptions(), startTime, false, false, request.includeDataStreams(), false, - getSystemIndexAccessLevel()); + getSystemIndexAccessPredicate()); return concreteIndices(context, request.indices()); } @@ -316,31 +314,38 @@ Index[] concreteIndices(Context context, String... indexExpressions) { } private void checkSystemIndexAccess(Context context, Metadata metadata, Set concreteIndices, String[] originalPatterns) { - final SystemIndexAccessLevel systemIndexAccessLevel = context.getSystemIndexAccessLevel(); - if (systemIndexAccessLevel != SystemIndexAccessLevel.ALL) { - final Predicate systemIndexAccessLevelPredicate; - if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { - // everything should be included in the deprecation message - systemIndexAccessLevelPredicate = indexMetadata -> true; + final Predicate systemIndexAccessPredicate = context.getSystemIndexAccessPredicate().negate(); + final List systemIndicesThatShouldNotBeAccessed = concreteIndices.stream() + .map(metadata::index) + .filter(IndexMetadata::isSystem) + .filter(idxMetadata -> systemIndexAccessPredicate.test(idxMetadata.getIndex().getName())) + .collect(Collectors.toList()); + + if (systemIndicesThatShouldNotBeAccessed.isEmpty()) { + return; + } + + final List resolvedSystemIndices = new ArrayList<>(); + final Set resolvedSystemDataStreams = new HashSet<>(); + final SortedMap indicesLookup = metadata.getIndicesLookup(); + for (IndexMetadata idxMetadata : systemIndicesThatShouldNotBeAccessed) { + IndexAbstraction abstraction = indicesLookup.get(idxMetadata.getIndex().getName()); + if (abstraction.getParentDataStream() != null) { + resolvedSystemDataStreams.add(abstraction.getParentDataStream().getName()); } else { - // everything other than allowed should be included in the deprecation message - systemIndexAccessLevelPredicate = systemIndices - .getProductSystemIndexMetadataPredicate(threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY)) - .negate(); - } - final List resolvedSystemIndices = concreteIndices.stream() - .map(metadata::index) - .filter(IndexMetadata::isSystem) - .filter(systemIndexAccessLevelPredicate) - .map(i -> i.getIndex().getName()) - .sorted() // reliable order for testing - .collect(Collectors.toList()); - if (resolvedSystemIndices.isEmpty() == false) { - deprecationLogger.deprecate(DeprecationCategory.API, "open_system_index_access", - "this request accesses system indices: {}, but in a future major version, direct access to system " + - "indices will be prevented by default", resolvedSystemIndices); + resolvedSystemIndices.add(idxMetadata.getIndex().getName()); } } + + if (resolvedSystemIndices.isEmpty() == false) { + Collections.sort(resolvedSystemIndices); + deprecationLogger.deprecate(DeprecationCategory.API, "open_system_index_access", + "this request accesses system indices: {}, but in a future major version, direct access to system " + + "indices will be prevented by default", resolvedSystemIndices); + } + if (resolvedSystemDataStreams.isEmpty() == false) { + throw systemIndices.dataStreamAccessException(threadContext, resolvedSystemDataStreams); + } } private static boolean shouldTrackConcreteIndex(Context context, IndicesOptions options, IndexMetadata index) { @@ -427,7 +432,7 @@ public Index concreteWriteIndex(ClusterState state, IndicesOptions options, Stri options.allowAliasesToMultipleIndices(), options.forbidClosedIndices(), options.ignoreAliases(), options.ignoreThrottled()); - Context context = new Context(state, combinedOptions, false, true, includeDataStreams, getSystemIndexAccessLevel()); + Context context = new Context(state, combinedOptions, false, true, includeDataStreams, getSystemIndexAccessPredicate()); Index[] indices = concreteIndices(context, index); if (allowNoIndices && indices.length == 0) { return null; @@ -444,7 +449,7 @@ public Index concreteWriteIndex(ClusterState state, IndicesOptions options, Stri * If the data stream, index or alias contains date math then that is resolved too. */ public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state) { - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessPredicate()); String resolvedAliasOrIndex = DateMathExpressionResolver.resolveExpression(indexAbstraction, context); return state.metadata().getIndicesLookup().containsKey(resolvedAliasOrIndex); } @@ -455,7 +460,7 @@ public boolean hasIndexAbstraction(String indexAbstraction, ClusterState state) public String resolveDateMathExpression(String dateExpression) { // The data math expression resolver doesn't rely on cluster state or indices options, because // it just resolves the date math to an actual date. - return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, getSystemIndexAccessLevel())); + return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, getSystemIndexAccessPredicate())); } /** @@ -463,14 +468,14 @@ public String resolveDateMathExpression(String dateExpression) { * @return If the specified string is data math expression then this method returns the resolved expression. */ public String resolveDateMathExpression(String dateExpression, long time) { - return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, time, getSystemIndexAccessLevel())); + return DateMathExpressionResolver.resolveExpression(dateExpression, new Context(null, null, time, getSystemIndexAccessPredicate())); } /** * Resolve an array of expressions to the set of indices and aliases that these expressions match. */ public Set resolveExpressions(ClusterState state, String... expressions) { - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), true, false, true, getSystemIndexAccessPredicate()); List resolvedExpressions = Arrays.asList(expressions); for (ExpressionResolver expressionResolver : expressionResolvers) { resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions); @@ -564,7 +569,7 @@ public String[] indexAliases(ClusterState state, String index, Predicate> resolveSearchRouting(ClusterState state, @Nullable String routing, String... expressions) { List resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList(); - Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessLevel()); + Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, getSystemIndexAccessPredicate()); for (ExpressionResolver expressionResolver : expressionResolvers) { resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions); } @@ -715,33 +720,22 @@ boolean isPatternMatchingAllIndices(Metadata metadata, String[] indicesOrAliases return false; } - /** - * Determines what level of system index access should be allowed in the current context. - * - * @return {@link SystemIndexAccessLevel#ALL} if unrestricted system index access should be allowed, - * {@link SystemIndexAccessLevel#RESTRICTED} if a subset of system index access should be allowed, or - * {@link SystemIndexAccessLevel#NONE} if no system index access should be allowed. - */ public SystemIndexAccessLevel getSystemIndexAccessLevel() { - final String headerValue = threadContext.getHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); - final String productHeaderValue = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + return systemIndices.getSystemIndexAccessLevel(threadContext); + } - final boolean allowed = Booleans.parseBoolean(headerValue, true); - if (allowed) { - if (productHeaderValue != null) { - return SystemIndexAccessLevel.RESTRICTED; - } else { - return SystemIndexAccessLevel.ALL; - } + public Predicate getSystemIndexAccessPredicate() { + final SystemIndexAccessLevel systemIndexAccessLevel = getSystemIndexAccessLevel(); + final Predicate systemIndexAccessLevelPredicate; + if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) { + systemIndexAccessLevelPredicate = s -> false; + } else if (systemIndexAccessLevel == SystemIndexAccessLevel.ALL) { + systemIndexAccessLevelPredicate = s -> true; } else { - return SystemIndexAccessLevel.NONE; + // everything other than allowed should be included in the deprecation message + systemIndexAccessLevelPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext); } - } - - public enum SystemIndexAccessLevel { - ALL, - NONE, - RESTRICTED + return systemIndexAccessLevelPredicate; } public static class Context { @@ -753,30 +747,30 @@ public static class Context { private final boolean resolveToWriteIndex; private final boolean includeDataStreams; private final boolean preserveDataStreams; - private final SystemIndexAccessLevel systemIndexAccessLevel; + private final Predicate systemIndexAccessPredicate; - Context(ClusterState state, IndicesOptions options, SystemIndexAccessLevel systemIndexAccessLevel) { - this(state, options, System.currentTimeMillis(), systemIndexAccessLevel); + Context(ClusterState state, IndicesOptions options, Predicate systemIndexAccessPredicate) { + this(state, options, System.currentTimeMillis(), systemIndexAccessPredicate); } Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, Predicate systemIndexAccessPredicate) { this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, includeDataStreams, false, - systemIndexAccessLevel); + systemIndexAccessPredicate); } Context(ClusterState state, IndicesOptions options, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, boolean preserveDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, boolean preserveDataStreams, Predicate systemIndexAccessPredicate) { this(state, options, System.currentTimeMillis(), preserveAliases, resolveToWriteIndex, includeDataStreams, preserveDataStreams, - systemIndexAccessLevel); + systemIndexAccessPredicate); } - Context(ClusterState state, IndicesOptions options, long startTime, SystemIndexAccessLevel systemIndexAccessLevel) { - this(state, options, startTime, false, false, false, false, systemIndexAccessLevel); + Context(ClusterState state, IndicesOptions options, long startTime, Predicate systemIndexAccessPredicate) { + this(state, options, startTime, false, false, false, false, systemIndexAccessPredicate); } protected Context(ClusterState state, IndicesOptions options, long startTime, boolean preserveAliases, boolean resolveToWriteIndex, - boolean includeDataStreams, boolean preserveDataStreams, SystemIndexAccessLevel systemIndexAccessLevel) { + boolean includeDataStreams, boolean preserveDataStreams, Predicate systemIndexAccessPredicate) { this.state = state; this.options = options; this.startTime = startTime; @@ -784,7 +778,7 @@ protected Context(ClusterState state, IndicesOptions options, long startTime, bo this.resolveToWriteIndex = resolveToWriteIndex; this.includeDataStreams = includeDataStreams; this.preserveDataStreams = preserveDataStreams; - this.systemIndexAccessLevel = systemIndexAccessLevel; + this.systemIndexAccessPredicate = systemIndexAccessPredicate; } public ClusterState getState() { @@ -827,8 +821,8 @@ public boolean isPreserveDataStreams() { /** * Used to determine system index access is allowed in this context (e.g. for this request). */ - public SystemIndexAccessLevel getSystemIndexAccessLevel() { - return systemIndexAccessLevel; + public Predicate getSystemIndexAccessPredicate() { + return systemIndexAccessPredicate; } } @@ -860,7 +854,20 @@ public List resolve(Context context, List expressions) { } if (isEmptyOrTrivialWildcard(expressions)) { - List resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata); + List resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata).stream() + .filter(expression -> { + IndexAbstraction abstraction = metadata.getIndicesLookup().get(expression); + if (abstraction != null && abstraction.isSystem()) { + if (abstraction.getType() == Type.DATA_STREAM || abstraction.getParentDataStream() != null) { + return context.systemIndexAccessPredicate.test(abstraction.getName()); + } else { + return true; + } + } else { + return true; + } + }) + .collect(Collectors.toList()); if (context.includeDataStreams()) { final IndexMetadata.State excludeState = excludeState(options); final Map dataStreamsAbstractions = metadata.getIndicesLookup().entrySet() @@ -1060,6 +1067,13 @@ private static Set expand(Context context, IndexMetadata.State excludeSt String aliasOrIndexName = entry.getKey(); IndexAbstraction indexAbstraction = entry.getValue(); + if (indexAbstraction.isSystem() && + (indexAbstraction.getType() == Type.DATA_STREAM || indexAbstraction.getParentDataStream() != null)) { + if (context.systemIndexAccessPredicate.test(indexAbstraction.getName()) == false) { + continue; + } + } + if (indexAbstraction.isHidden() == false || includeHidden || implicitHiddenMatch(aliasOrIndexName, expression)) { if (context.isPreserveAliases() && indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) { expand.add(aliasOrIndexName); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 42c4a17556550..07072ef544446 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; @@ -41,6 +42,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static java.util.Collections.emptyList; + public class MetadataCreateDataStreamService { private static final Logger logger = LogManager.getLogger(MetadataCreateDataStreamService.class); @@ -92,23 +95,47 @@ public ClusterState createDataStream(CreateDataStreamClusterStateUpdateRequest r return createDataStream(metadataCreateIndexService, current, request); } - public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest { + public static final class CreateDataStreamClusterStateUpdateRequest + extends ClusterStateUpdateRequest { private final String name; + private final SystemDataStreamDescriptor descriptor; public CreateDataStreamClusterStateUpdateRequest(String name, TimeValue masterNodeTimeout, TimeValue timeout) { + this(name, null, masterNodeTimeout, timeout); + } + + public CreateDataStreamClusterStateUpdateRequest(String name, + SystemDataStreamDescriptor systemDataStreamDescriptor, + TimeValue masterNodeTimeout, + TimeValue timeout) { this.name = name; + this.descriptor = systemDataStreamDescriptor; masterNodeTimeout(masterNodeTimeout); ackTimeout(timeout); } + + public boolean isSystem() { + return descriptor != null; + } + + public SystemDataStreamDescriptor getSystemDataStreamDescriptor() { + return descriptor; + } } static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, ClusterState currentState, CreateDataStreamClusterStateUpdateRequest request) throws Exception { - return createDataStream(metadataCreateIndexService, currentState, request.name, org.elasticsearch.common.collect.List.of(), null); + return createDataStream( + metadataCreateIndexService, + currentState, + request.name, + emptyList(), + null, + request.getSystemDataStreamDescriptor()); } /** @@ -125,14 +152,23 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn ClusterState currentState, String dataStreamName, List backingIndices, - IndexMetadata writeIndex) throws Exception + IndexMetadata writeIndex) throws Exception { + assert metadataCreateIndexService.getSystemIndices().isSystemDataStream(dataStreamName) == false : + "dataStream [" + dataStreamName + "] is system but no system descriptor was provided!"; + return createDataStream(metadataCreateIndexService, currentState, dataStreamName, backingIndices, writeIndex, null); + } + + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, + ClusterState currentState, + String dataStreamName, + List backingIndices, + IndexMetadata writeIndex, + SystemDataStreamDescriptor systemDataStreamDescriptor) throws Exception { if (currentState.nodes().getMinNodeVersion().before(Version.V_7_9_0)) { throw new IllegalStateException("data streams require minimum node version of " + Version.V_7_9_0); } - if (writeIndex == null) { - Objects.requireNonNull(metadataCreateIndexService); - } + Objects.requireNonNull(metadataCreateIndexService); Objects.requireNonNull(currentState); Objects.requireNonNull(backingIndices); if (currentState.metadata().dataStreams().containsKey(dataStreamName)) { @@ -150,7 +186,10 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn + DataStream.BACKING_INDEX_PREFIX + "'"); } - ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, currentState.metadata()); + final boolean isSystem = systemDataStreamDescriptor != null; + final ComposableIndexTemplate template = isSystem ? + systemDataStreamDescriptor.getComposableIndexTemplate() : + lookupTemplateForDataStream(dataStreamName, currentState.metadata()); if (writeIndex == null) { String firstBackingIndexName = @@ -158,7 +197,12 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn CreateIndexClusterStateUpdateRequest createIndexRequest = new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) .dataStreamName(dataStreamName) - .settings(Settings.builder().put("index.hidden", true).build()); + .systemDataStreamDescriptor(systemDataStreamDescriptor); + + if (isSystem == false) { + createIndexRequest.settings(Settings.builder().put("index.hidden", true).build()); + } + try { currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); } catch (ResourceAlreadyExistsException e) { @@ -177,9 +221,9 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn DataStream.TimestampField timestampField = new DataStream.TimestampField(fieldName); List dsBackingIndices = backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()); dsBackingIndices.add(writeIndex.getIndex()); - boolean hidden = template.getDataStreamTemplate().isHidden(); + boolean hidden = isSystem ? false : template.getDataStreamTemplate().isHidden(); DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L, - template.metadata() != null ? org.elasticsearch.common.collect.Map.copyOf(template.metadata()) : null, hidden, false); + template.metadata() != null ? org.elasticsearch.common.collect.Map.copyOf(template.metadata()) : null, hidden, false, isSystem); Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream); logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName, writeIndex.getIndex().getName(), diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index 3ccc85a2011ba..d3d1406c3c153 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -65,7 +65,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.ShardLimitValidator; -import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.threadpool.ThreadPool; @@ -81,6 +80,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -91,6 +91,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.stream.Collectors.toList; @@ -102,6 +103,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.validateTimestampFieldMapping; +import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveSettings; /** * Service responsible for submitting create index requests @@ -196,11 +198,9 @@ public void validateIndexName(String index, ClusterState state) { public boolean validateDotIndex(String index, @Nullable Boolean isHidden) { boolean isSystem = false; if (index.charAt(0) == '.') { - SystemIndexDescriptor matchingDescriptor = systemIndices.findMatchingDescriptor(index); - if (matchingDescriptor != null) { - logger.trace("index [{}] is a system index because it matches index pattern [{}] with description [{}]", - index, matchingDescriptor.getIndexPattern(), matchingDescriptor.getDescription()); - isSystem = true; + isSystem = systemIndices.isSystemName(index); + if (isSystem) { + logger.trace("index [{}] is a system index", index); } else if (isHidden) { logger.trace("index [{}] is a hidden index", index); } else { @@ -213,6 +213,10 @@ public boolean validateDotIndex(String index, @Nullable Boolean isHidden) { return isSystem; } + public SystemIndices getSystemIndices() { + return systemIndices; + } + /** * Validate the name for an index or alias against some static rules. */ @@ -329,14 +333,20 @@ public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateInd // in which case templates don't apply, so create the index from the source metadata return applyCreateIndexRequestWithExistingMetadata(currentState, request, silent, sourceMetadata, metadataTransformer); } else { + // The backing index may have a different name or prefix than the data stream name. + final String name = request.dataStreamName() != null ? request.dataStreamName() : request.index(); + + // The index being created is for a system data stream, so the backing index will also be a system index + if (request.systemDataStreamDescriptor() != null) { + return applyCreateIndexRequestForSystemDataStream(currentState, request, silent, metadataTransformer); + } + // Hidden indices apply templates slightly differently (ignoring wildcard '*' // templates), so we need to check to see if the request is creating a hidden index // prior to resolving which templates it matches final Boolean isHiddenFromRequest = IndexMetadata.INDEX_HIDDEN_SETTING.exists(request.settings()) ? IndexMetadata.INDEX_HIDDEN_SETTING.get(request.settings()) : null; - // The backing index may have a different name or prefix than the data stream name. - final String name = request.dataStreamName() != null ? request.dataStreamName() : request.index(); // Check to see if a v2 template matched final String v2Template = MetadataIndexTemplateService.findV2Template(currentState.metadata(), name, isHiddenFromRequest == null ? false : isHiddenFromRequest); @@ -475,7 +485,7 @@ private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState c xContentRegistry)); final Settings aggregatedIndexSettings = - aggregateIndexSettings(currentState, request, MetadataIndexTemplateService.resolveSettings(templates), + aggregateIndexSettings(currentState, request, resolveSettings(templates), null, settings, indexScopedSettings, shardLimitValidator, indexSettingProviders); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); @@ -509,7 +519,7 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu collectV2Mappings(request.mappings(), currentState, templateName, xContentRegistry, request.index()); final Settings aggregatedIndexSettings = aggregateIndexSettings(currentState, request, - MetadataIndexTemplateService.resolveSettings(currentState.metadata(), templateName), + resolveSettings(currentState.metadata(), templateName), null, settings, indexScopedSettings, shardLimitValidator, indexSettingProviders); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); @@ -524,15 +534,71 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu Collections.singletonList(templateName), metadataTransformer); } - public static List>> collectV2Mappings(final Map requestMappings, - final ClusterState currentState, - final String templateName, - final NamedXContentRegistry xContentRegistry, + private ClusterState applyCreateIndexRequestForSystemDataStream(final ClusterState currentState, + final CreateIndexClusterStateUpdateRequest request, + final boolean silent, + final BiConsumer metadataTransformer) + throws Exception { + Objects.requireNonNull(request.systemDataStreamDescriptor()); + logger.debug("applying create index request for system data stream [{}]", request.systemDataStreamDescriptor()); + + ComposableIndexTemplate template = request.systemDataStreamDescriptor().getComposableIndexTemplate(); + if (request.dataStreamName() == null && template.getDataStreamTemplate() != null) { + throw new IllegalArgumentException("cannot create index with name [" + request.index() + + "], because it matches with a system data stream"); + } + + final Map componentTemplates = request.systemDataStreamDescriptor().getComponentTemplates(); + final List>> mappings = + collectSystemV2Mappings(template, componentTemplates, xContentRegistry, request.index()); + + final Settings aggregatedIndexSettings = aggregateIndexSettings( + currentState, + request, + resolveSettings(template, componentTemplates), + null, + settings, + indexScopedSettings, + shardLimitValidator, + indexSettingProviders + ); + final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); + final IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); + + return applyCreateIndexWithTemporaryService(currentState, request, silent, null, tmpImd, mappings, + indexService -> resolveAndValidateAliases(request.index(), request.aliases(), + MetadataIndexTemplateService.resolveAliases(template, componentTemplates, false, null), currentState.metadata(), + // the context is only used for validation so it's fine to pass fake values for the + // shard id and the current timestamp + aliasValidator, xContentRegistry, indexService.newSearchExecutionContext(0, 0, null, () -> 0L, null, emptyMap()), + indexService.dateMathExpressionResolverAt(request.getNameResolvedAt())), + emptyList(), metadataTransformer); + } + + private static List>> collectSystemV2Mappings(final ComposableIndexTemplate composableIndexTemplate, + final Map componentTemplates, + final NamedXContentRegistry xContentRegistry, final String indexName) throws Exception { - List>> result = new ArrayList<>(); + List templateMappings = + MetadataIndexTemplateService.collectMappings(composableIndexTemplate, componentTemplates, indexName); + return collectV2Mappings(singletonMap("_doc", "{}"), templateMappings, xContentRegistry); + } + public static List>> collectV2Mappings(final Map requestMappings, + final ClusterState currentState, + final String templateName, + final NamedXContentRegistry xContentRegistry, + final String indexName) throws Exception { List templateMappings = MetadataIndexTemplateService.collectMappings(currentState, templateName, indexName); + return collectV2Mappings(requestMappings, templateMappings, xContentRegistry); + } + + public static List>> collectV2Mappings(final Map requestMappings, + final List templateMappings, + final NamedXContentRegistry xContentRegistry) throws Exception { + List>> result = new ArrayList<>(); + for (CompressedXContent templateMapping : templateMappings) { Map parsedTemplateMapping = MapperService.parseMapping(xContentRegistry, templateMapping.string()); result.add(singletonMap(MapperService.SINGLE_MAPPING_NAME, parsedTemplateMapping)); @@ -548,6 +614,7 @@ public static List>> collectV2Mappings(final Map } return result; } + private ClusterState applyCreateIndexRequestWithExistingMetadata(final ClusterState currentState, final CreateIndexClusterStateUpdateRequest request, final boolean silent, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index 2ddd020878974..9ec01e07bc6f7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -416,7 +416,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public static void validateV2TemplateRequest(Metadata metadata, String name, ComposableIndexTemplate template) { if (template.indexPatterns().stream().anyMatch(Regex::isMatchAllPattern)) { - Settings mergedSettings = resolveSettings(metadata, template); + Settings mergedSettings = resolveSettings(template, metadata.componentTemplates()); if (IndexMetadata.INDEX_HIDDEN_SETTING.exists(mergedSettings)) { throw new InvalidIndexTemplateException(name, "global composable templates may not specify the setting " + IndexMetadata.INDEX_HIDDEN_SETTING.getKey()); @@ -973,6 +973,16 @@ public static List collectMappings(final ClusterState state, } final Map componentTemplates = state.metadata().componentTemplates(); + return collectMappings(template, componentTemplates, indexName); + } + + /** + * Collect the given v2 template into an ordered list of mappings. + */ + public static List collectMappings(final ComposableIndexTemplate template, + final Map componentTemplates, + final String indexName) throws Exception { + Objects.requireNonNull(template, "Composable index template must be provided"); List mappings = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) @@ -1032,11 +1042,15 @@ public static Settings resolveSettings(final Metadata metadata, final String tem if (template == null) { return Settings.EMPTY; } - return resolveSettings(metadata, template); + return resolveSettings(template, metadata.componentTemplates()); } - private static Settings resolveSettings(Metadata metadata, ComposableIndexTemplate template) { - final Map componentTemplates = metadata.componentTemplates(); + /** + * Resolve the provided v2 template and component templates into a collected {@link Settings} object + */ + public static Settings resolveSettings(ComposableIndexTemplate template, Map componentTemplates) { + Objects.requireNonNull(template, "attempted to resolve settings for a null template"); + Objects.requireNonNull(componentTemplates, "attempted to resolve settings with null component templates"); List componentSettings = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) @@ -1094,6 +1108,18 @@ public static List> resolveAliases(final Metadata met return Collections.emptyList(); } final Map componentTemplates = metadata.componentTemplates(); + return resolveAliases(template, componentTemplates, failIfTemplateHasDataStream, templateName); + } + + /** + * Resolve the given v2 template and component templates into an ordered list of aliases + */ + static List> resolveAliases(final ComposableIndexTemplate template, + final Map componentTemplates, + final boolean failIfTemplateHasDataStream, + @Nullable String templateName) { + Objects.requireNonNull(template, "attempted to resolve aliases for a null template"); + Objects.requireNonNull(componentTemplates, "attempted to resolve aliases with null component templates"); List> aliases = template.composedOf().stream() .map(componentTemplates::get) .filter(Objects::nonNull) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java index 9b3bfbb168967..30c918c0179ea 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java @@ -22,9 +22,9 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Map; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.indices.IndicesService; @@ -44,18 +44,23 @@ public class MetadataMigrateToDataStreamService { private final ClusterService clusterService; private final ActiveShardsObserver activeShardsObserver; private final IndicesService indexServices; + private final ThreadContext threadContext; + private final MetadataCreateIndexService metadataCreateIndexService; - @Inject public MetadataMigrateToDataStreamService(ThreadPool threadPool, ClusterService clusterService, - IndicesService indexServices) { + IndicesService indexServices, + MetadataCreateIndexService metadataCreateIndexService) { this.clusterService = clusterService; this.indexServices = indexServices; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); + this.threadContext = threadPool.getThreadContext(); + this.metadataCreateIndexService = metadataCreateIndexService; } public void migrateToDataStream(MigrateToDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { + metadataCreateIndexService.getSystemIndices().validateDataStreamAccess(request.aliasName, threadContext); AtomicReference writeIndexRef = new AtomicReference<>(); ActionListener listener = ActionListener.wrap( response -> { @@ -90,7 +95,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { throw new IllegalStateException(e); } }, - request); + request, + threadContext, + metadataCreateIndexService); writeIndexRef.set(clusterState.metadata().dataStreams().get(request.aliasName).getWriteIndex().getName()); return clusterState; } @@ -99,11 +106,12 @@ public ClusterState execute(ClusterState currentState) throws Exception { static ClusterState migrateToDataStream(ClusterState currentState, Function mapperSupplier, - MigrateToDataStreamClusterStateUpdateRequest request) throws Exception { + MigrateToDataStreamClusterStateUpdateRequest request, + ThreadContext threadContext, + MetadataCreateIndexService metadataCreateIndexService) throws Exception { if (currentState.nodes().getMinNodeVersion().before(Version.V_7_11_0)) { throw new IllegalStateException("data stream migration requires minimum node version of " + Version.V_7_11_0); } - validateRequest(currentState, request); IndexAbstraction.Alias alias = (IndexAbstraction.Alias) currentState.metadata().getIndicesLookup().get(request.aliasName); @@ -122,7 +130,8 @@ static ClusterState migrateToDataStream(ClusterState currentState, .collect(Collectors.toList()); logger.info("submitting request to migrate alias [{}] to a data stream", request.aliasName); - return MetadataCreateDataStreamService.createDataStream(null, currentState, request.aliasName, backingIndices, writeIndex); + return MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, currentState, request.aliasName, + backingIndices, writeIndex); } // package-visible for testing diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index dfce857d69099..1e0144fe3e39d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -54,7 +54,9 @@ public void clusterChanged(ClusterChangedEvent event) { if (lastIndexMetadataMap != indexMetadataMap) { for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - if (systemIndices.isSystemIndex(cursor.value.getIndex()) != cursor.value.isSystem()) { + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || + systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + if (isSystem != cursor.value.isSystem()) { updateTaskPending = true; clusterService.submitStateUpdateTask("system_index_metadata_upgrade_service {system metadata change}", new SystemIndexMetadataUpdateTask()); @@ -74,7 +76,9 @@ public ClusterState execute(ClusterState currentState) throws Exception { final List updatedMetadata = new ArrayList<>(); for (ObjectObjectCursor cursor : indexMetadataMap) { if (cursor.value != lastIndexMetadataMap.get(cursor.key)) { - if (systemIndices.isSystemIndex(cursor.value.getIndex()) != cursor.value.isSystem()) { + final boolean isSystem = systemIndices.isSystemIndex(cursor.value.getIndex()) || + systemIndices.isSystemIndexBackingDataStream(cursor.value.getIndex().getName()); + if (isSystem != cursor.value.isSystem()) { updatedMetadata.add(IndexMetadata.builder(cursor.value).system(cursor.value.isSystem() == false).build()); } } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java new file mode 100644 index 0000000000000..d408bf2954181 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.indices; + +import org.elasticsearch.cluster.metadata.ComponentTemplate; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also + * protected by the system against user modification so that system features are not broken by inadvertent user operations. + */ +public class SystemDataStreamDescriptor { + + private final String dataStreamName; + private final String description; + private final Type type; + private final ComposableIndexTemplate composableIndexTemplate; + private final Map componentTemplates; + private final List allowedElasticProductOrigins; + + /** + * Creates a new descriptor for a system data descriptor + * @param dataStreamName the name of the data stream. Must not be {@code null} + * @param description a brief description of what the data stream is used for. Must not be {@code null} + * @param type the {@link Type} of the data stream which determines how the data stream can be accessed. Must not be {@code null} + * @param composableIndexTemplate the {@link ComposableIndexTemplate} that contains the mappings and settings for the data stream. + * Must not be {@code null} + * @param componentTemplates a map that contains {@link ComponentTemplate} instances corresponding to those references in the + * {@link ComposableIndexTemplate} + * @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the + * type is {@link Type#EXTERNAL}. Must not be {@code null} + */ + public SystemDataStreamDescriptor(String dataStreamName, String description, Type type, + ComposableIndexTemplate composableIndexTemplate, Map componentTemplates, + List allowedElasticProductOrigins) { + this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified"); + this.description = Objects.requireNonNull(description, "description must be specified"); + this.type = Objects.requireNonNull(type, "type must be specified"); + this.composableIndexTemplate = Objects.requireNonNull(composableIndexTemplate, "composableIndexTemplate must be provided"); + this.componentTemplates = + componentTemplates == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>((componentTemplates))); + this.allowedElasticProductOrigins = + Objects.requireNonNull(allowedElasticProductOrigins, "allowedElasticProductOrigins must not be null"); + if (type == Type.EXTERNAL && allowedElasticProductOrigins.isEmpty()) { + throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination"); + } + } + + public String getDataStreamName() { + return dataStreamName; + } + + public String getDescription() { + return description; + } + + public ComposableIndexTemplate getComposableIndexTemplate() { + return composableIndexTemplate; + } + + public boolean isExternal() { + return type == Type.EXTERNAL; + } + + public String getBackingIndexPattern() { + return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*"; + } + + public List getAllowedElasticProductOrigins() { + return allowedElasticProductOrigins; + } + + public Map getComponentTemplates() { + return componentTemplates; + } + + public enum Type { + INTERNAL, + EXTERNAL + } +} diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java index e44ecc5c87695..36efa83eb6b0d 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndexDescriptor.java @@ -261,14 +261,17 @@ public SystemIndexDescriptor(String indexPattern, String description, Type type, this.indexPattern = indexPattern; this.primaryIndex = primaryIndex; + this.aliasName = aliasName; final Automaton automaton = buildAutomaton(indexPattern, aliasName); this.indexPatternAutomaton = new CharacterRunAutomaton(automaton); + if (primaryIndex != null && indexPatternAutomaton.run(primaryIndex) == false) { + throw new IllegalArgumentException("primary index does not match the index pattern!"); + } this.description = description; this.mappings = mappings; this.settings = settings; - this.aliasName = aliasName; this.indexFormat = indexFormat; this.versionMetaKey = versionMetaKey; this.origin = origin; @@ -403,7 +406,7 @@ public List getAllowedElasticProductOrigins() { public Version getMappingVersion() { if (type.isManaged() == false) { - throw new IllegalStateException(toString() + " is not managed so there are no mappings or version"); + throw new IllegalStateException(this + " is not managed so there are no mappings or version"); } return mappingVersion; } diff --git a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java index 5cb7ebdd38f14..a27d88e945399 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemIndices.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemIndices.java @@ -21,13 +21,18 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.snapshots.SnapshotsService; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -36,6 +41,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,11 +58,18 @@ * to reduce the locations within the code that need to deal with {@link SystemIndexDescriptor}s. */ public class SystemIndices { + public static final String SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_system_index_access_allowed"; + public static final String EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY = "_external_system_index_access_origin"; + + private static final Automaton EMPTY = Automata.makeEmpty(); + private static final Map SERVER_SYSTEM_INDEX_DESCRIPTORS = singletonMap( TASKS_FEATURE_NAME, new Feature(TASKS_FEATURE_NAME, "Manages task results", singletonList(TASKS_DESCRIPTOR)) ); - private final CharacterRunAutomaton runAutomaton; + private final CharacterRunAutomaton systemIndexAutomaton; + private final CharacterRunAutomaton systemDataStreamIndicesAutomaton; + private final Predicate systemDataStreamAutomaton; private final Map featureDescriptors; private final Map productToSystemIndicesMatcher; @@ -69,11 +82,13 @@ public SystemIndices(Map pluginAndModulesDescriptors) { featureDescriptors = buildSystemIndexDescriptorMap(pluginAndModulesDescriptors); checkForOverlappingPatterns(featureDescriptors); checkForDuplicateAliases(this.getSystemIndexDescriptors()); - this.runAutomaton = buildCharacterRunAutomaton(featureDescriptors); - this.productToSystemIndicesMatcher = getProductToSystemIndicesMap(this.getSystemIndexDescriptors()); + this.systemIndexAutomaton = buildIndexCharacterRunAutomaton(featureDescriptors); + this.systemDataStreamIndicesAutomaton = buildDataStreamBackingIndicesAutomaton(featureDescriptors); + this.systemDataStreamAutomaton = buildDataStreamNamePredicate(featureDescriptors); + this.productToSystemIndicesMatcher = getProductToSystemIndicesMap(featureDescriptors); } - private void checkForDuplicateAliases(Collection descriptors) { + private static void checkForDuplicateAliases(Collection descriptors) { final Map aliasCounts = new HashMap<>(); for (SystemIndexDescriptor descriptor : descriptors) { @@ -95,20 +110,46 @@ private void checkForDuplicateAliases(Collection descript } } - private static Map getProductToSystemIndicesMap(Collection descriptors) { - Map map = descriptors.stream() - .filter(SystemIndexDescriptor::isExternal) - .flatMap(descriptor -> descriptor.getAllowedElasticProductOrigins().stream().map(product -> new Tuple<>(product, descriptor))) - .collect(Collectors.toMap(Tuple::v1, tuple -> { - SystemIndexDescriptor descriptor = tuple.v2(); - return SystemIndexDescriptor.buildAutomaton(descriptor.getIndexPattern(), descriptor.getAliasName()); - }, Operations::union)); + private static Map getProductToSystemIndicesMap(Map descriptors) { + Map productToSystemIndicesMap = new HashMap<>(); + for (Feature feature : descriptors.values()) { + feature.getIndexDescriptors().forEach(systemIndexDescriptor -> { + if (systemIndexDescriptor.isExternal()) { + systemIndexDescriptor.getAllowedElasticProductOrigins().forEach(origin -> + productToSystemIndicesMap.compute(origin, (key, value) -> { + Automaton automaton = SystemIndexDescriptor.buildAutomaton( + systemIndexDescriptor.getIndexPattern(), systemIndexDescriptor.getAliasName()); + return value == null ? automaton : Operations.union(value, automaton); + }) + ); + } + }); + feature.getDataStreamDescriptors().forEach(dataStreamDescriptor -> { + if (dataStreamDescriptor.isExternal()) { + dataStreamDescriptor.getAllowedElasticProductOrigins().forEach(origin -> + productToSystemIndicesMap.compute(origin, (key, value) -> { + Automaton automaton = SystemIndexDescriptor.buildAutomaton( + dataStreamDescriptor.getBackingIndexPattern(), dataStreamDescriptor.getDataStreamName()); + return value == null ? automaton : Operations.union(value, automaton); + }) + ); + } + }); + } - return Collections.unmodifiableMap(map.entrySet().stream() + return unmodifiableMap(productToSystemIndicesMap.entrySet().stream() .collect(Collectors.toMap(Entry::getKey, entry -> new CharacterRunAutomaton(MinimizationOperations.minimize(entry.getValue(), Integer.MAX_VALUE))))); } + /** + * Checks whether the given name matches a reserved name or pattern that is intended for use by a system component. The name + * is checked against index names, aliases, data stream names, and the names of indices that back a system data stream. + */ + public boolean isSystemName(String name) { + return isSystemIndex(name) || isSystemDataStream(name) || isSystemIndexBackingDataStream(name); + } + /** * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s * @param index the {@link Index} object to check against loaded {@link SystemIndexDescriptor}s @@ -119,12 +160,28 @@ public boolean isSystemIndex(Index index) { } /** - * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s + * Determines whether a given index is a system index by comparing its name to the collection of loaded {@link SystemIndexDescriptor}s. + * This will also match alias names that belong to system indices. * @param indexName the index name to check against loaded {@link SystemIndexDescriptor}s * @return true if the index name matches a pattern from a {@link SystemIndexDescriptor} */ public boolean isSystemIndex(String indexName) { - return runAutomaton.run(indexName); + return systemIndexAutomaton.run(indexName); + } + + /** + * Determines whether the provided name matches that of a system data stream that has been defined by a + * {@link SystemDataStreamDescriptor} + */ + public boolean isSystemDataStream(String name) { + return systemDataStreamAutomaton.test(name); + } + + /** + * Determines whether the provided name matches that of an index that backs a system data stream. + */ + public boolean isSystemIndexBackingDataStream(String name) { + return systemDataStreamIndicesAutomaton.run(name); } /** @@ -158,12 +215,48 @@ public boolean isSystemIndex(String indexName) { } } + /** + * Finds a single matching {@link SystemDataStreamDescriptor}, if any, for the given DataStream name. + * @param name the name of the DataStream + * @return The matching {@link SystemDataStreamDescriptor} or {@code null} if no descriptor is found + * @throws IllegalStateException if multiple descriptors match the name + */ + public @Nullable SystemDataStreamDescriptor findMatchingDataStreamDescriptor(String name) { + final List matchingDescriptors = featureDescriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .filter(descriptor -> descriptor.getDataStreamName().equals(name)) + .collect(Collectors.toList()); + + if (matchingDescriptors.isEmpty()) { + return null; + } else if (matchingDescriptors.size() == 1) { + return matchingDescriptors.get(0); + } else { + // This should be prevented by failing on overlapping patterns at startup time, but is here just in case. + StringBuilder errorMessage = new StringBuilder() + .append("DataStream name [") + .append(name) + .append("] is claimed as a system data stream by multiple descriptors: [") + .append(matchingDescriptors.stream() + .map(descriptor -> "name: [" + descriptor.getDataStreamName() + + "], description: [" + descriptor.getDescription() + "]").collect(Collectors.joining("; "))); + // Throw AssertionError if assertions are enabled, or a regular exception otherwise: + assert false : errorMessage.toString(); + throw new IllegalStateException(errorMessage.toString()); + } + } + /** * Builds a predicate that tests if a system index should be accessible based on the provided product name - * @param product the name of the product that is attempting to access an external system index + * contained in headers. + * @param threadContext the threadContext containing headers used for system index access * @return Predicate to check external system index metadata with */ - public Predicate getProductSystemIndexMetadataPredicate(String product) { + public Predicate getProductSystemIndexMetadataPredicate(ThreadContext threadContext) { + final String product = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + if (product == null) { + return indexMetadata -> false; + } final CharacterRunAutomaton automaton = productToSystemIndicesMatcher.get(product); if (automaton == null) { return indexMetadata -> false; @@ -173,10 +266,15 @@ public Predicate getProductSystemIndexMetadataPredicate(String pr /** * Builds a predicate that tests if a system index name should be accessible based on the provided product name - * @param product the name of the product that is attempting to access an external system index + * contained in headers. + * @param threadContext the threadContext containing headers used for system index access * @return Predicate to check external system index names with */ - public Predicate getProductSystemIndexNamePredicate(String product) { + public Predicate getProductSystemIndexNamePredicate(ThreadContext threadContext) { + final String product = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + if (product == null) { + return name -> false; + } final CharacterRunAutomaton automaton = productToSystemIndicesMatcher.get(product); if (automaton == null) { return name -> false; @@ -188,35 +286,151 @@ public Map getFeatures() { return featureDescriptors; } - private static CharacterRunAutomaton buildCharacterRunAutomaton(Map descriptors) { + private static CharacterRunAutomaton buildIndexCharacterRunAutomaton(Map descriptors) { Optional automaton = descriptors.values().stream() - .flatMap(feature -> feature.getIndexDescriptors().stream()) + .map(SystemIndices::featureToIndexAutomaton) + .reduce(Operations::union); + return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(EMPTY), Integer.MAX_VALUE)); + } + + private static Automaton featureToIndexAutomaton(Feature feature) { + Optional systemIndexAutomaton = feature.getIndexDescriptors().stream() .map(descriptor -> SystemIndexDescriptor.buildAutomaton(descriptor.getIndexPattern(), descriptor.getAliasName())) .reduce(Operations::union); - return new CharacterRunAutomaton(MinimizationOperations.minimize(automaton.orElse(Automata.makeEmpty()), Integer.MAX_VALUE)); + + return systemIndexAutomaton.orElse(EMPTY); + } + + private static Predicate buildDataStreamNamePredicate(Map descriptors) { + Set systemDataStreamNames = descriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .map(SystemDataStreamDescriptor::getDataStreamName) + .collect(Collectors.toSet()); + return systemDataStreamNames::contains; + } + + private static CharacterRunAutomaton buildDataStreamBackingIndicesAutomaton(Map descriptors) { + Optional automaton = descriptors.values().stream() + .map(SystemIndices::featureToDataStreamBackingIndicesAutomaton) + .reduce(Operations::union); + return new CharacterRunAutomaton(automaton.orElse(EMPTY)); + } + + private static Automaton featureToDataStreamBackingIndicesAutomaton(Feature feature) { + Optional systemDataStreamAutomaton = feature.getDataStreamDescriptors().stream() + .map(descriptor -> SystemIndexDescriptor.buildAutomaton( + descriptor.getBackingIndexPattern(), + null + )) + .reduce(Operations::union); + return systemDataStreamAutomaton.orElse(EMPTY); + } + + public SystemDataStreamDescriptor validateDataStreamAccess(String dataStreamName, ThreadContext threadContext) { + if (systemDataStreamAutomaton.test(dataStreamName)) { + SystemDataStreamDescriptor dataStreamDescriptor = featureDescriptors.values().stream() + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .filter(descriptor -> descriptor.getDataStreamName().equals(dataStreamName)) + .findFirst() + .orElseThrow(() -> new IllegalStateException("system data stream descriptor not found for [" + dataStreamName + "]")); + if (dataStreamDescriptor.isExternal()) { + final SystemIndexAccessLevel accessLevel = getSystemIndexAccessLevel(threadContext); + if (accessLevel == SystemIndexAccessLevel.NONE) { + throw dataStreamAccessException(null, dataStreamName); + } else if (accessLevel == SystemIndexAccessLevel.RESTRICTED) { + if (getProductSystemIndexNamePredicate(threadContext).test(dataStreamName) == false) { + throw dataStreamAccessException( + threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY), + dataStreamName); + } else { + return dataStreamDescriptor; + } + } else { + assert accessLevel == SystemIndexAccessLevel.ALL; + return dataStreamDescriptor; + } + } else { + return dataStreamDescriptor; + } + } else { + return null; + } + } + + public IllegalArgumentException dataStreamAccessException(ThreadContext threadContext, Collection names) { + return dataStreamAccessException( + threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY), + names.toArray(Strings.EMPTY_ARRAY) + ); + } + + IllegalArgumentException dataStreamAccessException(@Nullable String product, String... dataStreamNames) { + if (product == null) { + return new IllegalArgumentException("Data stream(s) " + Arrays.toString(dataStreamNames) + + " use and access is reserved for system operations"); + } else { + return new IllegalArgumentException("Data stream(s) " + Arrays.toString(dataStreamNames) + " may not be accessed by product [" + + product + "]"); + } + } + + /** + * Determines what level of system index access should be allowed in the current context. + * + * @return {@link SystemIndexAccessLevel#ALL} if unrestricted system index access should be allowed, + * {@link SystemIndexAccessLevel#RESTRICTED} if a subset of system index access should be allowed, or + * {@link SystemIndexAccessLevel#NONE} if no system index access should be allowed. + */ + public SystemIndexAccessLevel getSystemIndexAccessLevel(ThreadContext threadContext) { + final String headerValue = threadContext.getHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + final String productHeaderValue = threadContext.getHeader(EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY); + + final boolean allowed = Booleans.parseBoolean(headerValue, true); + if (allowed) { + if (productHeaderValue != null) { + return SystemIndexAccessLevel.RESTRICTED; + } else { + return SystemIndexAccessLevel.ALL; + } + } else { + return SystemIndexAccessLevel.NONE; + } + } + + public enum SystemIndexAccessLevel { + ALL, + NONE, + RESTRICTED } /** * Given a collection of {@link SystemIndexDescriptor}s and their sources, checks to see if the index patterns of the listed * descriptors overlap with any of the other patterns. If any do, throws an exception. * - * @param sourceToDescriptors A map of source (plugin) names to the SystemIndexDescriptors they provide. + * @param sourceToFeature A map of source (plugin) names to the SystemIndexDescriptors they provide. * @throws IllegalStateException Thrown if any of the index patterns overlaps with another. */ - static void checkForOverlappingPatterns(Map sourceToDescriptors) { - List> sourceDescriptorPair = sourceToDescriptors.entrySet().stream() + static void checkForOverlappingPatterns(Map sourceToFeature) { + List> sourceDescriptorPair = sourceToFeature.entrySet().stream() .flatMap(entry -> entry.getValue().getIndexDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor))) .sorted(Comparator.comparing(d -> d.v1() + ":" + d.v2().getIndexPattern())) // Consistent ordering -> consistent error message .collect(Collectors.toList()); + List> sourceDataStreamDescriptorPair = sourceToFeature.entrySet().stream() + .filter(entry -> entry.getValue().getDataStreamDescriptors().isEmpty() == false) + .flatMap(entry -> + entry.getValue().getDataStreamDescriptors().stream().map(descriptor -> new Tuple<>(entry.getKey(), descriptor))) + .sorted( + Comparator.comparing(d -> d.v1() + ":" + d.v2().getDataStreamName())) // Consistent ordering -> consistent error message + .collect(Collectors.toList()); // This is O(n^2) with the number of system index descriptors, and each check is quadratic with the number of states in the // automaton, but the absolute number of system index descriptors should be quite small (~10s at most), and the number of states // per pattern should be low as well. If these assumptions change, this might need to be reworked. sourceDescriptorPair.forEach(descriptorToCheck -> { List> descriptorsMatchingThisPattern = sourceDescriptorPair.stream() - .filter(d -> descriptorToCheck.v2() != d.v2()) // Exclude the pattern currently being checked - .filter(d -> overlaps(descriptorToCheck.v2(), d.v2())) + .filter(d -> overlaps(descriptorToCheck.v2(), d.v2()) || + (d.v2().getAliasName() != null && descriptorToCheck.v2().matchesIndexPattern(d.v2().getAliasName()))) .collect(Collectors.toList()); if (descriptorsMatchingThisPattern.isEmpty() == false) { throw new IllegalStateException("a system index descriptor [" + descriptorToCheck.v2() + "] from [" + @@ -225,12 +439,28 @@ static void checkForOverlappingPatterns(Map sourceToDescriptors .map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]") .collect(Collectors.joining(", "))); } + + List> dataStreamsMatching = sourceDataStreamDescriptorPair.stream() + .filter(dsTuple -> descriptorToCheck.v2().matchesIndexPattern(dsTuple.v2().getDataStreamName()) || + overlaps(descriptorToCheck.v2().getIndexPattern(), dsTuple.v2().getBackingIndexPattern())) + .collect(Collectors.toList()); + if (dataStreamsMatching.isEmpty() == false) { + throw new IllegalStateException("a system index descriptor [" + descriptorToCheck.v2() + "] from [" + + descriptorToCheck.v1() + "] overlaps with one or more data stream descriptors: [" + + dataStreamsMatching.stream() + .map(descriptor -> descriptor.v2() + " from [" + descriptor.v1() + "]") + .collect(Collectors.joining(", "))); + } }); } private static boolean overlaps(SystemIndexDescriptor a1, SystemIndexDescriptor a2) { - Automaton a1Automaton = SystemIndexDescriptor.buildAutomaton(a1.getIndexPattern(), null); - Automaton a2Automaton = SystemIndexDescriptor.buildAutomaton(a2.getIndexPattern(), null); + return overlaps(a1.getIndexPattern(), a2.getIndexPattern()); + } + + private static boolean overlaps(String pattern1, String pattern2) { + Automaton a1Automaton = SystemIndexDescriptor.buildAutomaton(pattern1, null); + Automaton a2Automaton = SystemIndexDescriptor.buildAutomaton(pattern2, null); return Operations.isEmpty(Operations.intersection(a1Automaton, a2Automaton)) == false; } @@ -271,23 +501,27 @@ public static void validateFeatureName(String name, String plugin) { public static class Feature { private final String description; private final Collection indexDescriptors; + private final Collection dataStreamDescriptors; private final Collection associatedIndexPatterns; private final TriConsumer> cleanUpFunction; /** * Construct a Feature with a custom cleanup function * @param description Description of the feature - * @param indexDescriptors Patterns describing system indices for this feature + * @param indexDescriptors Collection of objects describing system indices for this feature + * @param dataStreamDescriptors Collection of objects describing system data streams for this feature * @param associatedIndexPatterns Patterns describing associated indices * @param cleanUpFunction A function that will clean up the feature's state */ public Feature( String description, Collection indexDescriptors, + Collection dataStreamDescriptors, Collection associatedIndexPatterns, TriConsumer> cleanUpFunction) { this.description = description; this.indexDescriptors = indexDescriptors; + this.dataStreamDescriptors = dataStreamDescriptors; this.associatedIndexPatterns = associatedIndexPatterns; this.cleanUpFunction = cleanUpFunction; } @@ -299,7 +533,21 @@ public Feature( * @param indexDescriptors Patterns describing system indices for this feature */ public Feature(String name, String description, Collection indexDescriptors) { - this(description, indexDescriptors, Collections.emptyList(), + this(description, indexDescriptors, Collections.emptyList(), Collections.emptyList(), + (clusterService, client, listener) -> + cleanUpFeature(indexDescriptors, Collections.emptyList(), name, clusterService, client, listener) + ); + } + /** + * Construct a Feature using the default clean-up function + * @param name Name of the feature, used in logging + * @param description Description of the feature + * @param indexDescriptors Patterns describing system indices for this feature + * @param dataStreamDescriptors Collection of objects describing system data streams for this feature + */ + public Feature(String name, String description, Collection indexDescriptors, + Collection dataStreamDescriptors) { + this(description, indexDescriptors, dataStreamDescriptors, Collections.emptyList(), (clusterService, client, listener) -> cleanUpFeature(indexDescriptors, Collections.emptyList(), name, clusterService, client, listener) ); @@ -313,6 +561,10 @@ public Collection getIndexDescriptors() { return indexDescriptors; } + public Collection getDataStreamDescriptors() { + return dataStreamDescriptors; + } + public Collection getAssociatedIndexPatterns() { return associatedIndexPatterns; } @@ -365,4 +617,12 @@ public void onFailure(Exception e) { }); } } + + public static Feature pluginToFeature(SystemIndexPlugin plugin, Settings settings) { + return new Feature(plugin.getFeatureDescription(), + plugin.getSystemIndexDescriptors(settings), + plugin.getSystemDataStreamDescriptors(), + plugin.getAssociatedIndexPatterns(), + plugin::cleanUpFeature); + } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index a529a7bac4018..0156ef09b5c73 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -447,20 +447,33 @@ protected Node(final Environment initialEnvironment, final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client); final UsageService usageService = new UsageService(); - final Map featuresMap = Collections.unmodifiableMap(pluginsService + SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); + IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); + List namedWriteables = Stream.of( + NetworkModule.getNamedWriteables().stream(), + indicesModule.getNamedWriteables().stream(), + searchModule.getNamedWriteables().stream(), + pluginsService.filterPlugins(Plugin.class).stream() + .flatMap(p -> p.getNamedWriteables().stream()), + ClusterModule.getNamedWriteables().stream()) + .flatMap(Function.identity()).collect(Collectors.toList()); + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); + NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of( + NetworkModule.getNamedXContents().stream(), + IndicesModule.getNamedXContents().stream(), + searchModule.getNamedXContents().stream(), + pluginsService.filterPlugins(Plugin.class).stream() + .flatMap(p -> p.getNamedXContent().stream()), + ClusterModule.getNamedXWriteables().stream()) + .flatMap(Function.identity()).collect(toList())); + final Map featuresMap = pluginsService .filterPlugins(SystemIndexPlugin.class) .stream() .peek(plugin -> SystemIndices.validateFeatureName(plugin.getFeatureName(), plugin.getClass().getCanonicalName())) .collect(Collectors.toMap( - plugin -> plugin.getFeatureName(), - plugin -> new SystemIndices.Feature( - plugin.getFeatureDescription(), - plugin.getSystemIndexDescriptors(settings), - plugin.getAssociatedIndexPatterns(), - plugin::cleanUpFeature - )) - ) - ); + SystemIndexPlugin::getFeatureName, + plugin -> SystemIndices.pluginToFeature(plugin, settings) + )); final SystemIndices systemIndices = new SystemIndices(featuresMap); ModulesBuilder modules = new ModulesBuilder(); @@ -477,10 +490,8 @@ protected Node(final Environment initialEnvironment, final ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService, snapshotsInfoService, threadPool.getThreadContext(), systemIndices); modules.add(clusterModule); - IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); - SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); List pluginCircuitBreakers = pluginsService.filterPlugins(CircuitBreakerPlugin.class) .stream() .map(plugin -> plugin.getCircuitBreaker(settings)) @@ -500,23 +511,7 @@ protected Node(final Environment initialEnvironment, PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings); BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService); modules.add(settingsModule); - List namedWriteables = Stream.of( - NetworkModule.getNamedWriteables().stream(), - indicesModule.getNamedWriteables().stream(), - searchModule.getNamedWriteables().stream(), - pluginsService.filterPlugins(Plugin.class).stream() - .flatMap(p -> p.getNamedWriteables().stream()), - ClusterModule.getNamedWriteables().stream()) - .flatMap(Function.identity()).collect(Collectors.toList()); - final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables); - NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of( - NetworkModule.getNamedXContents().stream(), - IndicesModule.getNamedXContents().stream(), - searchModule.getNamedXContents().stream(), - pluginsService.filterPlugins(Plugin.class).stream() - .flatMap(p -> p.getNamedXContent().stream()), - ClusterModule.getNamedXWriteables().stream()) - .flatMap(Function.identity()).collect(toList())); + final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); final PersistedClusterStateService lucenePersistedStateFactory = new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, clusterService.getClusterSettings(), diff --git a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java index 9724b7eeb182c..26b1fae45843e 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SystemIndexPlugin.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; @@ -35,6 +36,10 @@ default Collection getSystemIndexDescriptors(Settings set return Collections.emptyList(); } + default Collection getSystemDataStreamDescriptors() { + return Collections.emptyList(); + } + /** * @return The name of the feature, as used for specifying feature states in snapshot creation and restoration. */ diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 0a15cbd482a0e..ca75101fc7ec8 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -44,8 +44,8 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.elasticsearch.rest.BytesRestResponse.TEXT_CONTENT_TYPE; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 4d7720a2a019d..e83b770961cde 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -769,7 +769,7 @@ public void onFailure(Exception e) { } private boolean isSystemIndex(IndexMetadata indexMetadata) { - return indexMetadata.isSystem() || systemIndices.isSystemIndex(indexMetadata.getIndex()); + return indexMetadata.isSystem() || systemIndices.isSystemName(indexMetadata.getIndex().getName()); } private Map getDataStreamsToRestore(Repository repository, SnapshotId snapshotId, SnapshotInfo snapshotInfo, diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java index 7630859173d65..f8565133bb571 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/alias/get/TransportGetAliasesActionTests.java @@ -11,12 +11,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel; import org.elasticsearch.test.ESTestCase; import java.util.Collections; @@ -130,7 +132,7 @@ public void testDeprecationWarningNotEmittedWhenSystemAccessAllowed() { assertEquals(state.metadata().findAliases(request, concreteIndices), aliases); ImmutableOpenMap> result = TransportGetAliasesAction.postProcess(request, concreteIndices, aliases, state, - SystemIndexAccessLevel.ALL, "", EmptySystemIndices.INSTANCE); + SystemIndexAccessLevel.ALL, new ThreadContext(Settings.EMPTY), EmptySystemIndices.INSTANCE); assertThat(result.size(), equalTo(1)); assertThat(result.get(".b").size(), equalTo(1)); } @@ -150,7 +152,7 @@ public void testDeprecationWarningNotEmittedWhenOnlyNonsystemIndexRequested() { assertEquals(state.metadata().findAliases(request, concreteIndices), aliases); ImmutableOpenMap> result = TransportGetAliasesAction.postProcess(request, concreteIndices, aliases, state, - SystemIndexAccessLevel.NONE, "", EmptySystemIndices.INSTANCE); + SystemIndexAccessLevel.NONE, new ThreadContext(Settings.EMPTY), EmptySystemIndices.INSTANCE); assertThat(result.size(), equalTo(1)); assertThat(result.get("c").size(), equalTo(1)); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java index 10ec12b29f0b8..be598777923ed 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java @@ -336,7 +336,7 @@ public void testCreateIndexRequestForDataStream() { .build(); rolloverRequest.getCreateIndexRequest().settings(settings); final CreateIndexClusterStateUpdateRequest createIndexRequest = MetadataRolloverService.prepareDataStreamCreateIndexRequest( - dataStream.getName(), newWriteIndexName, rolloverRequest.getCreateIndexRequest()); + dataStream.getName(), newWriteIndexName, rolloverRequest.getCreateIndexRequest(), null); for (String settingKey : settings.keySet()) { assertThat(settings.get(settingKey), equalTo(createIndexRequest.settings().get(settingKey))); } @@ -504,7 +504,7 @@ public void testRolloverClusterState() throws Exception { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -609,7 +609,7 @@ protected String contentType() { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -718,7 +718,7 @@ clusterService, indicesService, allocationService, new AliasValidator(), shardLi MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); @@ -781,7 +781,7 @@ public void testValidation() throws Exception { IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class); when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg()); MetadataRolloverService rolloverService = new MetadataRolloverService(null, createIndexService, metadataIndexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); String newIndexName = useDataStream == false && randomBoolean() ? "logs-index-9" : null; @@ -831,7 +831,7 @@ public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception { MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService, new AliasValidator(), null, xContentRegistry()); MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService, - mockIndexNameExpressionResolver); + mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong()); List> metConditions = Collections.singletonList(condition); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index c50d538b1c20a..120b5d2356675 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.warmer.WarmerStats; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.search.suggest.completion.CompletionStats; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -252,7 +253,7 @@ public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPr when(mockCreateIndexService.applyCreateIndexRequest(any(), any(), anyBoolean())).thenReturn(stateBefore); when(mdIndexAliasesService.applyAliasActions(any(), any())).thenReturn(stateBefore); MetadataRolloverService rolloverService = new MetadataRolloverService(mockThreadPool, mockCreateIndexService, - mdIndexAliasesService, mockIndexNameExpressionResolver); + mdIndexAliasesService, mockIndexNameExpressionResolver, EmptySystemIndices.INSTANCE); final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(mockTransportService, mockClusterService, mockThreadPool, mockActionFilters, mockIndexNameExpressionResolver, rolloverService, mockClient); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java index 09045500a877d..b594eb31fe844 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DateMathExpressionResolverTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.Context; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.DateMathExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.test.ESTestCase; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -25,7 +24,6 @@ import java.util.Collections; import java.util.List; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.joda.time.DateTimeZone.UTC; @@ -35,7 +33,7 @@ public class DateMathExpressionResolverTests extends ESTestCase { private final DateMathExpressionResolver expressionResolver = new DateMathExpressionResolver(); private final Context context = new Context( ClusterState.builder(new ClusterName("_name")).build(), IndicesOptions.strictExpand(), - SystemIndexAccessLevel.NONE + name -> false ); public void testNormal() throws Exception { @@ -138,7 +136,7 @@ public void testExpression_CustomTimeZoneInIndexName() throws Exception { // rounding to today 00:00 now = DateTime.now(UTC).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0); } - Context context = new Context(this.context.getState(), this.context.getOptions(), now.getMillis(), NONE); + Context context = new Context(this.context.getState(), this.context.getOptions(), now.getMillis(), name -> false); List results = expressionResolver.resolve(context, Arrays.asList("<.marvel-{now/d{yyyy.MM.dd|" + timeZone.getID() + "}}>")); assertThat(results.size(), equalTo(1)); logger.info("timezone: [{}], now [{}], name: [{}]", timeZone, now, results.get(0)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java index 3d963ec3863a2..6f5d5c35ac807 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.test.ESTestCase; - import java.time.Instant; import java.time.LocalDate; import java.time.ZoneOffset; @@ -48,15 +47,15 @@ import java.util.List; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.elasticsearch.common.util.set.Sets.newHashSet; +import static org.elasticsearch.indices.SystemIndices.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; +import static org.elasticsearch.indices.SystemIndices.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.arrayWithSize; @@ -71,6 +70,9 @@ import static org.hamcrest.Matchers.notNullValue; public class IndexNameExpressionResolverTests extends ESTestCase { + + private static final Predicate NONE = name -> false; + private IndexNameExpressionResolver indexNameExpressionResolver; private ThreadContext threadContext; private long epochMillis; diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index 33fdbbaf01626..2bcbebab6cde7 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -12,19 +12,27 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate; import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemDataStreamDescriptor.Type; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.indices.SystemIndices.Feature; import org.elasticsearch.test.ESTestCase; import java.util.Collections; +import java.util.Map; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -44,12 +52,38 @@ public void testCreateDataStream() throws Exception { .build(); CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO); - ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); + ClusterState newState = + MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), equalTo("true")); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false)); + } + + public void testCreateSystemDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); + final String dataStreamName = ".system-data-stream"; + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder().build()) + .build(); + CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest( + dataStreamName, systemDataStreamDescriptor(), TimeValue.ZERO, TimeValue.ZERO); + ClusterState newState = + MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req); + assertThat(newState.metadata().dataStreams().size(), equalTo(1)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(true)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), + nullValue()); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(true)); } public void testCreateDuplicateDataStream() throws Exception { @@ -146,6 +180,7 @@ public static ClusterState createDataStream(final String dataStreamName) throws private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception { MetadataCreateIndexService s = mock(MetadataCreateIndexService.class); + when(s.getSystemIndices()).thenReturn(getSystemIndices()); when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean())) .thenAnswer(mockInvocation -> { ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; @@ -158,6 +193,7 @@ private static MetadataCreateIndexService getMetadataCreateIndexService() throws .put(request.settings()) .build()) .putMapping("_doc", generateMapping("@timestamp")) + .system(getSystemIndices().isSystemName(request.index())) .numberOfShards(1) .numberOfReplicas(1) .build(), false); @@ -166,4 +202,27 @@ private static MetadataCreateIndexService getMetadataCreateIndexService() throws return s; } + + private static SystemIndices getSystemIndices() { + Map map = Collections.singletonMap("system", new Feature( + "systemFeature", + "system feature description", + Collections.emptyList(), + Collections.singletonList(systemDataStreamDescriptor()) + )); + + return new SystemIndices(map); + } + + private static SystemDataStreamDescriptor systemDataStreamDescriptor() { + return new SystemDataStreamDescriptor( + ".system-data-stream", + "test system datastream", + Type.EXTERNAL, + new ComposableIndexTemplate( + Collections.singletonList(".system-data-stream"), null, null, null, null, null, new DataStreamTemplate()), + Collections.emptyMap(), + Collections.singletonList("stack") + ); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java index 688e75700881e..405dcc1d2063f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamServiceTests.java @@ -16,8 +16,10 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.plugins.Plugin; import java.io.IOException; @@ -30,6 +32,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class MetadataMigrateToDataStreamServiceTests extends MapperServiceTestCase { @@ -208,7 +212,10 @@ public void testCreateDataStreamWithSuppliedWriteIndex() throws Exception { ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO)); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService() + ); IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); assertThat(ds, notNullValue()); assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); @@ -250,7 +257,10 @@ public void testCreateDataStreamHidesBackingIndicesAndRemovesAlias() throws Exce ClusterState newState = MetadataMigrateToDataStreamService.migrateToDataStream(cs, this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO)); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService() + ); IndexAbstraction ds = newState.metadata().getIndicesLookup().get(dataStreamName); assertThat(ds, notNullValue()); assertThat(ds.getType(), equalTo(IndexAbstraction.Type.DATA_STREAM)); @@ -299,7 +309,9 @@ public void testCreateDataStreamWithoutSuppliedWriteIndex() throws Exception { this::getMapperService, new MetadataMigrateToDataStreamService.MigrateToDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, - TimeValue.ZERO))); + TimeValue.ZERO), + new ThreadContext(Settings.EMPTY), + getMetadataCreateIndexService())); assertThat(e.getMessage(), containsString("alias [" + dataStreamName + "] must specify a write index")); } @@ -317,6 +329,12 @@ private MapperService getMapperService(IndexMetadata im) { } } + private MetadataCreateIndexService getMetadataCreateIndexService() { + MetadataCreateIndexService service = mock(MetadataCreateIndexService.class); + when(service.getSystemIndices()).thenReturn(EmptySystemIndices.INSTANCE); + return service; + } + @Override protected Collection getPlugins() { return org.elasticsearch.common.collect.List.of(new MetadataIndexTemplateServiceTests.DummyPlugin()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java index 9de08c555916e..f9f34a38b6e33 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/WildcardExpressionResolverTests.java @@ -20,15 +20,18 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createBackingIndex; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField; -import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel.NONE; import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class WildcardExpressionResolverTests extends ESTestCase { + + private static final Predicate NONE = name -> false; + public void testConvertWildcardsJustIndicesTests() { Metadata.Builder mdBuilder = Metadata.builder() .put(indexBuilder("testXXX")) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 209912d5fa6f5..16deedead18f2 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -11,9 +11,6 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -116,7 +113,7 @@ public static DataStream randomInstance(LongSupplier timeProvider) { metadata.put("key", "value"); } return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, - randomBoolean(), randomBoolean(), timeProvider); + randomBoolean(), randomBoolean(), false, timeProvider); } /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java index a2faa5554d9a0..92a1e8a01ebb3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/GetDataStreamAction.java @@ -123,6 +123,7 @@ public static class DataStreamInfo extends AbstractDiffable impl public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template"); public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy"); public static final ParseField HIDDEN_FIELD = new ParseField("hidden"); + public static final ParseField SYSTEM_FIELD = new ParseField("system"); DataStream dataStream; ClusterHealthStatus dataStreamStatus; @@ -185,6 +186,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); } builder.field(HIDDEN_FIELD.getPreferredName(), dataStream.isHidden()); + builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem()); builder.endObject(); return builder; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java index 1ab0fa5577560..95934d780ff62 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/GenerateSnapshotNameStep.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -140,7 +139,7 @@ public ResolverContext() { } public ResolverContext(long startTime) { - super(null, null, startTime, false, false, false, false, SystemIndexAccessLevel.NONE); + super(null, null, startTime, false, false, false, false, name -> false); } @Override diff --git a/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json b/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json new file mode 100644 index 0000000000000..c0af3369413d5 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/fleet-actions-results-ilm-policy.json @@ -0,0 +1,21 @@ +{ + "phases": { + "hot": { + "min_age": "0ms", + "actions": { + "rollover": { + "max_size": "300gb", + "max_age": "30d" + } + } + }, + "delete": { + "min_age": "90d", + "actions": { + "delete": { + "delete_searchable_snapshot": true + } + } + } + } +} diff --git a/x-pack/plugin/core/src/main/resources/fleet-actions-results.json b/x-pack/plugin/core/src/main/resources/fleet-actions-results.json new file mode 100644 index 0000000000000..e4a4acce782b3 --- /dev/null +++ b/x-pack/plugin/core/src/main/resources/fleet-actions-results.json @@ -0,0 +1,54 @@ +{ + "index_patterns": [ + ".fleet-actions-results" + ], + "data_stream": {}, + "template": { + "settings": { + "index.lifecycle.name": ".fleet-actions-results-ilm-policy" + }, + "mappings": { + "_meta": { + "version": "${fleet.version}" + }, + "dynamic": false, + "properties": { + "action_id": { + "type": "keyword" + }, + "agent_id": { + "type": "keyword" + }, + "action_data": { + "enabled": false, + "type": "object" + }, + "data": { + "enabled": false, + "type": "object" + }, + "error": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 1024 + } + } + }, + "@timestamp": { + "type": "date" + }, + "started_at": { + "type": "date" + }, + "completed_at": { + "type": "date" + } + } + } + }, + "composed_of": [], + "priority": 200, + "version": 1 +} diff --git a/x-pack/plugin/data-streams/build.gradle b/x-pack/plugin/data-streams/build.gradle index d8f7c499baa06..30f8ffc03c5a5 100644 --- a/x-pack/plugin/data-streams/build.gradle +++ b/x-pack/plugin/data-streams/build.gradle @@ -11,6 +11,16 @@ archivesBaseName = 'x-pack-data-streams' dependencies { compileOnly project(path: xpackModule('core')) testImplementation(testArtifact(project(xpackModule('core')))) + testImplementation project(path: ':modules:transport-netty4') // for http in SystemDataStreamIT + testImplementation project(path: ':plugins:transport-nio') // for http in SystemDataStreamIT } -addQaCheckDependencies() \ No newline at end of file +addQaCheckDependencies() + +tasks.named("internalClusterTest").configure { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java new file mode 100644 index 0000000000000..17979d962d79d --- /dev/null +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java @@ -0,0 +1,387 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.datastreams; + +import org.apache.http.util.EntityUtils; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateResponse.ResetFeatureStateStatus; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.IndicesOptions.Option; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate.DataStreamTemplate; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.SystemDataStreamDescriptor; +import org.elasticsearch.indices.SystemDataStreamDescriptor.Type; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.transport.nio.NioTransportPlugin; +import org.elasticsearch.xpack.core.action.DeleteDataStreamAction; +import org.elasticsearch.xpack.datastreams.DataStreamsPlugin; +import org.junit.After; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +@ESIntegTestCase.ClusterScope(transportClientRatio = 0) +public class SystemDataStreamIT extends ESIntegTestCase { + + private static String nodeHttpTypeKey; + + @BeforeClass + public static void setUpTransport() { + if (randomBoolean()) { + nodeHttpTypeKey = NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME; + } else { + nodeHttpTypeKey = Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME; + } + } + + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(getTestTransportPlugin()); + plugins.add(NioTransportPlugin.class); + plugins.add(DataStreamsPlugin.class); + plugins.add(TestSystemDataStreamPlugin.class); + plugins.add(Netty4Plugin.class); + return plugins; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(NetworkModule.HTTP_TYPE_KEY, nodeHttpTypeKey) + .build(); + } + + @Override + protected boolean addMockHttpTransport() { + return false; + } + + @SuppressWarnings("unchecked") + public void testSystemDataStreamCRUD() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + + // no product header + ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong header + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(putRequest)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + // list - no header needed + Request listAllRequest = new Request("GET", "/_data_stream"); + Response listAllResponse = restClient.performRequest(listAllRequest); + assertThat(listAllResponse.getStatusLine().getStatusCode(), is(200)); + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(listAllResponse.getEntity()), + false + ); + List dataStreams = (List) responseMap.get("data_streams"); + assertThat(dataStreams.size(), is(1)); + + Request listRequest = new Request("GET", "/_data_stream/.test-data-stream"); + Response listResponse = restClient.performRequest(listRequest); + assertThat(listResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), EntityUtils.toString(listResponse.getEntity()), false); + dataStreams = (List) responseMap.get("data_streams"); + assertThat(dataStreams.size(), is(1)); + + // delete + Request deleteRequest = new Request("DELETE", "/_data_stream/.test-data-stream"); + + // no header + re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // incorrect header + deleteRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(deleteRequest)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + deleteRequest.setOptions(putRequest.getOptions()); + Response deleteResponse = restClient.performRequest(deleteRequest); + assertThat(deleteResponse.getStatusLine().getStatusCode(), is(200)); + } + } + + public void testDataStreamStats() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + Request statsRequest = new Request("GET", "/_data_stream/_stats"); + Response response = restClient.performRequest(statsRequest); + assertThat(response.getStatusLine().getStatusCode(), is(200)); + + Map map = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(response.getEntity()), + false + ); + assertThat(map.get("data_stream_count"), equalTo(1)); + } + } + + @SuppressWarnings("unchecked") + public void testSystemDataStreamReadWrite() throws Exception { + try (RestClient restClient = createRestClient()) { + Request putRequest = new Request("PUT", "/_data_stream/.test-data-stream"); + putRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "product").build()); + Response putResponse = restClient.performRequest(putRequest); + assertThat(putResponse.getStatusLine().getStatusCode(), is(200)); + + // write + Request index = new Request("POST", "/.test-data-stream/_doc"); + index.setJsonEntity("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }"); + index.addParameter("refresh", "true"); + + // no product specified + ResponseException re = expectThrows(ResponseException.class, () -> restClient.performRequest(index)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong header + index.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(index)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + index.setOptions(putRequest.getOptions()); + Response response = restClient.performRequest(index); + assertEquals(201, response.getStatusLine().getStatusCode()); + + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(response.getEntity()), + false + ); + String indexName = (String) responseMap.get("_index"); + String id = (String) responseMap.get("_id"); + + // get + Request get = new Request("GET", "/" + indexName + "/_doc/" + id); + + // no product specified + re = expectThrows(ResponseException.class, () -> restClient.performRequest(get)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // wrong product + get.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(get)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + get.setOptions(putRequest.getOptions()); + Response getResponse = restClient.performRequest(get); + assertThat(getResponse.getStatusLine().getStatusCode(), is(200)); + + // search all + Request search = new Request("GET", "/_search"); + search.setJsonEntity("{ \"query\": { \"match_all\": {} } }"); + + // no header + Response searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + Map hits = (Map) responseMap.get("hits"); + List hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(0)); + + // wrong header + search.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(0)); + + // correct + search.setOptions(putRequest.getOptions()); + searchResponse = restClient.performRequest(search); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(1)); + + // search the datastream + Request searchIdx = new Request("GET", "/.test-data-stream/_search"); + searchIdx.setJsonEntity("{ \"query\": { \"match_all\": {} } }"); + + // no header + re = expectThrows(ResponseException.class, () -> restClient.performRequest(searchIdx)); + assertThat(re.getMessage(), containsString("reserved for system")); + + // incorrect header + searchIdx.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("X-elastic-product-origin", "wrong").build()); + re = expectThrows(ResponseException.class, () -> restClient.performRequest(searchIdx)); + assertThat(re.getMessage(), containsString("accessed by product")); + + // correct + searchIdx.setOptions(putRequest.getOptions()); + searchResponse = restClient.performRequest(searchIdx); + assertThat(searchResponse.getStatusLine().getStatusCode(), is(200)); + responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + EntityUtils.toString(searchResponse.getEntity()), + false + ); + hits = (Map) responseMap.get("hits"); + hitsHits = (List) hits.get("hits"); + assertThat(hitsHits.size(), is(1)); + } + } + + @After + public void cleanup() { + try { + PlainActionFuture stateStatusPlainActionFuture = new PlainActionFuture<>(); + new TestSystemDataStreamPlugin().cleanUpFeature( + internalCluster().clusterService(), + internalCluster().client(), + stateStatusPlainActionFuture + ); + stateStatusPlainActionFuture.actionGet(); + } catch (ResourceNotFoundException e) { + // ignore + } + } + + public static final class TestSystemDataStreamPlugin extends Plugin implements SystemIndexPlugin { + + @Override + public Collection getSystemDataStreamDescriptors() { + try { + CompressedXContent mappings = new CompressedXContent("{\"properties\":{\"name\":{\"type\":\"keyword\"}}}"); + return Collections.singletonList( + new SystemDataStreamDescriptor( + ".test-data-stream", + "system data stream test", + Type.EXTERNAL, + new ComposableIndexTemplate( + Collections.singletonList(".test-data-stream"), + new Template(Settings.EMPTY, mappings, null), + null, + null, + null, + null, + new DataStreamTemplate() + ), + Collections.emptyMap(), + Collections.singletonList("product") + ) + ); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public String getFeatureName() { + return SystemDataStreamIT.class.getSimpleName(); + } + + @Override + public String getFeatureDescription() { + return "Integration testing of system data streams"; + } + + @Override + public void cleanUpFeature(ClusterService clusterService, Client client, ActionListener listener) { + Collection dataStreamDescriptors = getSystemDataStreamDescriptors(); + final DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request( + dataStreamDescriptors.stream() + .map(SystemDataStreamDescriptor::getDataStreamName) + .collect(Collectors.toList()) + .toArray(Strings.EMPTY_ARRAY) + ); + EnumSet