-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add async blob read and download support using multiple streams
Signed-off-by: Kunal Kotwani <[email protected]>
- Loading branch information
1 parent
a08d588
commit 6a7e26c
Showing
15 changed files
with
727 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
server/src/main/java/org/opensearch/common/blobstore/stream/read/ReadContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read; | ||
|
||
import org.opensearch.common.io.InputStreamContainer; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* ReadContext is used to encapsulate all data needed by <code>BlobContainer#readBlobAsync</code> | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class ReadContext { | ||
private final long blobSize; | ||
private final List<InputStreamContainer> partStreams; | ||
private final String blobChecksum; | ||
|
||
public ReadContext(long blobSize, List<InputStreamContainer> partStreams, String blobChecksum) { | ||
this.blobSize = blobSize; | ||
this.partStreams = partStreams; | ||
this.blobChecksum = blobChecksum; | ||
} | ||
|
||
public String getBlobChecksum() { | ||
return blobChecksum; | ||
} | ||
|
||
public int getNumberOfParts() { | ||
return partStreams.size(); | ||
} | ||
|
||
public long getBlobSize() { | ||
return blobSize; | ||
} | ||
|
||
public List<InputStreamContainer> getPartStreams() { | ||
return partStreams; | ||
} | ||
} |
42 changes: 42 additions & 0 deletions
42
...ain/java/org/opensearch/common/blobstore/stream/read/listener/FileCompletionListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.opensearch.core.action.ActionListener; | ||
|
||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
|
||
public class FileCompletionListener implements ActionListener<Integer> { | ||
private final int numberOfParts; | ||
private final String fileName; | ||
private final Set<Integer> completedParts; | ||
private final ActionListener<String> completionListener; | ||
|
||
public FileCompletionListener(int numberOfParts, String fileName, ActionListener<String> completionListener) { | ||
this.completedParts = Collections.synchronizedSet(new HashSet<>()); | ||
this.numberOfParts = numberOfParts; | ||
this.fileName = fileName; | ||
this.completionListener = completionListener; | ||
} | ||
|
||
@Override | ||
public void onResponse(Integer partNumber) { | ||
completedParts.add(partNumber); | ||
if (completedParts.size() == numberOfParts) { | ||
completionListener.onResponse(fileName); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
completionListener.onFailure(e); | ||
} | ||
} |
58 changes: 58 additions & 0 deletions
58
...c/main/java/org/opensearch/common/blobstore/stream/read/listener/ReadContextListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.action.support.ThreadedActionListener; | ||
import org.opensearch.common.blobstore.stream.read.ReadContext; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.nio.file.Path; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public class ReadContextListener implements ActionListener<ReadContext> { | ||
|
||
private final String fileName; | ||
private final Path fileLocation; | ||
private final ThreadPool threadPool; | ||
private final ActionListener<String> completionListener; | ||
private static final Logger logger = LogManager.getLogger(ReadContextListener.class); | ||
|
||
public ReadContextListener(String fileName, Path fileLocation, ThreadPool threadPool, ActionListener<String> completionListener) { | ||
this.fileName = fileName; | ||
this.fileLocation = fileLocation; | ||
this.threadPool = threadPool; | ||
this.completionListener = completionListener; | ||
} | ||
|
||
@Override | ||
public void onResponse(ReadContext readContext) { | ||
final int numParts = readContext.getNumberOfParts(); | ||
final AtomicBoolean anyPartStreamFailed = new AtomicBoolean(); | ||
FileCompletionListener fileCompletionListener = new FileCompletionListener(numParts, fileName, completionListener); | ||
|
||
for (int partNumber = 0; partNumber < numParts; partNumber++) { | ||
StreamCompletionListener streamCompletionListener = new StreamCompletionListener( | ||
partNumber, | ||
readContext.getPartStreams().get(partNumber), | ||
fileLocation, | ||
anyPartStreamFailed, | ||
fileCompletionListener | ||
); | ||
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, streamCompletionListener, false).onResponse(null); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
completionListener.onFailure(e); | ||
} | ||
} |
88 changes: 88 additions & 0 deletions
88
...n/java/org/opensearch/common/blobstore/stream/read/listener/StreamCompletionListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.blobstore.stream.read.listener; | ||
|
||
import org.opensearch.common.io.Channels; | ||
import org.opensearch.common.io.InputStreamContainer; | ||
import org.opensearch.core.action.ActionListener; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.FileChannel; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public class StreamCompletionListener implements ActionListener<Void> { | ||
private final int partNumber; | ||
private final InputStreamContainer blobPartStreamContainer; | ||
private final Path fileLocation; | ||
private final AtomicBoolean anyPartStreamFailed; | ||
private final FileCompletionListener fileCompletionListener; | ||
|
||
// 8 MB buffer for transfer | ||
private static final int BUFFER_SIZE = 8 * 1024 * 2024; | ||
|
||
public StreamCompletionListener( | ||
int partNumber, | ||
InputStreamContainer blobPartStreamContainer, | ||
Path fileLocation, | ||
AtomicBoolean anyPartStreamFailed, | ||
FileCompletionListener fileCompletionListener | ||
) { | ||
this.partNumber = partNumber; | ||
this.blobPartStreamContainer = blobPartStreamContainer; | ||
this.fileLocation = fileLocation; | ||
this.anyPartStreamFailed = anyPartStreamFailed; | ||
this.fileCompletionListener = fileCompletionListener; | ||
} | ||
|
||
@Override | ||
public void onResponse(Void unused) { | ||
// Ensures no writes to the file if any stream fails. | ||
if (!anyPartStreamFailed.get()) { | ||
try (FileChannel outputFileChannel = FileChannel.open(fileLocation, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) { | ||
try (InputStream inputStream = blobPartStreamContainer.getInputStream()) { | ||
outputFileChannel.position(blobPartStreamContainer.getOffset()); | ||
|
||
final byte[] buffer = new byte[BUFFER_SIZE]; | ||
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer); | ||
int bytesRead; | ||
|
||
while ((bytesRead = inputStream.read(buffer)) != -1) { | ||
byteBuffer.limit(bytesRead); | ||
Channels.writeToChannel(byteBuffer, outputFileChannel); | ||
byteBuffer.clear(); | ||
} | ||
} | ||
} catch (IOException e) { | ||
onFailure(e); | ||
return; | ||
} | ||
fileCompletionListener.onResponse(partNumber); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
if (Files.exists(fileLocation)) { | ||
Files.delete(fileLocation); | ||
} | ||
} catch (IOException ex) { | ||
// Die silently | ||
} | ||
if (!anyPartStreamFailed.get()) { | ||
anyPartStreamFailed.compareAndSet(false, true); | ||
fileCompletionListener.onFailure(e); | ||
} | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
server/src/main/java/org/opensearch/common/blobstore/stream/read/listener/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/** | ||
* Abstractions for stream based file reads from the blob store. | ||
* Adds listeners for performing the necessary async read operations to perform | ||
* multi stream reads for blobs from the container. | ||
* */ | ||
package org.opensearch.common.blobstore.stream.read.listener; |
Oops, something went wrong.