Skip to content

Commit

Permalink
Ensures cleanup of temporary index-* generational blobs during snapsh…
Browse files Browse the repository at this point in the history
…otting (#21469)

Ensures pending index-* blobs are deleted when snapshotting.  The
index-* blobs are generational files that maintain the snapshots
in the repository.  To write these atomically, we first write a
`pending-index-*` blob, then move it to `index-*`, which also deletes
`pending-index-*` in case its not a file-system level move (e.g.
S3 repositories) .  For example, to write the 5th generation of the
index blob for the repository, we would first write the bytes to
`pending-index-5` and then move `pending-index-5` to `index-5`.  It is
possible that we fail after writing `pending-index-5`, but before
moving it to `index-5` or deleting `pending-index-5`.  In this case,
we will have a dangling `pending-index-5` blob laying around.  Since
snapshot #5 would have failed, the next snapshot assumes a generation
number of 5, so it tries to write to `index-5`, which first tries to
write to `pending-index-5` before moving the blob to `index-5`.  Since
`pending-index-5` is leftover from the previous failure, the snapshot
fails as it cannot overwrite this blob.

This commit solves the problem by first, adding a UUID to the
`pending-index-*` blobs, and secondly, strengthen the logic around
failure to write the `index-*` generational blob to ensure pending
files are deleted on cleanup.

Closes #21462
  • Loading branch information
Ali Beyad committed Nov 11, 2016
1 parent f91c8d4 commit 6966fa5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ public interface BlobContainer {
Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;

/**
* Atomically renames the source blob into the target blob. If the source blob does not exist or the
* target blob already exists, an exception is thrown.
* Renames the source blob into the target blob. If the source blob does not exist or the
* target blob already exists, an exception is thrown. Atomicity of the move operation
* can only be guaranteed on an implementation-by-implementation basis. The only current
* implementation of {@link BlobContainer} for which atomicity can be guaranteed is the
* {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}.
*
* @param sourceBlobName
* The blob to rename.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,15 +867,17 @@ private long listBlobsToGetLatestIndexId() throws IOException {
}

private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
final String tempBlobName = "pending-" + blobName;
final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
try (InputStream stream = bytesRef.streamInput()) {
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
}
try {
snapshotsBlobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// Move failed - try cleaning up
snapshotsBlobContainer.deleteBlob(tempBlobName);
// temporary blob creation or move failed - try cleaning up
try {
snapshotsBlobContainer.deleteBlob(tempBlobName);
} catch (IOException e) {
ex.addSuppressed(e);
}
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2672,4 +2672,53 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception {
assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason());
}

public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings(
Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)));

logger.info("--> indexing some data");
createIndex("test-idx");
ensureGreen();
final int numDocs = randomIntBetween(1, 5);
for (int i = 0; i < numDocs; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));

logger.info("--> snapshot with potential I/O failures");
try {
CreateSnapshotResponse createSnapshotResponse =
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices("test-idx")
.get();
if (createSnapshotResponse.getSnapshotInfo().totalShards() != createSnapshotResponse.getSnapshotInfo().successfulShards()) {
assertThat(getFailureCount("test-repo"), greaterThan(0L));
assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0));
for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) {
assertThat(shardFailure.reason(), containsString("Random IOException"));
}
}
} catch (Exception ex) {
// sometimes, the snapshot will fail with a top level I/O exception
assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
}

logger.info("--> snapshot with no I/O failures");
assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings(
Settings.builder().put("location", repoPath)));
CreateSnapshotResponse createSnapshotResponse =
client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2")
.setWaitForCompletion(true)
.setIndices("test-idx")
.get();
assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2")
.addSnapshots("test-snap-2").get();
assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,20 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws

@Override
public void move(String sourceBlob, String targetBlob) throws IOException {
// simulate a non-atomic move, since many blob container implementations
// will not have an atomic move, and we should be able to handle that
maybeIOExceptionOrBlock(targetBlob);
super.move(sourceBlob, targetBlob);
super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
super.deleteBlob(sourceBlob);
}

@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
maybeIOExceptionOrBlock(blobName);
super.writeBlob(blobName, inputStream, blobSize);
// for network based repositories, the blob may have been written but we may still
// get an error with the client connection, so an IOException here simulates this
maybeIOExceptionOrBlock(blobName);
}
}
}
Expand Down

0 comments on commit 6966fa5

Please sign in to comment.