Add Ability to Write a BytesReference to BlobContainer #66501

Merged
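This change adds BytesReference-based overloads of writeBlob and writeBlobAtomic to BlobContainer, so callers that already hold a fully materialized payload no longer have to wrap it in an InputStream and pass an explicit length. A minimal before/after sketch of a call site (the blob name and payload are illustrative, not taken from this PR):

    import org.elasticsearch.common.blobstore.BlobContainer;
    import org.elasticsearch.common.bytes.BytesReference;

    import java.io.IOException;
    import java.io.InputStream;

    class WriteBlobCallSiteSketch {
        // Before this PR: open a stream over the bytes and pass the length separately.
        static void before(BlobContainer container, BytesReference payload) throws IOException {
            try (InputStream stream = payload.streamInput()) {
                container.writeBlob("example.dat", stream, payload.length(), true);
            }
        }

        // After this PR: hand the BytesReference to the container directly.
        static void after(BlobContainer container, BytesReference payload) throws IOException {
            container.writeBlob("example.dat", payload, true);
        }
    }

Implementations that can do better than streaming (the filesystem and HDFS containers below) override the new overloads; the rest inherit a default that falls back to the existing InputStream method.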
@@ -25,6 +25,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
@@ -132,7 +133,7 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}

@@ -28,6 +28,7 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
@@ -36,9 +37,7 @@
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
@@ -221,10 +220,8 @@ public void testLargeBlobCountDeletion() throws Exception {
final BlobContainer container = store.blobContainer(new BlobPath());
for (int i = 0; i < numberOfBlobs; i++) {
byte[] bytes = randomBytes(randomInt(100));
try (InputStream inputStream = new ByteArrayInputStream(bytes)) {
String blobName = randomAlphaOfLength(10);
container.writeBlob(blobName, inputStream, bytes.length, false);
}
String blobName = randomAlphaOfLength(10);
container.writeBlob(blobName, new BytesArray(bytes), false);
}

container.delete();
@@ -29,6 +29,7 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
@@ -101,8 +102,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);
}

@Override
@@ -25,6 +25,7 @@
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
@@ -83,8 +84,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);
}

@Override
@@ -34,6 +34,7 @@
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;

@@ -150,12 +151,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
Path blob = new Path(path, blobName);
// we pass CREATE, which means it fails if a blob already exists.
final EnumSet<CreateFlag> flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)
: EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK);
store.execute((Operation<Void>) fileContext -> {
try {
writeToPath(bytes, blob, fileContext, flags);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
}
return null;
});
}

@Override
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
final String tempBlob = FsBlobContainer.tempBlobName(blobName);
final Path tempBlobPath = new Path(path, tempBlob);
final Path blob = new Path(path, blobName);
store.execute((Operation<Void>) fileContext -> {
writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
writeToPath(bytes, tempBlobPath, fileContext, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
try {
fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
} catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
@@ -165,6 +182,13 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
});
}

private void writeToPath(BytesReference bytes, Path blobPath, FileContext fileContext,
EnumSet<CreateFlag> createFlags) throws IOException {
try (FSDataOutputStream stream = fileContext.create(blobPath, createFlags)) {
bytes.writeTo(stream);
}
}

private void writeToPath(InputStream inputStream, long blobSize, FileContext fileContext, Path blobPath,
EnumSet<CreateFlag> createFlags) throws IOException {
final byte[] buffer = new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize];
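The BytesReference overload lets the HDFS container write the payload straight to the output stream via BytesReference#writeTo, while the InputStream-based writeToPath that remains for streaming writes still pumps bytes through an intermediate buffer. Roughly the manual copy loop that writeTo makes unnecessary when the data is already in memory (a generic sketch, not code from this PR):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    class StreamCopySketch {
        // Copy an InputStream to an OutputStream through a fixed-size buffer,
        // the pattern the BytesReference path avoids by calling bytes.writeTo(stream).
        static void copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
            final byte[] buffer = new byte[bufferSize];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }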
@@ -54,7 +54,6 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -158,12 +157,9 @@ public void testEnforcedCooldownPeriod() throws IOException {
SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
SnapshotsService.OLD_SNAPSHOT_FORMAT));
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
try (InputStream stream = serialized.streamInput()) {
PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () ->
repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), stream, serialized.length(), true);
}
})));
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(), serialized, true))));

final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();
@@ -46,6 +46,7 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -143,8 +144,8 @@ long getLargeBlobThresholdInBytes() {
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes, failIfAlreadyExists);
}

@Override
@@ -22,12 +22,12 @@
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.ByteArrayInputStream;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
@@ -85,7 +85,7 @@ private String startBlockedCleanup(String repoName) throws Exception {
logger.info("--> creating a garbage data blob");
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore()
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true)));
.blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new BytesArray(new byte[1]), true)));
garbageFuture.get();

blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
Expand Down Expand Up @@ -120,7 +120,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
final int generation = i;
repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore()
.blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation,
new ByteArrayInputStream(new byte[1]), 1, true)));
new BytesArray(new byte[1]), true)));
createOldIndexNFuture.get();
}

@@ -19,6 +19,8 @@

package org.elasticsearch.common.blobstore;

import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
@@ -110,25 +112,35 @@ default long readBlobPreferredLength() {
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name.
*
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param bytes
* The bytes to write
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
}

/**
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param bytes
* The bytes to write
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException;

/**
* Deletes this container and all its contents from the repository.
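Per the javadoc above, both new methods take the payload as a BytesReference and honor failIfAlreadyExists: the atomic variant is now abstract with the new signature, while the plain variant defaults to delegating to the existing InputStream overload. A test-style sketch of the contract, assuming an already-constructed container (names are illustrative):

    import org.elasticsearch.common.blobstore.BlobContainer;
    import org.elasticsearch.common.bytes.BytesArray;

    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;

    class WriteBlobContractSketch {
        static void demo(BlobContainer container) throws IOException {
            BytesArray payload = new BytesArray(new byte[]{1, 2, 3});
            container.writeBlob("example.dat", payload, true);        // first write succeeds
            container.writeBlobAtomic("index-0", payload, false);     // atomic variant, overwrite allowed
            try {
                container.writeBlob("example.dat", payload, true);    // same name again
            } catch (FileAlreadyExistsException e) {
                // expected: failIfAlreadyExists was true and the blob already exists
            }
        }
    }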
@@ -26,6 +26,7 @@
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.internal.io.IOUtils;

@@ -190,12 +191,26 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists)
throws IOException {
public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
final Path file = path.resolve(blobName);
try {
writeToPath(bytes, file);
} catch (FileAlreadyExistsException faee) {
if (failIfAlreadyExists) {
throw faee;
}
deleteBlobsIgnoringIfNotExists(Collections.singletonList(blobName));
writeToPath(bytes, file);
}
IOUtils.fsync(path, true);
}

@Override
public void writeBlobAtomic(final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
try {
writeToPath(inputStream, tempBlobPath, blobSize);
writeToPath(bytes, tempBlobPath);
moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists);
} catch (IOException ex) {
try {
@@ -209,6 +224,13 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream
}
}

private void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
bytes.writeTo(outputStream);
}
IOUtils.fsync(tempBlobPath, false);
}

private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
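The filesystem container above gets a direct writeBlob(String, BytesReference, boolean) implementation plus an atomic variant that still stages the bytes in a temporary blob and then moves it over the target name. Stripped down to plain java.nio, the stage-then-rename idea looks roughly like this (temp naming and error handling are simplified; the real code also handles failIfAlreadyExists and fsyncs the written file and its directory):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    class AtomicWriteSketch {
        // Write to a temp file first, then atomically rename it over the final name,
        // so concurrent readers never see a partially written blob.
        static void writeAtomically(Path dir, String blobName, byte[] bytes) throws IOException {
            Path temp = dir.resolve("pending-" + blobName);
            try (OutputStream out = Files.newOutputStream(temp, StandardOpenOption.CREATE_NEW)) {
                out.write(bytes);
            }
            Files.move(temp, dir.resolve(blobName), StandardCopyOption.ATOMIC_MOVE);
        }
    }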
@@ -23,6 +23,7 @@
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.bytes.BytesReference;

import java.io.IOException;
import java.io.InputStream;
@@ -72,8 +73,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
}

@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
public void writeBlobAtomic(String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
delegate.writeBlobAtomic(blobName, bytes, failIfAlreadyExists);
}

@Override
@@ -1273,9 +1273,7 @@ public String startVerification() {
byte[] testBytes = Strings.toUTF8Bytes(seed);
BlobContainer testContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlobAtomic("master.dat", stream, bytes.length(), true);
}
testContainer.writeBlobAtomic("master.dat", new BytesArray(testBytes), true);
return seed;
}
} catch (Exception exp) {
@@ -1880,11 +1878,9 @@ private long latestGeneration(Collection<String> rootBlobs) {

private void writeAtomic(BlobContainer container, final String blobName, final BytesReference bytesRef,
boolean failIfAlreadyExists) throws IOException {
try (InputStream stream = bytesRef.streamInput()) {
logger.trace(() ->
new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
container.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists);
}
logger.trace(() ->
new ParameterizedMessage("[{}] Writing [{}] to {} atomically", metadata.name(), blobName, container.path()));
container.writeBlobAtomic(blobName, bytesRef, failIfAlreadyExists);
}

@Override
Expand Down Expand Up @@ -2291,10 +2287,7 @@ public void verify(String seed, DiscoveryNode localNode) {
} else {
BlobContainer testBlobContainer = blobStore().blobContainer(basePath().add(testBlobPrefix(seed)));
try {
BytesArray bytes = new BytesArray(seed);
try (InputStream stream = bytes.streamInput()) {
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true);
}
testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", new BytesArray(seed), true);
} catch (Exception exp) {
throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() +
"] is not accessible on the node [" + localNode + "]", exp);