Add system data streams to feature state snapshots (#75902) (#76568)
* Add system data streams to feature state snapshots (#75902)

Add system data streams to the "snapshot feature state" code block, so that if
we're snapshotting a feature by name we grab that feature's system data streams
too. Handle these data streams on the restore side as well.

* Add system data streams to feature state snapshots
* Don't pass system data streams through index name resolution
* Don't add no-op features to snapshots
* Hook in system data streams for snapshot restoration
williamrandolph authored Aug 16, 2021
1 parent 5ddb217 commit 879700a
Showing 4 changed files with 186 additions and 41 deletions.
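
For orientation, a usage sketch distilled from the integration test changes below; REPO, SNAPSHOT, "my-index", and "my-feature" are placeholders. With this change, naming a feature state when creating a snapshot captures that feature's system data streams, and naming it again on restore brings them back:

    // Sketch only: names are illustrative, API calls mirror the test below.
    client().admin()
        .cluster()
        .prepareCreateSnapshot(REPO, SNAPSHOT)
        .setIndices("my-index")
        .setFeatureStates("my-feature")   // the feature's system data streams ride along
        .setWaitForCompletion(true)
        .setIncludeGlobalState(false)
        .get();

    client().admin()
        .cluster()
        .prepareRestoreSnapshot(REPO, SNAPSHOT)
        .setIndices("my-index")
        .setFeatureStates("my-feature")   // restores the system data streams too
        .setWaitForCompletion(true)
        .get();
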
SystemDataStreamDescriptor.java
@@ -8,16 +8,21 @@

 package org.elasticsearch.indices;
 
+import org.apache.lucene.util.automaton.CharacterRunAutomaton;
 import org.elasticsearch.cluster.metadata.ComponentTemplate;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.Metadata;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;
 
 /**
  * Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also
  * protected by the system against user modification so that system features are not broken by inadvertent user operations.
@@ -31,6 +36,7 @@ public class SystemDataStreamDescriptor {
     private final Map<String, ComponentTemplate> componentTemplates;
     private final List<String> allowedElasticProductOrigins;
     private final ExecutorNames executorNames;
+    private final CharacterRunAutomaton characterRunAutomaton;
 
     /**
      * Creates a new descriptor for a system data stream
@@ -73,12 +79,31 @@ public SystemDataStreamDescriptor(String dataStreamName, String description, Typ
         this.executorNames = Objects.nonNull(executorNames)
             ? executorNames
             : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
+
+        this.characterRunAutomaton = new CharacterRunAutomaton(
+            buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName)));
     }
 
     public String getDataStreamName() {
         return dataStreamName;
     }
 
+    /**
+     * Retrieve backing indices for this system data stream
+     * @param metadata Metadata in which to look for indices
+     * @return List of names of backing indices
+     */
+    public List<String> getBackingIndexNames(Metadata metadata) {
+        ArrayList<String> matchingIndices = new ArrayList<>();
+        metadata.indices().keysIt().forEachRemaining(indexName -> {
+            if (this.characterRunAutomaton.run(indexName)) {
+                matchingIndices.add(indexName);
+            }
+        });
+
+        return Collections.unmodifiableList(matchingIndices);
+    }
+
     public String getDescription() {
         return description;
     }
@@ -92,7 +117,11 @@ public boolean isExternal() {
     }
 
     public String getBackingIndexPattern() {
-        return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*";
+        return backingIndexPatternForDataStream(getDataStreamName());
     }
 
+    private static String backingIndexPatternForDataStream(String dataStream) {
+        return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
+    }
+
     public List<String> getAllowedElasticProductOrigins() {
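
The constructor now precompiles a Lucene CharacterRunAutomaton from the backing index pattern, and getBackingIndexNames runs every index name in the cluster metadata through it. A minimal sketch of that matching rule, assuming a class placed in the org.elasticsearch.indices package (so the statically imported buildAutomaton helper is reachable) and a hypothetical stream name:

    package org.elasticsearch.indices;

    import org.apache.lucene.util.automaton.CharacterRunAutomaton;
    import org.elasticsearch.cluster.metadata.DataStream;

    import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;

    class BackingIndexMatchSketch {
        public static void main(String[] args) {
            // ".ds-" + <stream name> + "-*" is exactly what getBackingIndexPattern() returns.
            CharacterRunAutomaton matcher = new CharacterRunAutomaton(
                buildAutomaton(DataStream.BACKING_INDEX_PREFIX + ".test-data-stream" + "-*"));

            // A rollover-generated backing index matches; the bare stream name does not.
            System.out.println(matcher.run(".ds-.test-data-stream-2021.08.16-000001")); // true
            System.out.println(matcher.run(".test-data-stream"));                       // false
        }
    }
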
RestoreService.java
@@ -68,6 +68,7 @@
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.ShardLimitValidator;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -317,13 +318,40 @@ private void startRestore(
             Collections.addAll(requestIndices, indicesInRequest);
         }
 
+        // Determine system indices to restore from requested feature states
+        final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
+        final Set<String> featureStateIndices = featureStatesToRestore.values()
+            .stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+
+        final Map<String, SystemIndices.Feature> featureSet = systemIndices.getFeatures();
+        final Set<String> featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> {
+            if (featureSet.containsKey(featureName)) {
+                return true;
+            }
+            logger.warn(
+                () -> new ParameterizedMessage(
+                    "Restoring snapshot[{}] skipping feature [{}] because it is not available in this cluster",
+                    snapshotInfo.snapshotId(),
+                    featureName
+                )
+            );
+            return false;
+        })
+            .map(name -> systemIndices.getFeatures().get(name))
+            .flatMap(feature -> feature.getDataStreamDescriptors().stream())
+            .map(SystemDataStreamDescriptor::getDataStreamName)
+            .collect(Collectors.toSet());
+
         // Get data stream metadata for requested data streams
         Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result = getDataStreamsToRestore(
             repository,
             snapshotId,
             snapshotInfo,
             globalMetadata,
-            requestIndices,
+            // include system data stream names in argument to this method
+            Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()),
             request.includeAliases()
         );
         Map<String, DataStream> dataStreamsToRestore = result.v1();
@@ -340,13 +368,6 @@ private void startRestore(
             .collect(Collectors.toSet());
         requestIndices.addAll(dataStreamIndices);
 
-        // Determine system indices to restore from requested feature states
-        final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
-        final Set<String> featureStateIndices = featureStatesToRestore.values()
-            .stream()
-            .flatMap(Collection::stream)
-            .collect(Collectors.toSet());
-
         // Resolve the indices that were directly requested
         final List<String> requestedIndicesInSnapshot = filterIndices(
             snapshotInfo.indices(),
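
Reduced to a self-contained sketch (feature and stream names invented), the restore-side wiring does two things: it drops feature states that are not available in this cluster, logging a warning for each, and it merges the surviving features' system data stream names with the explicitly requested names before data stream resolution, so system data streams never pass through index name resolution:

    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    class RestoreWiringSketch {
        public static void main(String[] args) {
            // What the request asked for, and what the snapshot's feature states contain.
            List<String> requestIndices = List.of("my-index");
            Set<String> featureStatesToRestore = Set.of("tasks", "feature-not-in-this-cluster");
            // Features installed on this cluster, mapped to their system data streams.
            Map<String, List<String>> installedFeatures = Map.of("tasks", List.of(".tasks-stream"));

            Set<String> featureStateDataStreams = featureStatesToRestore.stream()
                .filter(installedFeatures::containsKey) // unavailable features are skipped (and logged)
                .flatMap(feature -> installedFeatures.get(feature).stream())
                .collect(Collectors.toSet());

            // Merged list handed to data stream resolution, as in the diff above.
            List<String> dataStreamCandidates = Stream.concat(requestIndices.stream(), featureStateDataStreams.stream())
                .collect(Collectors.toList());
            System.out.println(dataStreamCandidates); // e.g. [my-index, .tasks-stream]
        }
    }
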
SnapshotsService.java
@@ -71,7 +71,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.indices.AssociatedIndexDescriptor;
+import org.elasticsearch.indices.SystemDataStreamDescriptor;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -495,36 +495,45 @@ public ClusterState execute(ClusterState currentState) {
                 // Store newSnapshot here to be processed in clusterStateProcessed
                 List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
 
-                final List<SnapshotFeatureInfo> featureStates;
+                final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
+                final Set<String> systemDataStreamNames = new HashSet<>();
                 // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
                 // been requested by the request directly
-                if (featureStatesSet.isEmpty()) {
-                    featureStates = Collections.emptyList();
-                } else {
-                    final Set<String> indexNames = new HashSet<>(indices);
-                    featureStates = featureStatesSet.stream()
-                        .map(
-                            feature -> new SnapshotFeatureInfo(
-                                feature,
-                                systemIndexDescriptorMap.get(feature)
-                                    .getIndexDescriptors()
-                                    .stream()
-                                    .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
-                                    .collect(Collectors.toList())
-                            )
-                        )
-                        .filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates
-                        .collect(Collectors.toList());
-                    for (SnapshotFeatureInfo featureState : featureStates) {
-                        indexNames.addAll(featureState.getIndices());
-                    }
-
-                    // Add all resolved indices from the feature states to the list of indices
-                    for (String feature : featureStatesSet) {
-                        for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) {
-                            indexNames.addAll(aid.getMatchingIndices(currentState.metadata()));
-                        }
-                    }
-                    indices = Collections.unmodifiableList(new ArrayList<>(indexNames));
-                }
+                final Set<String> indexNames = new HashSet<>(indices);
+                for (String featureName : featureStatesSet) {
+                    SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);
+
+                    Set<String> featureSystemIndices = feature.getIndexDescriptors()
+                        .stream()
+                        .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
+                        .collect(Collectors.toSet());
+                    Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
+                        .stream()
+                        .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
+                        .collect(Collectors.toSet());
+
+                    Set<String> featureSystemDataStreams = new HashSet<>();
+                    Set<String> featureDataStreamBackingIndices = new HashSet<>();
+                    for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
+                        List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
+                        if (backingIndexNames.size() > 0) {
+                            featureDataStreamBackingIndices.addAll(backingIndexNames);
+                            featureSystemDataStreams.add(sdd.getDataStreamName());
+                        }
+                    }
+
+                    if (featureSystemIndices.size() > 0
+                        || featureAssociatedIndices.size() > 0
+                        || featureDataStreamBackingIndices.size() > 0) {
+
+                        featureStates.add(
+                            new SnapshotFeatureInfo(featureName, Collections.unmodifiableList(new ArrayList<>(featureSystemIndices)))
+                        );
+                        indexNames.addAll(featureSystemIndices);
+                        indexNames.addAll(featureAssociatedIndices);
+                        indexNames.addAll(featureDataStreamBackingIndices);
+                        systemDataStreamNames.addAll(featureSystemDataStreams);
+                    }
+                }
+                indices = Collections.unmodifiableList(new ArrayList<>(indexNames));

@@ -533,6 +542,7 @@ public ClusterState execute(ClusterState currentState) {
                     request.indicesOptions(),
                     request.indices()
                 );
+                dataStreams.addAll(systemDataStreamNames);
 
                 logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
 
@@ -575,7 +585,7 @@ public ClusterState execute(ClusterState currentState) {
                     shards,
                     userMeta,
                     version,
-                    featureStates
+                    Collections.unmodifiableList(new ArrayList<>(featureStates))
                 );
                 return ClusterState.builder(currentState)
                     .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry)))
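
The create-side rule, as a self-contained sketch with invented names: a requested feature becomes a feature state only if it currently owns at least one system index, associated index, or data stream backing index; its backing indices join the snapshot's index list, and its data stream names join the snapshot's data stream list so the streams can be recreated on restore:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class FeatureStateCollectionSketch {
        public static void main(String[] args) {
            Set<String> featureSystemIndices = Set.of(".my-feature-index");
            Set<String> featureAssociatedIndices = Set.of();
            Set<String> featureDataStreamBackingIndices = Set.of(".ds-.my-feature-stream-2021.08.16-000001");
            Set<String> featureSystemDataStreams = Set.of(".my-feature-stream");

            Set<String> indexNames = new HashSet<>();
            Set<String> dataStreams = new HashSet<>();
            List<String> featureStates = new ArrayList<>();

            // "Don't add no-op features to snapshots": skip features that own nothing.
            if (featureSystemIndices.isEmpty() == false
                || featureAssociatedIndices.isEmpty() == false
                || featureDataStreamBackingIndices.isEmpty() == false) {
                featureStates.add("my-feature");
                indexNames.addAll(featureSystemIndices);
                indexNames.addAll(featureAssociatedIndices);
                indexNames.addAll(featureDataStreamBackingIndices);
                dataStreams.addAll(featureSystemDataStreams);
            }
            System.out.println(featureStates + " " + indexNames + " " + dataStreams);
        }
    }
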
SystemDataStreamSnapshotIT.java
@@ -8,7 +8,6 @@
 package org.elasticsearch.datastreams;
 
 import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
 import org.elasticsearch.action.index.IndexResponse;
@@ -20,6 +19,7 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
+import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -34,8 +34,11 @@
 import java.util.Collections;
 
 import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.oneOf;
 
 @ESIntegTestCase.ClusterScope(transportClientRatio = 0)
@@ -82,12 +85,91 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
         }
 
-        CreateSnapshotResponse createSnapshotResponse = client().admin()
-            .cluster()
-            .prepareCreateSnapshot(REPO, SNAPSHOT)
-            .setWaitForCompletion(true)
-            .setIncludeGlobalState(false)
-            .get();
+        assertSuccessful(
+            client().admin()
+                .cluster()
+                .prepareCreateSnapshot(REPO, SNAPSHOT)
+                .setWaitForCompletion(true)
+                .setIncludeGlobalState(false)
+                .execute()
+        );
 
+        // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
+        // See https://github.com/elastic/elasticsearch/issues/75818
+        {
+            DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
+            assertTrue(response.isAcknowledged());
+        }
+
+        {
+            GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
+            assertThat(indicesRemaining.indices(), arrayWithSize(0));
+        }
+
+        RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
+            .cluster()
+            .prepareRestoreSnapshot(REPO, SNAPSHOT)
+            .setWaitForCompletion(true)
+            .setRestoreGlobalState(false)
+            .get();
+        assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
+
+        {
+            GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
+            assertThat(response.getDataStreams(), hasSize(1));
+            assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
+        }
+    }
+
+    public void testSystemDataStreamInFeatureState() throws Exception {
+        Path location = randomRepoPath();
+        createRepository(REPO, "fs", location);
+
+        {
+            CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
+            final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get();
+            assertTrue(response.isAcknowledged());
+        }
+
+        // Index a doc so that a concrete backing index will be created
+        IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME, "_doc")
+            .setId("42")
+            .setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .execute()
+            .actionGet();
+        assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201));
+
+        // Index a doc into a regular index to snapshot alongside the feature state
+        IndexResponse indexResponse = client().prepareIndex("my-index", "_doc")
+            .setId("42")
+            .setSource("{ \"name\": \"my-name\" }", XContentType.JSON)
+            .setOpType(DocWriteRequest.OpType.CREATE)
+            .execute()
+            .get();
+        assertThat(indexResponse.status().getStatus(), oneOf(200, 201));
+
+        {
+            GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
+            GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
+            assertThat(response.getDataStreams(), hasSize(1));
+            assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
+        }
+
+        SnapshotInfo snapshotInfo = assertSuccessful(
+            client().admin()
+                .cluster()
+                .prepareCreateSnapshot(REPO, SNAPSHOT)
+                .setIndices("my-index")
+                .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
+                .setWaitForCompletion(true)
+                .setIncludeGlobalState(false)
+                .execute()
+        );
+
+        assertThat(snapshotInfo.dataStreams(), not(empty()));
+
         // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
         // See https://github.com/elastic/elasticsearch/issues/75818
@@ -97,6 +179,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             assertTrue(response.isAcknowledged());
         }
 
+        assertAcked(client().admin().indices().prepareDelete("my-index"));
+
         {
             GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
             assertThat(indicesRemaining.indices(), arrayWithSize(0));
@@ -106,7 +190,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
             .cluster()
             .prepareRestoreSnapshot(REPO, SNAPSHOT)
             .setWaitForCompletion(true)
-            .setRestoreGlobalState(false)
+            .setIndices("my-index")
+            .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
             .get();
         assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());
 
