Skip to content

Commit

Permalink
Implement Eventually Consistent Mock Repository for SnapshotResilienc…
Browse files Browse the repository at this point in the history
…yTests (#40893) (#44570)

* Add eventually consistent mock repository for reproducing and testing AWS S3 blob store behavior
* Relates #38941
  • Loading branch information
original-brownbear authored Jul 18, 2019
1 parent 8445c41 commit 3b5038b
Show file tree
Hide file tree
Showing 4 changed files with 560 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private static final int BUFFER_SIZE = 4096;

private static final String SNAPSHOT_PREFIX = "snap-";
public static final String SNAPSHOT_PREFIX = "snap-";

private static final String SNAPSHOT_CODEC = "snapshot";
public static final String SNAPSHOT_CODEC = "snapshot";

private static final String INDEX_FILE_PREFIX = "index-";

private static final String INDEX_LATEST_BLOB = "index.latest";
public static final String INDEX_LATEST_BLOB = "index.latest";

private static final String TESTS_FILE = "tests-";

Expand Down Expand Up @@ -210,7 +210,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private ChecksumBlobStoreFormat<IndexMetaData> indexMetaDataFormat;

private ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;
protected ChecksumBlobStoreFormat<SnapshotInfo> snapshotFormat;

private final boolean readOnly;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -152,6 +153,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.NetworkDisruption;
Expand Down Expand Up @@ -203,16 +205,28 @@ public class SnapshotResiliencyTests extends ESTestCase {

private Path tempDir;

/**
* Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used.
* {@code null} if not using the eventually consistent blobstore.
*/
@Nullable private MockEventuallyConsistentRepository.Context blobStoreContext;

@Before
public void createServices() {
tempDir = createTempDir();
if (randomBoolean()) {
blobStoreContext = new MockEventuallyConsistentRepository.Context();
}
deterministicTaskQueue =
new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random());
}

@After
public void verifyReposThenStopServices() {
try {
if (blobStoreContext != null) {
blobStoreContext.forceConsistent();
}
BlobStoreTestUtil.assertConsistency(
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
Runnable::run);
Expand Down Expand Up @@ -900,19 +914,7 @@ public void onFailure(final Exception e) {
final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
repositoriesService = new RepositoriesService(
settings, clusterService, transportService,
Collections.singletonMap(FsRepository.TYPE, metaData -> {
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
repository.start();
return repository;
}
),
emptyMap(),
threadPool
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
);
snapshotsService =
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
Expand Down Expand Up @@ -1093,6 +1095,28 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon
client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
}

private Repository.Factory getRepoFactory(Environment environment) {
// Run half the tests with the eventually consistent repository
if (blobStoreContext == null) {
return metaData -> {
final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo in the test thread
}
};
repository.start();
return repository;
};
} else {
return metaData -> {
final Repository repository = new MockEventuallyConsistentRepository(
metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext);
repository.start();
return repository;
};
}
}
public void restart() {
testClusterNodes.disconnectNode(this);
final ClusterState oldState = this.clusterService.state();
Expand Down
Loading

0 comments on commit 3b5038b

Please sign in to comment.