diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5790c0dc09745..b12b2c610d8ec 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -166,13 +166,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final int BUFFER_SIZE = 4096; - private static final String SNAPSHOT_PREFIX = "snap-"; + public static final String SNAPSHOT_PREFIX = "snap-"; - private static final String SNAPSHOT_CODEC = "snapshot"; + public static final String SNAPSHOT_CODEC = "snapshot"; private static final String INDEX_FILE_PREFIX = "index-"; - private static final String INDEX_LATEST_BLOB = "index.latest"; + public static final String INDEX_LATEST_BLOB = "index.latest"; private static final String TESTS_FILE = "tests-"; @@ -210,7 +210,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private ChecksumBlobStoreFormat indexMetaDataFormat; - private ChecksumBlobStoreFormat snapshotFormat; + protected ChecksumBlobStoreFormat snapshotFormat; private final boolean readOnly; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 21b05879cacf6..4912706885679 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -108,6 +108,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; @@ -152,6 +153,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.snapshots.mockstore.MockEventuallyConsistentRepository; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -203,9 +205,18 @@ public class SnapshotResiliencyTests extends ESTestCase { private Path tempDir; + /** + * Context shared by all the node's {@link Repository} instances if the eventually consistent blobstore is to be used. + * {@code null} if not using the eventually consistent blobstore. + */ + @Nullable private MockEventuallyConsistentRepository.Context blobStoreContext; + @Before public void createServices() { tempDir = createTempDir(); + if (randomBoolean()) { + blobStoreContext = new MockEventuallyConsistentRepository.Context(); + } deterministicTaskQueue = new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "shared").build(), random()); } @@ -213,6 +224,9 @@ public void createServices() { @After public void verifyReposThenStopServices() { try { + if (blobStoreContext != null) { + blobStoreContext.forceConsistent(); + } BlobStoreTestUtil.assertConsistency( (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"), Runnable::run); @@ -900,19 +914,7 @@ public void onFailure(final Exception e) { final IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(); repositoriesService = new RepositoriesService( settings, clusterService, transportService, - Collections.singletonMap(FsRepository.TYPE, metaData -> { - final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { - @Override - protected void assertSnapshotOrGenericThread() { - // eliminate thread name check as we create repo in the test thread - } - }; - repository.start(); - return repository; - } - ), - emptyMap(), - threadPool + Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool ); snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool); @@ -1093,6 +1095,28 @@ searchTransportService, new SearchPhaseController(searchService::createReduceCon client.initialize(actions, () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); } + private Repository.Factory getRepoFactory(Environment environment) { + // Run half the tests with the eventually consistent repository + if (blobStoreContext == null) { + return metaData -> { + final Repository repository = new FsRepository(metaData, environment, xContentRegistry(), threadPool) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + }; + repository.start(); + return repository; + }; + } else { + return metaData -> { + final Repository repository = new MockEventuallyConsistentRepository( + metaData, environment, xContentRegistry(), deterministicTaskQueue.getThreadPool(), blobStoreContext); + repository.start(); + return repository; + }; + } + } public void restart() { testClusterNodes.disconnectNode(this); final ClusterState oldState = this.clusterService.state(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java new file mode 100644 index 0000000000000..bde2deaa642a3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -0,0 +1,338 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots.mockstore; + +import org.apache.lucene.codecs.CodecUtil; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.BlobMetaData; +import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.BlobStore; +import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.Environment; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; + +/** + * Mock Repository that allows testing the eventually consistent behaviour of AWS S3 as documented in the + * AWS S3 docs. + * Currently, the repository asserts that no inconsistent reads are made. + * TODO: Resolve todos on list and overwrite operation consistency to fully cover S3's behavior. + */ +public class MockEventuallyConsistentRepository extends BlobStoreRepository { + + private final Context context; + + private final NamedXContentRegistry namedXContentRegistry; + + public MockEventuallyConsistentRepository(RepositoryMetaData metadata, Environment environment, + NamedXContentRegistry namedXContentRegistry, ThreadPool threadPool, Context context) { + super(metadata, environment.settings(), false, namedXContentRegistry, threadPool); + this.context = context; + this.namedXContentRegistry = namedXContentRegistry; + } + + // Filters out all actions that are super-seeded by subsequent actions + // TODO: Remove all usages of this method, snapshots should not depend on consistent list operations + private static List consistentView(List actions) { + final Map lastActions = new HashMap<>(); + for (BlobStoreAction action : actions) { + if (action.operation == Operation.PUT) { + lastActions.put(action.path, action); + } else if (action.operation == Operation.DELETE) { + lastActions.remove(action.path); + } + } + return new ArrayList<>(lastActions.values()); + } + + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo in the test thread + } + + @Override + protected BlobStore createBlobStore() { + return new MockBlobStore(); + } + + @Override + public BlobPath basePath() { + return BlobPath.cleanPath(); + } + + /** + * Context that must be shared between all instances of {@link MockEventuallyConsistentRepository} in a test run. + */ + public static final class Context { + + private final List actions = new ArrayList<>(); + + /** + * Force the repository into a consistent end state so that its eventual state can be examined. + */ + public void forceConsistent() { + synchronized (actions) { + final List consistentActions = consistentView(actions); + actions.clear(); + actions.addAll(consistentActions); + } + } + } + + private enum Operation { + PUT, GET, DELETE + } + + private static final class BlobStoreAction { + + private final Operation operation; + + @Nullable + private final byte[] data; + + private final String path; + + private BlobStoreAction(Operation operation, String path, byte[] data) { + this.operation = operation; + this.path = path; + this.data = data; + } + + private BlobStoreAction(Operation operation, String path) { + this(operation, path, null); + } + } + + private class MockBlobStore implements BlobStore { + + private AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public BlobContainer blobContainer(BlobPath path) { + return new MockBlobContainer(path); + } + + @Override + public void close() { + closed.set(true); + } + + private void ensureNotClosed() { + if (closed.get()) { + throw new AssertionError("Blobstore is closed already"); + } + } + + private class MockBlobContainer implements BlobContainer { + + private final BlobPath path; + + MockBlobContainer(BlobPath path) { + this.path = path; + } + + @Override + public BlobPath path() { + return path; + } + + @Override + public InputStream readBlob(String name) throws NoSuchFileException { + ensureNotClosed(); + final String blobPath = path.buildAsString() + name; + synchronized (context.actions) { + final List relevantActions = relevantActions(blobPath); + context.actions.add(new BlobStoreAction(Operation.GET, blobPath)); + if (relevantActions.stream().noneMatch(a -> a.operation == Operation.PUT)) { + throw new NoSuchFileException(blobPath); + } + if (relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT) { + // Consistent read after write + return new ByteArrayInputStream(relevantActions.get(0).data); + } + throw new AssertionError("Inconsistent read on [" + blobPath + ']'); + } + } + + private List relevantActions(String blobPath) { + assert Thread.holdsLock(context.actions); + final List relevantActions = new ArrayList<>( + context.actions.stream().filter(action -> blobPath.equals(action.path)).collect(Collectors.toList())); + for (int i = relevantActions.size() - 1; i > 0; i--) { + if (relevantActions.get(i).operation == Operation.GET) { + relevantActions.remove(i); + } else { + break; + } + } + return relevantActions; + } + + @Override + public void deleteBlob(String blobName) { + ensureNotClosed(); + synchronized (context.actions) { + context.actions.add(new BlobStoreAction(Operation.DELETE, path.buildAsString() + blobName)); + } + } + + @Override + public void delete() { + ensureNotClosed(); + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath)) + .forEach(a -> context.actions.add(new BlobStoreAction(Operation.DELETE, a.path))); + } + } + + @Override + public Map listBlobs() { + ensureNotClosed(); + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + return consistentView(context.actions).stream() + .filter( + action -> action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') == -1 + && action.operation == Operation.PUT) + .collect( + Collectors.toMap( + action -> action.path.substring(thisPath.length()), + action -> new PlainBlobMetaData(action.path.substring(thisPath.length()), action.data.length))); + } + } + + @Override + public Map children() { + ensureNotClosed(); + final String thisPath = path.buildAsString(); + synchronized (context.actions) { + return consistentView(context.actions).stream() + .filter(action -> + action.operation == Operation.PUT + && action.path.startsWith(thisPath) && action.path.substring(thisPath.length()).indexOf('/') != -1) + .map(action -> action.path.substring(thisPath.length()).split("/")[0]) + .distinct() + .collect(Collectors.toMap(Function.identity(), name -> new MockBlobContainer(path.add(name)))); + } + } + + @Override + public Map listBlobsByPrefix(String blobNamePrefix) { + return + listBlobs().entrySet().stream().filter(entry -> entry.getKey().startsWith(blobNamePrefix)).collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { + ensureNotClosed(); + assert blobSize < Integer.MAX_VALUE; + final byte[] data = new byte[(int) blobSize]; + final int read = inputStream.read(data); + assert read == data.length; + final String blobPath = path.buildAsString() + blobName; + synchronized (context.actions) { + final List relevantActions = relevantActions(blobPath); + // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent. + final boolean hasConsistentContent = + relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT; + if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { + // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that + // it never decrements here + } else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) { + if (hasConsistentContent) { + if (basePath().buildAsString().equals(path().buildAsString())) { + try { + // TODO: dry up the logic for reading SnapshotInfo here against the code in ChecksumBlobStoreFormat + final int offset = CodecUtil.headerLength(BlobStoreRepository.SNAPSHOT_CODEC); + final SnapshotInfo updatedInfo = SnapshotInfo.fromXContentInternal( + XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, + new BytesArray(data, offset, data.length - offset - CodecUtil.footerLength()), + XContentType.SMILE)); + // If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not + // a problem and could be the result of a correctly handled master failover. + final SnapshotInfo existingInfo = snapshotFormat.readBlob(this, blobName); + assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId())); + assertThat(existingInfo.reason(), equalTo(updatedInfo.reason())); + assertThat(existingInfo.state(), equalTo(updatedInfo.state())); + assertThat(existingInfo.totalShards(), equalTo(updatedInfo.totalShards())); + assertThat(existingInfo.successfulShards(), equalTo(updatedInfo.successfulShards())); + assertThat( + existingInfo.shardFailures(), containsInAnyOrder(updatedInfo.shardFailures().toArray())); + assertThat(existingInfo.indices(), equalTo(updatedInfo.indices())); + return; // No need to add a write for this since we didn't change content + } catch (Exception e) { + // Rethrow as AssertionError here since kind exception might otherwise be swallowed and logged by + // the blob store repository. + // Since we are not doing any actual IO we don't expect this to throw ever and an exception would + // signal broken SnapshotInfo bytes or unexpected behavior of SnapshotInfo otherwise. + throw new AssertionError("Failed to deserialize SnapshotInfo", e); + } + } else { + // Primaries never retry so any shard level snap- blob retry/overwrite even with the same content is + // not expected. + throw new AssertionError("Shard level snap-{uuid} blobs should never be overwritten"); + } + } + } else { + if (hasConsistentContent) { + ESTestCase.assertArrayEquals("Tried to overwrite blob [" + blobName + "]", relevantActions.get(0).data, data); + return; // No need to add a write for this since we didn't change content + } + } + context.actions.add(new BlobStoreAction(Operation.PUT, blobPath, data)); + } + } + + @Override + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + final boolean failIfAlreadyExists) throws IOException { + writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java new file mode 100644 index 0000000000000..81934fe93bd8a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -0,0 +1,181 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots.mockstore; + +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; + +import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.Mockito.mock; + +public class MockEventuallyConsistentRepositoryTests extends ESTestCase { + + private Environment environment; + + @Override + public void setUp() throws Exception { + super.setUp(); + final Path tempDir = createTempDir(); + final String nodeName = "testNode"; + environment = TestEnvironment.newEnvironment(Settings.builder() + .put(NODE_NAME_SETTING.getKey(), nodeName) + .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath()) + .build()); + } + + public void testReadAfterWriteConsistently() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); + try (InputStream in = blobContainer.readBlob(blobName)) { + final byte[] readBytes = new byte[lengthWritten + 1]; + final int lengthSeen = in.read(readBytes); + assertThat(lengthSeen, equalTo(lengthWritten)); + assertArrayEquals(blobData, Arrays.copyOf(readBytes, lengthWritten)); + } + } + } + + public void testReadAfterWriteAfterReadThrows() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName)); + blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); + assertThrowsOnInconsistentRead(blobContainer, blobName); + } + } + + public void testReadAfterDeleteAfterWriteThrows() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer blobContainer = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + blobContainer.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, true); + blobContainer.deleteBlob(blobName); + assertThrowsOnInconsistentRead(blobContainer, blobName); + blobStoreContext.forceConsistent(); + expectThrows(NoSuchFileException.class, () -> blobContainer.readBlob(blobName)); + } + } + + public void testOverwriteRandomBlobFails() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer container = repository.blobStore().blobContainer(repository.basePath()); + final String blobName = randomAlphaOfLength(10); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false); + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten - 1, false)); + assertThat(assertionError.getMessage(), startsWith("Tried to overwrite blob [" + blobName +"]")); + } + } + + public void testOverwriteShardSnapBlobFails() throws IOException { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + final BlobContainer container = + repository.blobStore().blobContainer(repository.basePath().add("indices").add("someindex").add("0")); + final String blobName = BlobStoreRepository.SNAPSHOT_PREFIX + UUIDs.randomBase64UUID(); + final int lengthWritten = randomIntBetween(1, 100); + final byte[] blobData = randomByteArrayOfLength(lengthWritten); + container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false); + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> container.writeBlob(blobName, new ByteArrayInputStream(blobData), lengthWritten, false)); + assertThat(assertionError.getMessage(), equalTo("Shard level snap-{uuid} blobs should never be overwritten")); + } + } + + public void testOverwriteSnapshotInfoBlob() { + MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( + new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), environment, + xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + repository.start(); + + // We create a snap- blob for snapshot "foo" in the first generation + final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); + repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), + -1L, false, Collections.emptyMap()); + + // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. + final AssertionError assertionError = expectThrows(AssertionError.class, + () -> repository.finalizeSnapshot( + snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), + 0, false, Collections.emptyMap())); + assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); + + // We try to write yet another snap- blob for "foo" in the next generation. + // It passes cleanly because the content of the blob except for the timestamps. + repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), + 0, false, Collections.emptyMap()); + } + } + + private static void assertThrowsOnInconsistentRead(BlobContainer blobContainer, String blobName) { + final AssertionError assertionError = expectThrows(AssertionError.class, () -> blobContainer.readBlob(blobName)); + assertThat(assertionError.getMessage(), equalTo("Inconsistent read on [" + blobName + ']')); + } +}