Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Upload blob from input stream #14097

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
Expand Down Expand Up @@ -54,15 +55,20 @@ public void writeAsync(final U entity, final ActionListener<Void> listener) {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
transferService.uploadBlob(
inputStream,
getBlobPathForUpload(entity),
entity.getBlobFileName(),
WritePriority.URGENT,
listener
);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

@Override
public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
Expand All @@ -19,11 +20,13 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.InputStreamWithMetadata;
import org.opensearch.common.blobstore.stream.write.WriteContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeFileInputStream;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.store.exception.ChecksumCombinationException;
import org.opensearch.index.translog.ChannelFactory;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -41,6 +44,7 @@
import java.util.Set;

import static org.opensearch.common.blobstore.BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC;
import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;
import static org.opensearch.index.translog.transfer.TranslogTransferManager.CHECKPOINT_FILE_DATA_KEY;

/**
Expand All @@ -53,6 +57,7 @@ public class BlobStoreTransferService implements TransferService {
private final BlobStore blobStore;
private final ThreadPool threadPool;

private static final int CHECKSUM_BYTES_LENGTH = 8;
private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class);

public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) {
Expand Down Expand Up @@ -108,6 +113,40 @@ public void uploadBlobs(

}

@Override
public void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String fileName,
WritePriority writePriority,
ActionListener<Void> listener
) throws IOException {
assert remotePath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remotePath;
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
blobContainer.writeBlob(fileName, inputStream, inputStream.available(), false);
listener.onResponse(null);
return;
}
final String resourceDescription = "BlobStoreTransferService.uploadBlob(blob=\"" + fileName + "\")";
byte[] bytes = inputStream.readAllBytes();
try (IndexInput input = new ByteArrayIndexInput(resourceDescription, bytes)) {
long expectedChecksum = computeChecksum(input, resourceDescription);
uploadBlobAsyncInternal(
fileName,
fileName,
bytes.length,
blobPath,
writePriority,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
listener,
null
);
}
}

// Builds a metadata map containing the Base64-encoded checkpoint file data associated with a translog file.
static Map<String, String> buildTransferFileMetadata(InputStream metadataInputStream) throws IOException {
Map<String, String> metadata = new HashMap<>();
Expand Down Expand Up @@ -150,37 +189,23 @@ private void uploadBlob(
try (FileChannel channel = channelFactory.open(fileSnapshot.getPath(), StandardOpenOption.READ)) {
contentLength = channel.size();
}
boolean remoteIntegrityEnabled = false;
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
if (blobContainer instanceof AsyncMultiStreamBlobContainer) {
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
}
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

Objects.requireNonNull(fileSnapshot.getChecksum());
uploadBlobAsyncInternal(
fileSnapshot.getName(),
fileSnapshot.getName(),
contentLength,
true,
blobPath,
writePriority,
(size, position) -> new OffsetRangeFileInputStream(fileSnapshot.getPath(), size, position),
Objects.requireNonNull(fileSnapshot.getChecksum()),
remoteIntegrityEnabled,
fileSnapshot.getChecksum(),
completionListener,
metadata
);
ActionListener<Void> completionListener = ActionListener.wrap(resp -> listener.onResponse(fileSnapshot), ex -> {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
listener.onFailure(new FileTransferException(fileSnapshot, ex));
});

completionListener = ActionListener.runBefore(completionListener, () -> {
try {
remoteTransferContainer.close();
} catch (Exception e) {
logger.warn("Error occurred while closing streams", e);
}
});

WriteContext writeContext = remoteTransferContainer.createWriteContext();
((AsyncMultiStreamBlobContainer) blobStore.blobContainer(blobPath)).asyncBlobUpload(writeContext, completionListener);

} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
Expand All @@ -195,6 +220,40 @@ private void uploadBlob(

}

private void uploadBlobAsyncInternal(
String fileName,
String remoteFileName,
long contentLength,
BlobPath blobPath,
WritePriority writePriority,
RemoteTransferContainer.OffsetRangeInputStreamSupplier inputStreamSupplier,
long expectedChecksum,
ActionListener<Void> completionListener,
Map<String, String> metadata
) throws IOException {
BlobContainer blobContainer = blobStore.blobContainer(blobPath);
assert blobContainer instanceof AsyncMultiStreamBlobContainer;
boolean remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported();
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
remoteFileName,
contentLength,
true,
writePriority,
inputStreamSupplier,
expectedChecksum,
remoteIntegrityEnabled,
metadata
)
) {
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
remoteTransferContainer.createWriteContext(),
completionListener
);
}
}

@Override
public InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException {
return blobStore.blobContainer((BlobPath) path).readBlob(fileName);
Expand Down Expand Up @@ -276,4 +335,19 @@ public void listAllInSortedOrderAsync(
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
}

private static long computeChecksum(IndexInput indexInput, String resourceDescription) throws ChecksumCombinationException {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(indexInput.clone(), CHECKSUM_BYTES_LENGTH);
} catch (Exception e) {
throw new ChecksumCombinationException(
"Potentially corrupted file: Checksum combination failed while combining stored checksum "
+ "and calculated checksum of stored checksum",
resourceDescription,
e
);
}
return expectedChecksum;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ void uploadBlobs(
*/
void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, WritePriority writePriority) throws IOException;

/**
* Reads the input stream and uploads as a blob
* @param inputStream the stream to read from
* @param remotePath the remote path where upload should be made
* @param blobName the name of blob file
* @param writePriority Priority by which content needs to be written.
* @param listener the callback to be invoked once uploads complete successfully/fail
* @throws IOException the exception thrown while uploading
*/
void uploadBlob(
InputStream inputStream,
Iterable<String> remotePath,
String blobName,
WritePriority writePriority,
ActionListener<Void> listener
) throws IOException;

void deleteBlobs(Iterable<String> path, List<String> fileNames) throws IOException;

/**
Expand Down
Loading
Loading