From 879700a27bb7755ec0ce969cca72318d99a7dc81 Mon Sep 17 00:00:00 2001
From: William Brafford
Date: Mon, 16 Aug 2021 12:54:40 -0400
Subject: [PATCH] Add system data streams to feature state snapshots (#75902)
 (#76568)

* Add system data streams to feature state snapshots (#75902)

Add system data streams to the "snapshot feature state" code block, so that
if we're snapshotting a feature by name we grab that feature's system data
streams too. Handle these data streams on the restore side as well.

* Add system data streams to feature state snapshots

* Don't pass system data streams through index name resolution

* Don't add no-op features to snapshots

* Hook in system data streams for snapshot restoration
---
 .../indices/SystemDataStreamDescriptor.java | 31 +++++-
 .../snapshots/RestoreService.java           | 37 ++++++--
 .../snapshots/SnapshotsService.java         | 64 +++++++------
 .../SystemDataStreamSnapshotIT.java         | 95 ++++++++++++++++++-
 4 files changed, 186 insertions(+), 41 deletions(-)
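
For reference while reviewing, the end-to-end flow this change enables looks
roughly like the fragment below, written in the style of the integration test
added at the end of this patch. The repository name, snapshot name, index name,
and feature state name are placeholders, and client() is the ESIntegTestCase
test client; this is an illustrative sketch, not code from the patch.

    // Snapshot one regular index plus a named feature state. With this change,
    // the feature's system data streams (and their backing indices) are added
    // to the snapshot even though no index pattern in the request matches them.
    CreateSnapshotResponse createResponse = client().admin()
        .cluster()
        .prepareCreateSnapshot("my-repo", "my-snapshot")
        .setIndices("my-index")
        .setFeatureStates("my-feature")
        .setIncludeGlobalState(false)
        .setWaitForCompletion(true)
        .get();

    // Restoring the same feature state recreates the system data stream as well.
    RestoreSnapshotResponse restoreResponse = client().admin()
        .cluster()
        .prepareRestoreSnapshot("my-repo", "my-snapshot")
        .setIndices("my-index")
        .setFeatureStates("my-feature")
        .setWaitForCompletion(true)
        .get();
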
diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java
index e4d8ba5cc0781..0add050405cac 100644
--- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java
+++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java
@@ -8,16 +8,21 @@
 package org.elasticsearch.indices;
 
+import org.apache.lucene.util.automaton.CharacterRunAutomaton;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.Metadata;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;
+
 /**
  * Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also
  * protected by the system against user modification so that system features are not broken by inadvertent user operations.
@@ -31,6 +36,7 @@ public class SystemDataStreamDescriptor {
     private final Map<String, ComponentTemplate> componentTemplates;
     private final List<String> allowedElasticProductOrigins;
     private final ExecutorNames executorNames;
+    private final CharacterRunAutomaton characterRunAutomaton;
 
     /**
     * Creates a new descriptor for a system data descriptor
@@ -73,12 +79,31 @@ public SystemDataStreamDescriptor(String dataStreamName, String description, Typ
         this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
+
+        this.characterRunAutomaton = new CharacterRunAutomaton(
+            buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName))
+        );
     }
 
     public String getDataStreamName() {
         return dataStreamName;
     }
 
+    /**
+     * Retrieve backing indices for this system data stream
+     * @param metadata Metadata in which to look for indices
+     * @return List of names of backing indices
+     */
+    public List<String> getBackingIndexNames(Metadata metadata) {
+        ArrayList<String> matchingIndices = new ArrayList<>();
+        metadata.indices().keysIt().forEachRemaining(indexName -> {
+            if (this.characterRunAutomaton.run(indexName)) {
+                matchingIndices.add(indexName);
+            }
+        });
+
+        return Collections.unmodifiableList(matchingIndices);
+    }
+
     public String getDescription() {
         return description;
     }
@@ -92,7 +117,11 @@ public boolean isExternal() {
     }
 
     public String getBackingIndexPattern() {
-        return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*";
+        return backingIndexPatternForDataStream(getDataStreamName());
+    }
+
+    private static String backingIndexPatternForDataStream(String dataStream) {
+        return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
     }
 
     public List<String> getAllowedElasticProductOrigins() {
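
For reviewers unfamiliar with the naming scheme: data stream backing indices are
prefixed with DataStream.BACKING_INDEX_PREFIX (".ds-"), so the pattern built
above is ".ds-<data-stream-name>-*". The following is a rough standalone
illustration of the matching; it substitutes a plain Lucene RegExp for the
AssociatedIndexDescriptor.buildAutomaton helper used in the patch, and the data
stream name is made up, so treat it as a sketch of the idea rather than the
exact automaton the descriptor builds.

    import org.apache.lucene.util.automaton.CharacterRunAutomaton;
    import org.apache.lucene.util.automaton.RegExp;

    public class BackingIndexMatchSketch {
        public static void main(String[] args) {
            // Roughly ".ds-" + dataStreamName + "-*", written as a Lucene regular
            // expression ("\\." keeps the dots literal).
            CharacterRunAutomaton automaton = new CharacterRunAutomaton(
                new RegExp("\\.ds-\\.example-system-stream-.*").toAutomaton()
            );

            // A backing index generated for the data stream matches...
            System.out.println(automaton.run(".ds-.example-system-stream-2021.08.16-000001")); // true
            // ...while an ordinary user index does not.
            System.out.println(automaton.run("my-index-000001")); // false
        }
    }
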
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 4845baed30180..2f7cabce9f2f4 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -68,6 +68,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.ShardLimitValidator;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -317,13 +318,40 @@ private void startRestore(
             Collections.addAll(requestIndices, indicesInRequest);
         }
 
+        // Determine system indices to restore from requested feature states
+        final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
+        final Set<String> featureStateIndices = featureStatesToRestore.values()
+            .stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+
+        final Map<String, SystemIndices.Feature> featureSet = systemIndices.getFeatures();
+        final Set<String> featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> {
+            if (featureSet.containsKey(featureName)) {
+                return true;
+            }
+            logger.warn(
+                () -> new ParameterizedMessage(
+                    "Restoring snapshot[{}] skipping feature [{}] because it is not available in this cluster",
+                    snapshotInfo.snapshotId(),
+                    featureName
+                )
+            );
+            return false;
+        })
+            .map(name -> systemIndices.getFeatures().get(name))
+            .flatMap(feature -> feature.getDataStreamDescriptors().stream())
+            .map(SystemDataStreamDescriptor::getDataStreamName)
+            .collect(Collectors.toSet());
+
         // Get data stream metadata for requested data streams
         Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result = getDataStreamsToRestore(
             repository,
             snapshotId,
             snapshotInfo,
             globalMetadata,
-            requestIndices,
+            // include system data stream names in argument to this method
+            Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()),
             request.includeAliases()
         );
         Map<String, DataStream> dataStreamsToRestore = result.v1();
@@ -340,13 +368,6 @@ private void startRestore(
             .collect(Collectors.toSet());
         requestIndices.addAll(dataStreamIndices);
 
-        // Determine system indices to restore from requested feature states
-        final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
-        final Set<String> featureStateIndices = featureStatesToRestore.values()
-            .stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet());
-
         // Resolve the indices that were directly requested
         final List<String> requestedIndicesInSnapshot = filterIndices(
             snapshotInfo.indices(),
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 291361cb0a632..2b272fd80bc3c 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -71,7 +71,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.AssociatedIndexDescriptor;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -495,36 +495,45 @@ public ClusterState execute(ClusterState currentState) {
                 // Store newSnapshot here to be processed in clusterStateProcessed
                 List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
 
-                final List<SnapshotFeatureInfo> featureStates;
+                final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
+                final Set<String> systemDataStreamNames = new HashSet<>();
 
                 // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
                 // been requested by the request directly
-                if (featureStatesSet.isEmpty()) {
-                    featureStates = Collections.emptyList();
-                } else {
-                    final Set<String> indexNames = new HashSet<>(indices);
-                    featureStates = featureStatesSet.stream()
-                        .map(
-                            feature -> new SnapshotFeatureInfo(
-                                feature,
-                                systemIndexDescriptorMap.get(feature)
-                                    .getIndexDescriptors()
-                                    .stream()
-                                    .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
-                                    .collect(Collectors.toList())
-                            )
-                        )
-                        .filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates
-                        .collect(Collectors.toList());
-                    for (SnapshotFeatureInfo featureState : featureStates) {
-                        indexNames.addAll(featureState.getIndices());
-                    }
+                final Set<String> indexNames = new HashSet<>(indices);
+                for (String featureName : featureStatesSet) {
+                    SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
 
-                    // Add all resolved indices from the feature states to the list of indices
-                    for (String feature : featureStatesSet) {
-                        for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) {
-                            indexNames.addAll(aid.getMatchingIndices(currentState.metadata()));
+                    Set<String> featureSystemIndices = feature.getIndexDescriptors()
+                        .stream()
+                        .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
+                        .collect(Collectors.toSet());
+                    Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
+                        .stream()
+                        .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
+                        .collect(Collectors.toSet());
+
+                    Set<String> featureSystemDataStreams = new HashSet<>();
+                    Set<String> featureDataStreamBackingIndices = new HashSet<>();
+                    for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
+                        List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
+                        if (backingIndexNames.size() > 0) {
+                            featureDataStreamBackingIndices.addAll(backingIndexNames);
+                            featureSystemDataStreams.add(sdd.getDataStreamName());
                         }
                     }
+
+                    if (featureSystemIndices.size() > 0
+                        || featureAssociatedIndices.size() > 0
+                        || featureDataStreamBackingIndices.size() > 0) {
+
+                        featureStates.add(
+                            new SnapshotFeatureInfo(featureName, Collections.unmodifiableList(new ArrayList<>(featureSystemIndices)))
+                        );
+                        indexNames.addAll(featureSystemIndices);
+                        indexNames.addAll(featureAssociatedIndices);
+                        indexNames.addAll(featureDataStreamBackingIndices);
+                        systemDataStreamNames.addAll(featureSystemDataStreams);
+                    }
                     indices = Collections.unmodifiableList(new ArrayList<>(indexNames));
                 }
 
@@ -533,6 +542,7 @@ public ClusterState execute(ClusterState currentState) {
                     request.indicesOptions(),
                     request.indices()
                 );
+                dataStreams.addAll(systemDataStreamNames);
 
                 logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
 
@@ -575,7 +585,7 @@ public ClusterState execute(ClusterState currentState) {
                     shards,
                     userMeta,
                     version,
-                    featureStates
+                    Collections.unmodifiableList(new ArrayList<>(featureStates))
                 );
                 return ClusterState.builder(currentState)
                     .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry)))
diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java
index 4cf930a9513d8..3dd5413c370cc 100644
--- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java
+++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java
@@ -8,7 +8,6 @@
 package org.elasticsearch.datastreams;
 
 import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.index.IndexResponse;
@@ -20,6 +19,7 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -34,8 +34,11 @@
 import java.util.Collections;
 
 import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.oneOf;
 
 @ESIntegTestCase.ClusterScope(transportClientRatio = 0)
@@ -82,12 +85,91 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
         }
 
-        CreateSnapshotResponse createSnapshotResponse = client().admin()
+        assertSuccessful(
+            client().admin()
+                .cluster()
+                .prepareCreateSnapshot(REPO, SNAPSHOT)
+                .setWaitForCompletion(true)
+                .setIncludeGlobalState(false)
+                .execute()
+        );
+
+        // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
+        // See https://github.com/elastic/elasticsearch/issues/75818
+        {
+            DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
+            assertTrue(response.isAcknowledged());
+        }
+
+        {
+            GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
+            assertThat(indicesRemaining.indices(), arrayWithSize(0));
+        }
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
             .cluster()
-            .prepareCreateSnapshot(REPO, SNAPSHOT)
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
             .setWaitForCompletion(true)
-            .setIncludeGlobalState(false)
+            .setRestoreGlobalState(false)
             .get();
+        assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        {
+            GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
+            assertThat(response.getDataStreams(), hasSize(1));
+            assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
+        }
+    }
+
+    public void testSystemDataStreamInFeatureState() throws Exception {
+        Path location = randomRepoPath();
+        createRepository(REPO, "fs", location);
+
+        {
+            CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
+            final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get();
+            assertTrue(response.isAcknowledged());
+        }
+
+        // Index a doc so that a concrete backing index will be created
+        IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME, "_doc")
+            .setId("42")
+            .setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .execute()
+            .actionGet();
+        assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201));
+
+        // Index a doc so that a concrete backing index will be created
+        IndexResponse indexResponse = client().prepareIndex("my-index", "_doc")
+            .setId("42")
+            .setSource("{ \"name\": \"my-name\" }", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .execute()
+            .get();
+        assertThat(indexResponse.status().getStatus(), oneOf(200, 201));
+
+        {
+            GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
+            assertThat(response.getDataStreams(), hasSize(1));
+            assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
+        }
+
+        SnapshotInfo snapshotInfo = assertSuccessful(
+            client().admin()
+                .cluster()
+                .prepareCreateSnapshot(REPO, SNAPSHOT)
+                .setIndices("my-index")
+                .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
+                .setWaitForCompletion(true)
+                .setIncludeGlobalState(false)
+                .execute()
+        );
+
+        assertThat(snapshotInfo.dataStreams(), not(empty()));
 
         // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
         // See https://github.com/elastic/elasticsearch/issues/75818
@@ -97,6 +179,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             assertTrue(response.isAcknowledged());
         }
 
+        assertAcked(client().admin().indices().prepareDelete("my-index"));
+
         {
             GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
             assertThat(indicesRemaining.indices(), arrayWithSize(0));
@@ -106,7 +190,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             .cluster()
             .prepareRestoreSnapshot(REPO, SNAPSHOT)
             .setWaitForCompletion(true)
-            .setRestoreGlobalState(false)
+            .setIndices("my-index")
+            .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
             .get();
         assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
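
As a follow-up check when writing tests against this change, the SnapshotInfo
returned by assertSuccessful can be inspected for both the system data stream
name and the recorded feature state. The fragment below belongs inside a test
like testSystemDataStreamInFeatureState above; the featureStates() accessor on
SnapshotInfo is assumed to be available on this branch, so treat this as a
sketch rather than part of the patch.

    SnapshotInfo snapshotInfo = assertSuccessful(
        client().admin()
            .cluster()
            .prepareCreateSnapshot(REPO, SNAPSHOT)
            .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
            .setIncludeGlobalState(false)
            .setWaitForCompletion(true)
            .execute()
    );

    // The system data stream is recorded on the snapshot by name...
    assertThat(snapshotInfo.dataStreams(), not(empty()));
    // ...and the feature itself is recorded as a feature state entry, while the
    // data stream's backing indices go into the snapshot's plain index list.
    assertThat(snapshotInfo.featureStates(), not(empty()));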