Skip to content

Commit

Permalink
Add BlobContainer.writeBlobAtomic()
Browse files Browse the repository at this point in the history
This commit adds a new writeBlobAtomic() method to the BlobContainer
interface that can be implemented by repository implementations which
support atomic writes operations.

When the repository does not support atomic writes, this method just
delegate the write operation to the usual writeBlob() method.

Related to elastic#30680
  • Loading branch information
tlrx committed Jun 5, 2018
1 parent c7c0acc commit c2ff0c9
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ public interface BlobContainer {
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if then implementation supports it. When atomic writes are not supported,
* this method delegates to {@link #writeBlob(String, InputStream, long)}.
*
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
writeBlob(blobName, inputStream, blobSize);
}

/**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.elasticsearch.common.blobstore.fs;

import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -131,6 +132,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
IOUtils.fsync(path, true);
}

@Override
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
final Path tempBlobPath = path.resolve("pending-" + blobName + "-" + UUIDs.randomBase64UUID());
try {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
}
IOUtils.fsync(tempBlobPath, false);

final Path blobPath = path.resolve(blobName);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
if (Files.exists(blobPath)) {
throw new FileAlreadyExistsException("blob [" + blobPath + "] already exists, cannot overwrite");
}
Files.move(tempBlobPath, blobPath, StandardCopyOption.ATOMIC_MOVE);
} finally {
IOUtils.deleteFilesIgnoringExceptions(tempBlobPath);
IOUtils.fsync(path, true);
}
}

@Override
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,18 +774,8 @@ private long listBlobsToGetLatestIndexId() throws IOException {
}

private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
try (InputStream stream = bytesRef.streamInput()) {
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
snapshotsBlobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// temporary blob creation or move failed - try cleaning up
try {
snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(tempBlobName);
} catch (IOException e) {
ex.addSuppressed(e);
}
throw ex;
snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -120,7 +121,7 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
}

/**
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
*
Expand All @@ -131,20 +132,12 @@ public T readBlob(BlobContainer blobContainer, String blobName) throws IOExcepti
* @param name blob name
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
String tempBlobName = tempBlobName(name);
writeBlob(obj, blobContainer, tempBlobName);
try {
blobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// Move failed - try cleaning up
try {
blobContainer.deleteBlob(tempBlobName);
} catch (Exception e) {
ex.addSuppressed(e);
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
}
throw ex;
}
});
}

/**
Expand All @@ -157,39 +150,32 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws
* @param name blob name
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
writeBlob(obj, blobContainer, blobName);
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
}
});
}

/**
* Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
*
* @param obj object to be serialized
* @param blobContainer blob container
* @param blobName blob name
*/
protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
final BytesReference bytes = write(obj);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
} }) {
}
}) {
bytes.writeTo(indexOutputOutputStream);
}
CodecUtil.writeFooter(indexOutput);
}
BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
}
consumer.accept(new BytesArray(outputStream.toByteArray()));
}
}

Expand Down Expand Up @@ -222,10 +208,4 @@ protected void write(T obj, StreamOutput streamOutput) throws IOException {
builder.endObject();
}
}


protected String tempBlobName(String name) {
return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void waitForBlock(String node, String repository, TimeValue timeout) thro
}
Thread.sleep(100);
}
fail("Timeout!!!");
fail("Timeout waiting for node [" + node + "] to be blocked");
}

public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,52 +224,16 @@ public void testAtomicWriteFailures() throws Exception {
IOException writeBlobException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new IOException("Exception thrown in writeBlob() for " + blobName);
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});

assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
assertEquals(0, writeBlobException.getSuppressed().length);
}
{
IOException moveException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
throw new IOException("Exception thrown in move() for " + sourceBlobName);
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});
assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
assertEquals(0, moveException.getSuppressed().length);
}
{
IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
throw new IOException("Exception thrown in move() for " + sourceBlobName);
}

@Override
public void deleteBlob(String blobName) throws IOException {
throw new IOException("Exception thrown in deleteBlob() for " + blobName);
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});

assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
assertEquals(1, moveThenDeleteException.getSuppressed().length);

final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
assertTrue(suppressedThrowable instanceof IOException);
assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
}
}

protected BlobStore createTestBlobStore() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,21 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
delegate.writeBlob(blobName, inputStream, blobSize);
}

@Override
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
delegate.writeBlobAtomic(blobName, inputStream, blobSize);
}

@Override
public void deleteBlob(String blobName) throws IOException {
delegate.deleteBlob(blobName);
}

@Override
public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
delegate.deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
return delegate.listBlobs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,6 @@

package org.elasticsearch.snapshots.mockstore;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -49,11 +35,25 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class MockRepository extends FsRepository {

public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
Expand Down Expand Up @@ -325,6 +325,12 @@ public void deleteBlob(String blobName) throws IOException {
super.deleteBlob(blobName);
}

@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName);
super.deleteBlobIgnoringIfNotExists(blobName);
}

@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
maybeIOExceptionOrBlock("");
Expand Down Expand Up @@ -365,6 +371,12 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
maybeIOExceptionOrBlock(blobName);
}
}

@Override
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
maybeIOExceptionOrBlock(blobName);
super.writeBlobAtomic(blobName, inputStream, blobSize);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ public void testVerifyOverwriteFails() throws IOException {

protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
try (InputStream stream = bytesArray.streamInput()) {
container.writeBlob(blobName, stream, bytesArray.length());
if (randomBoolean()) {
container.writeBlob(blobName, stream, bytesArray.length());
} else {
container.writeBlobAtomic(blobName, stream, bytesArray.length());
}
}
}

Expand Down

0 comments on commit c2ff0c9

Please sign in to comment.