Skip to content

Commit

Permalink
Add Restore Operation to SnapshotResiliencyTests (#40634)
Browse files Browse the repository at this point in the history
* Add Restore Operation to SnapshotResiliencyTests

* Expand the successful snapshot test case to also include restoring the snapshop
  * Add indexing of documents as well to be able to meaningfully verify the restore
* This is part of the larger effort to test eventually consistent blob stores in #39504
  • Loading branch information
original-brownbear authored Apr 3, 2019
1 parent efe47ce commit 78a590f
Showing 1 changed file with 149 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,42 @@
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterModule;
Expand All @@ -54,6 +78,7 @@
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
Expand All @@ -68,6 +93,8 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -100,7 +127,9 @@
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
Expand All @@ -109,13 +138,16 @@
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.NetworkDisruption;
Expand All @@ -138,10 +170,12 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -177,18 +211,20 @@ public void stopServices() {
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
}

public void testSuccessfulSnapshot() {
public void testSuccessfulSnapshotAndRestore() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";

final int shards = randomIntBetween(1, 10);

final int documents = randomIntBetween(0, 100);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
final AtomicBoolean snapshotRestored = new AtomicBoolean();
final AtomicBoolean documentCountVerified = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
Expand All @@ -197,12 +233,61 @@ public void testSuccessfulSnapshot() {
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertNoFailureListener(() -> createdSnapshot.set(true)))))));

deterministicTaskQueue.runAllRunnableTasks();

() -> {
final Runnable afterIndexing = () ->
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(assertNoFailureListener(() -> {
createdSnapshot.set(true);
masterNode.client.admin().indices().delete(
new DeleteIndexRequest(index),
assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true),
assertNoFailureListener(restoreSnapshotResponse -> {
snapshotRestored.set(true);
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(index).source(
new SearchSourceBuilder().size(0).trackTotalHits(true)
),
assertNoFailureListener(r -> {
assertEquals(
(long) documents,
Objects.requireNonNull(r.getHits().getTotalHits()).value
);
documentCountVerified.set(true);
}));
})
)));
}));
final AtomicInteger countdown = new AtomicInteger(documents);
masterNode.client.admin().indices().putMapping(
new PutMappingRequest(index).type("_doc").source("foo", "type=text"),
assertNoFailureListener(r -> {
for (int i = 0; i < documents; ++i) {
masterNode.client.bulk(
new BulkRequest().add(new IndexRequest(index).source(
Collections.singletonMap("foo", "bar" + i)))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
assertNoFailureListener(
bulkResponse -> {
assertFalse(
"Failures in bulkresponse: " + bulkResponse.buildFailureMessage(),
bulkResponse.hasFailures());
if (countdown.decrementAndGet() == 0) {
afterIndexing.run();
}
}));
}
if (documents == 0) {
afterIndexing.run();
}
}
));
}))));
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertTrue(createdSnapshot.get());
assertTrue(snapshotRestored.get());
assertTrue(documentCountVerified.get());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Expand Down Expand Up @@ -236,7 +321,6 @@ public void testSnapshotWithNodeDisconnects() {
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(

new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
Expand Down Expand Up @@ -831,6 +915,8 @@ protected void assertSnapshotOrGenericThread() {
allocationService = ESAllocationTestCase.createAllocationService(settings);
final IndexScopedSettings indexScopedSettings =
new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test");
final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
indicesService = new IndicesService(
settings,
mock(PluginsService.class),
Expand All @@ -839,12 +925,12 @@ protected void assertSnapshotOrGenericThread() {
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(),
emptyMap(), emptyMap(), emptyMap(), emptyMap()),
indexNameExpressionResolver,
new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER),
mapperRegistry,
namedWriteableRegistry,
threadPool,
indexScopedSettings,
new NoneCircuitBreakerService(),
new BigArrays(new PageCacheRecycler(settings), null, "test"),
bigArrays,
scriptService,
client,
new MetaStateService(nodeEnv, namedXContentRegistry),
Expand All @@ -861,14 +947,15 @@ protected void assertSnapshotOrGenericThread() {
new RoutingService(clusterService, allocationService),
threadPool
);
final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
indicesClusterStateService = new IndicesClusterStateService(
settings,
indicesService,
clusterService,
threadPool,
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
shardStateAction,
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
new NodeMappingRefreshAction(transportService, metaDataMappingService),
repositoriesService,
mock(SearchService.class),
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
Expand Down Expand Up @@ -913,14 +1000,61 @@ protected void assertSnapshotOrGenericThread() {
actionFilters,
indexNameExpressionResolver));
Map<Action, TransportAction> actions = new HashMap<>();
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
indicesService,
allocationService, new AliasValidator(), environment, indexScopedSettings,
threadPool, namedXContentRegistry, false);
actions.put(CreateIndexAction.INSTANCE,
new TransportCreateIndexAction(
transportService, clusterService, threadPool,
new MetaDataCreateIndexService(settings, clusterService, indicesService,
allocationService, new AliasValidator(), environment, indexScopedSettings,
threadPool, namedXContentRegistry, false),
metaDataCreateIndexService,
actionFilters, indexNameExpressionResolver
));
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
actionFilters, indexNameExpressionResolver);
actions.put(BulkAction.INSTANCE,
new TransportBulkAction(threadPool, transportService, clusterService,
new IngestService(
clusterService, threadPool, environment, scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList()),
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
));
final RestoreService restoreService = new RestoreService(
clusterService, repositoriesService, allocationService,
metaDataCreateIndexService,
new MetaDataIndexUpgradeService(
settings, namedXContentRegistry,
mapperRegistry,
indexScopedSettings,
Collections.emptyList()
),
clusterSettings
);
actions.put(PutMappingAction.INSTANCE,
new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService,
actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList())));
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
actions.put(SearchAction.INSTANCE,
new TransportSearchAction(threadPool, transportService, searchService,
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
actionFilters, indexNameExpressionResolver));
actions.put(RestoreSnapshotAction.INSTANCE,
new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,
indexNameExpressionResolver));
actions.put(DeleteIndexAction.INSTANCE,
new TransportDeleteIndexAction(
transportService, clusterService, threadPool,
new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters,
indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings)));
actions.put(PutRepositoryAction.INSTANCE,
new TransportPutRepositoryAction(
transportService, clusterService, repositoriesService, threadPool,
Expand Down

0 comments on commit 78a590f

Please sign in to comment.