From ad535306498bc068348674af6762e100642bb9fb Mon Sep 17 00:00:00 2001
From: Armin Braun <me@obrown.io>
Date: Fri, 29 Mar 2019 14:05:42 +0100
Subject: [PATCH] Add Restore Operation to SnapshotResiliencyTests

* Expand the successful snapshot test case to also include restoring the snapshop
  * Add indexing of documents as well to be able to meaningfully verify the restore
* This is part of the larger effort to test eventually consistent blob stores in #39504
---
 .../snapshots/SnapshotResiliencyTests.java    | 165 ++++++++++++++++--
 1 file changed, 150 insertions(+), 15 deletions(-)

diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index 9c1d256b552bd..f986e585484af 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -34,18 +34,42 @@
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
 import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
+import org.elasticsearch.action.bulk.BulkAction;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.TransportBulkAction;
+import org.elasticsearch.action.bulk.TransportShardBulkAction;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.resync.TransportResyncReplicationAction;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchExecutionStatsCollector;
+import org.elasticsearch.action.search.SearchPhaseController;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchTransportService;
+import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.AutoCreateIndex;
+import org.elasticsearch.action.support.DestructiveOperations;
 import org.elasticsearch.action.support.TransportAction;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.update.UpdateHelper;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.ClusterModule;
@@ -54,6 +78,7 @@
 import org.elasticsearch.cluster.ESAllocationTestCase;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
 import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
@@ -68,6 +93,8 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
+import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
+import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
 import org.elasticsearch.cluster.metadata.MetaDataMappingService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -100,7 +127,9 @@
 import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
 import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
+import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.indices.analysis.AnalysisModule;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@@ -109,13 +138,16 @@
 import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
-import org.elasticsearch.plugins.MapperPlugin;
+import org.elasticsearch.ingest.IngestService;
+import org.elasticsearch.node.ResponseCollectorService;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.FetchPhase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.disruption.DisruptableMockTransport;
 import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -138,10 +170,12 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -177,7 +211,7 @@ public void stopServices() {
         testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
     }
 
-    public void testSuccessfulSnapshot() {
+    public void testSuccessfulSnapshotAndRestore() {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -185,10 +219,12 @@ public void testSuccessfulSnapshot() {
         final String index = "test";
 
         final int shards = randomIntBetween(1, 10);
-
+        final int documents = randomIntBetween(0, 100);
         TestClusterNode masterNode =
             testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
         final AtomicBoolean createdSnapshot = new AtomicBoolean();
+        final AtomicBoolean snapshotRestored = new AtomicBoolean();
+        final AtomicBoolean documentCountVerified = new AtomicBoolean();
         masterNode.client.admin().cluster().preparePutRepository(repoName)
             .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
             .execute(
@@ -197,12 +233,62 @@ public void testSuccessfulSnapshot() {
                         new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
                             .settings(defaultIndexSettings(shards)),
                         assertNoFailureListener(
-                            () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
-                                .execute(assertNoFailureListener(() -> createdSnapshot.set(true)))))));
-
-        deterministicTaskQueue.runAllRunnableTasks();
-
+                            () -> {
+                                final Runnable afterIndexing = () ->
+                                    masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
+                                        .setWaitForCompletion(true).execute(assertNoFailureListener(() -> {
+                                        createdSnapshot.set(true);
+                                        masterNode.client.admin().indices().delete(
+                                            new DeleteIndexRequest(index),
+                                            assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot(
+                                                new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true),
+                                                assertNoFailureListener(restoreSnapshotResponse -> {
+                                                    snapshotRestored.set(true);
+                                                    assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
+                                                    masterNode.client.search(
+                                                        new SearchRequest(index).source(
+                                                            new SearchSourceBuilder().size(0).trackTotalHits(true)
+                                                        ),
+                                                        assertNoFailureListener(r -> {
+                                                            assertEquals(
+                                                                (long) documents,
+                                                                Objects.requireNonNull(r.getHits().getTotalHits()).value
+                                                            );
+                                                            documentCountVerified.set(true);
+                                                        }));
+                                                })
+                                            )));
+                                    }));
+                                final AtomicInteger countdown = new AtomicInteger(documents);
+                                masterNode.client.admin().indices().putMapping(
+                                    new PutMappingRequest(index).type("_doc").source("foo", "type=text"),
+                                    assertNoFailureListener(r -> {
+                                            for (int i = 0; i < documents; ++i) {
+                                                masterNode.client.bulk(
+                                                    new BulkRequest().add(new IndexRequest(index).source(
+                                                        Collections.singletonMap("foo", "bar" + i)))
+                                                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
+                                                    assertNoFailureListener(
+                                                        bulkResponse -> {
+                                                            assertFalse(
+                                                                "Failures in bulkresponse: " + bulkResponse.buildFailureMessage(),
+                                                                bulkResponse.hasFailures());
+                                                            if (countdown.decrementAndGet() == 0) {
+                                                                afterIndexing.run();
+                                                            }
+                                                        }));
+                                            }
+                                            if (documents == 0) {
+                                                afterIndexing.run();
+                                            }
+                                        }
+                                    ));
+                            }))));
+        runUntil(
+            () -> createdSnapshot.get() && snapshotRestored.get() && documentCountVerified.get(), TimeUnit.MINUTES.toMillis(5L));
         assertTrue(createdSnapshot.get());
+        assertTrue(snapshotRestored.get());
+        assertTrue(documentCountVerified.get());
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
@@ -236,7 +322,6 @@ public void testSnapshotWithNodeDisconnects() {
             .execute(
                 assertNoFailureListener(
                     () -> masterNode.client.admin().indices().create(
-
                         new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
                             .settings(defaultIndexSettings(shards)),
                         assertNoFailureListener(
@@ -831,6 +916,8 @@ protected void assertSnapshotOrGenericThread() {
             allocationService = ESAllocationTestCase.createAllocationService(settings);
             final IndexScopedSettings indexScopedSettings =
                 new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
+            final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test");
+            final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
             indicesService = new IndicesService(
                 settings,
                 mock(PluginsService.class),
@@ -839,12 +926,12 @@ protected void assertSnapshotOrGenericThread() {
                 new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(),
                     emptyMap(), emptyMap(), emptyMap(), emptyMap()),
                 indexNameExpressionResolver,
-                new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER),
+                mapperRegistry,
                 namedWriteableRegistry,
                 threadPool,
                 indexScopedSettings,
                 new NoneCircuitBreakerService(),
-                new BigArrays(new PageCacheRecycler(settings), null, "test"),
+                bigArrays,
                 scriptService,
                 client,
                 new MetaStateService(nodeEnv, namedXContentRegistry),
@@ -861,6 +948,7 @@ protected void assertSnapshotOrGenericThread() {
                 new RoutingService(clusterService, allocationService),
                 threadPool
             );
+            final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
             indicesClusterStateService = new IndicesClusterStateService(
                 settings,
                 indicesService,
@@ -868,7 +956,7 @@ protected void assertSnapshotOrGenericThread() {
                 threadPool,
                 new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
                 shardStateAction,
-                new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
+                new NodeMappingRefreshAction(transportService, metaDataMappingService),
                 repositoriesService,
                 mock(SearchService.class),
                 new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
@@ -913,14 +1001,61 @@ protected void assertSnapshotOrGenericThread() {
                             actionFilters,
                             indexNameExpressionResolver));
             Map<Action, TransportAction> actions = new HashMap<>();
+            final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
+                indicesService,
+                allocationService, new AliasValidator(), environment, indexScopedSettings,
+                threadPool, namedXContentRegistry, false);
             actions.put(CreateIndexAction.INSTANCE,
                 new TransportCreateIndexAction(
                     transportService, clusterService, threadPool,
-                    new MetaDataCreateIndexService(settings, clusterService, indicesService,
-                        allocationService, new AliasValidator(), environment, indexScopedSettings,
-                        threadPool, namedXContentRegistry, false),
+                    metaDataCreateIndexService,
                     actionFilters, indexNameExpressionResolver
                 ));
+            final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
+            mappingUpdatedAction.setClient(client);
+            final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
+                clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
+                actionFilters, indexNameExpressionResolver);
+            actions.put(BulkAction.INSTANCE,
+                new TransportBulkAction(threadPool, transportService, clusterService,
+                    new IngestService(
+                        clusterService, threadPool, environment, scriptService,
+                        new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
+                        Collections.emptyList()),
+                    transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
+                    new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
+                ));
+            final RestoreService restoreService = new RestoreService(
+                clusterService, repositoriesService, allocationService,
+                metaDataCreateIndexService,
+                new MetaDataIndexUpgradeService(
+                    settings, namedXContentRegistry,
+                    mapperRegistry,
+                    indexScopedSettings,
+                    Collections.emptyList()
+                ),
+                clusterSettings
+            );
+            actions.put(PutMappingAction.INSTANCE,
+                new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService,
+                    actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList())));
+            final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
+            final SearchTransportService searchTransportService = new SearchTransportService(transportService,
+                SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
+            final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
+                bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
+            actions.put(SearchAction.INSTANCE,
+                new TransportSearchAction(threadPool, transportService, searchService,
+                    searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
+                    actionFilters, indexNameExpressionResolver));
+            actions.put(RestoreSnapshotAction.INSTANCE,
+                new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,
+                    indexNameExpressionResolver));
+            actions.put(DeleteIndexAction.INSTANCE,
+                new TransportDeleteIndexAction(
+                    transportService, clusterService, threadPool,
+                    new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters,
+                    indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings)));
             actions.put(PutRepositoryAction.INSTANCE,
                 new TransportPutRepositoryAction(
                     transportService, clusterService, repositoriesService, threadPool,