Add interface changes for async repository downloads #9182
Conversation
```java
@Override
public CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException {
    throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}
```
Can we keep the interface consistent and use a call back listener instead?
I was going to suggest that we add a backlog task for making writes compatible with `Future` instead.
I think `Future` reduces a lot of boilerplate code and comes with native Java interface support, compared to relying on the listener mechanism, which predates complete native support for async programming.
It also makes the code more forward-looking and compatible with any new features that arrive with `Future` support.
What do you think?
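To illustrate the boilerplate-reduction argument, here is a minimal sketch of out-of-the-box `CompletableFuture` chaining. The `downloadPart` helper is hypothetical, standing in for an async repository call; it is not part of the PR.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FutureChainingSketch {
    // Hypothetical async download of a single part; stands in for a repository call.
    static CompletableFuture<byte[]> downloadPart(int sizeKb) {
        return CompletableFuture.supplyAsync(() -> new byte[sizeKb * 1024]);
    }

    public static void main(String[] args) {
        List<CompletableFuture<byte[]>> parts = List.of(downloadPart(1), downloadPart(2), downloadPart(3));

        // allOf + thenApply express the "combine all parts" step declaratively,
        // without hand-written callback plumbing for each part.
        CompletableFuture<Integer> totalBytes = CompletableFuture
            .allOf(parts.toArray(new CompletableFuture[0]))
            .thenApply(v -> parts.stream().mapToInt(p -> p.join().length).sum());

        System.out.println(totalBytes.join()); // 6144
    }
}
```

The same fan-out/fan-in with hand-rolled listeners would need an explicit counter and failure bookkeeping.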
+1 for consistency. Given this is an internal API, I think we are free to refactor this to use Futures after this lands. Or we refactor the existing API first before adding download.
The listener takes care of blocking on the response and the future-completion thread switch once the response becomes available. Consumers simply chain the callback and worry less about blocking for the response. The heavy lifting is done centrally rather than by every consumer.
I thought chained futures on return execute in a blocking fashion once you do a `.join` or `.get`.
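For context on the question above: registering a continuation with `thenApply` returns immediately and does not block; only an explicit `.join()`/`.get()` blocks the calling thread. A minimal sketch with hypothetical values:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class JoinBlockingSketch {
    public static void main(String[] args) {
        CompletableFuture<String> blob = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100); // simulate a slow download
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "blob-bytes";
        });

        // Registering the continuation returns immediately; nothing blocks here.
        CompletableFuture<Integer> length = blob.thenApply(String::length);
        System.out.println("chained without blocking");

        // Only an explicit join()/get() blocks the calling thread for the result.
        System.out.println(length.join()); // 10
    }
}
```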
In the interest of separating these two concerns (Listener/Future and multipart downloads, the crux of this PR), I have created a new issue for this and would love to chat a bit more on the `Future` vs. `Listener` pattern. It would be great to hear a bit more here @Bukhtawar - #9236
Thanks @andrross! I am updating the API as we speak to make it consistent with the upload API. We can have this discussion here or on #9236. I will summarize either way.
I am planning on creating a backlog item for downloads and uploads before we make that API public, if these approaches are functionally equivalent.
IMHO the big benefit is the reduction in boilerplate code plus native chaining (even with an array of futures), all out of the box.
As seen from the example above, it is a lot more functional and readable. We can also rely on any Java API additions which support Future/CompletableFuture out of the box, making it a lot more Futureproof.
Sorry for the pun :)
I would like to clarify that an `ActionListener` in no way restricts chaining. For instance, look at the wrapper utilities that help with chaining.
`ActionListener` serves as a common IO-agnostic interface for any calls made across the service. For instance, node-to-node communication like `TransportService#sendRequest` has a `ResponseHandler` very similar to `ActionListener`, in addition to the blocking `TransportFuture`, which sees very rare usage.
One of the reasons `ActionListener` is used extensively is the Netty framework, which is non-blocking; most REST requests are modeled as a callback that doesn't block on the response but sends it over the channel once it becomes available.
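To make the chaining point concrete, here is a minimal stand-in for a listener interface (not the real OpenSearch `ActionListener`, which lives in core and ships similar wrapper utilities such as `map` and `wrap`) showing how one listener can be adapted in front of another:

```java
import java.util.function.Function;

public class ListenerChainingSketch {
    // Minimal stand-in for OpenSearch's ActionListener; illustrative only.
    interface ActionListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);

        // map() chains a transformation in front of a delegate listener,
        // routing any mapping failure to onFailure.
        static <T, R> ActionListener<T> map(ActionListener<R> delegate, Function<T, R> fn) {
            return new ActionListener<T>() {
                public void onResponse(T response) {
                    try {
                        delegate.onResponse(fn.apply(response));
                    } catch (Exception e) {
                        delegate.onFailure(e);
                    }
                }
                public void onFailure(Exception e) {
                    delegate.onFailure(e);
                }
            };
        }
    }

    public static void main(String[] args) {
        ActionListener<Integer> lengthListener = new ActionListener<Integer>() {
            public void onResponse(Integer len) { System.out.println("length=" + len); }
            public void onFailure(Exception e) { System.out.println("failed: " + e.getMessage()); }
        };

        // Chain a String -> Integer transformation in front of the length listener.
        ActionListener<String> blobListener = ActionListener.map(lengthListener, String::length);
        blobListener.onResponse("blob-bytes"); // prints length=10
    }
}
```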
One of the reasons ActionListener is used extensively
I think the main reason ActionListener is used instead of CompletableFuture is that ActionListeners pre-date the introduction of CompletableFuture in Java 8. I think the question here is whether it is okay to start introducing CompletableFutures into new functionality (with the benefit of using a standard language primitive and all the built-in capabilities that go with it), or whether it is better to stay with ActionListener for consistency across the project, as well as possibly easier adaptability across different implementations in some cases. I think it's mostly a style/consistency question.
Not quite sure, but I'll choose to stick with the status quo for better consistency, unless we see a missing primitive that makes it unwieldy to maintain ActionListeners.
Thanks all for the info here! I'll summarize and put it down on the other issue.
```java
 * @return A future of {@link ReadContext} object which serves the input streams and other metadata for the blob
 * @throws IOException if any of the input streams could not be requested, or reading metadata for the requested blob fails
 */
CompletableFuture<ReadContext> asyncBlobDownload(String blobName, boolean forceSingleStream) throws IOException;
```
Will prefer:

```java
void asyncBlobDownload(String blobName, boolean forceSingleStream, ActionListener<ReadContext> listener) throws IOException;
```
I am not sure we should have this abstraction. We needed it for upload because a bunch of streams was required for the upload to happen concurrently.
For multipart download cases, this can be handled in `BlobContainer` itself. We don't need to expose it to the end user.
@Bukhtawar Left a comment related to your feedback here.
I am not sure we should have this abstraction.
@gbbafna The current mechanism for blob stores is to expose the `InputStream(s)` directly to the caller rather than dealing with files.
I did play around with files directly within the POC, but for maintaining separation of concerns for repositories, this mechanism seems more cohesive and consistent.
Agree that we can expose a single `InputStream`, but not in favor of multiple `InputStreams`. In that case every consumer will need to create multiple files and combine them. I would prefer we do this heavy lifting inside `BlobContainer` itself.
Based on an offline discussion with @gbbafna -
Single InputStream vs List of InputStreams
Background
The existing repository APIs return an InputStream for the requested blob to be downloaded.
The repository itself has no construct of files or downloads and simply pipes the stream through from the provider to OpenSearch core.
Current proposed solution
The current design for multi stream downloads proposes returning multiple, ordered streams back to the caller i.e. OpenSearch core, and letting the core handle reading from these streams as it deems fit.
Pros:
- The repository implementations are not exposed to any additional constructs which are not directly related to the repository
- Operations can be performed in O(n) space with some additional space for rolling the temp part data into a single segment file
Cons:
- Additional business logic performed by the repository will have to operate on parts of the segment instead of the complete file
Alternative solution
An alternative solution suggested here points to returning a single stream back to core and performing the multi-stream calls within the repository plugins themselves. A sample flow would be as follows:
- OpenSearch core requests for a blob using the new async API
- Plugin performs the necessary calls to the cloud provider to create multiple streams for the requested blob
- The plugin implementation streams these parts into temp files and stores them on disk in a temporary location
- Once all streams are completed, the plugin stitches these temp files into a single file and returns the InputStream from this file to the caller - OpenSearch core
- The core code receives a single InputStream, reads from the stream and saves the data into a file within the segment directory
Pros:
- Core code deals with a single input stream as it used to previously
- The underlying details of how the files are fetched are abstracted from core
- Complete file fetches simplify any additional business logic which needs to be operated on with the entire file - like cryptographic operations
Cons:
- The plugin implementation needs exposure to files/folder structure on the local node which it is unaware of currently
- There is storage wastage, since the files downloaded locally by the plugin take up 1x additional space while the segment is copied into the local directory (needs 2x the space)
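As a rough illustration of how core could consume the ordered streams under the currently proposed solution, the JDK's `SequenceInputStream` can present multiple part streams as one logical stream. This is only a sketch with in-memory parts, not the PR's implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class StreamStitchSketch {
    public static void main(String[] args) throws IOException {
        // Ordered part streams, standing in for multipart download results.
        List<InputStream> parts = List.of(
            new ByteArrayInputStream("part-1|".getBytes(StandardCharsets.UTF_8)),
            new ByteArrayInputStream("part-2|".getBytes(StandardCharsets.UTF_8)),
            new ByteArrayInputStream("part-3".getBytes(StandardCharsets.UTF_8))
        );

        // SequenceInputStream reads the parts back-to-back, giving the caller a
        // single logical stream without materializing intermediate files.
        try (InputStream stitched = new SequenceInputStream(Collections.enumeration(parts))) {
            System.out.println(new String(stitched.readAllBytes(), StandardCharsets.UTF_8));
        }
    }
}
```

Note this reads the parts sequentially; concurrent fetching would still have to happen upstream of the stitch.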
In this case every consumer will need to create multiple files and combine. I would prefer we do this heavy lifting inside BlobContainer itself.
@gbbafna I totally agree with this sentiment. However, is there a need to push the "download in parallel and stitch pieces into a single file" or even "download multiple streams in parallel" logic into the BlobContainer itself? Here is an alternative approach:
- In BlobContainer, we define `CompletableFuture<InputStream> readBlobAsync(String blobName, long position, long length)` (or use a listener-based approach if necessary) for each repository to implement
- Create a utility method to do the heavy lifting (not sure exactly where this should live): `void downloadParallel(BlobContainer blobContainer, String blobName, long blobLength, String destinationFile, String checksum);`
- In any place we need to download a remote file in parallel, we invoke the `downloadParallel` method and pass in the appropriate blob container.
To me, the "readBlobAsync" method is the only thing that is repository-specific, everything else about parallel download is business logic that should be common across all repository implementations. I'm curious if there's another reason to push more logic than that down into the BlobContainer layer?
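A hedged sketch of this split, with a hypothetical `AsyncBlobContainer` standing in for the proposed `readBlobAsync` primitive and a simplified `downloadParallel` that writes each part at its offset (no checksum handling, names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelDownloadSketch {
    // Stand-in for the repository-specific primitive; a real BlobContainer
    // would issue a ranged GET against the remote store.
    interface AsyncBlobContainer {
        CompletableFuture<InputStream> readBlobAsync(String blobName, long position, long length);
    }

    // Hypothetical common utility: fetch fixed-size parts concurrently and write
    // each at its offset in the destination file, so no temp files are needed.
    static void downloadParallel(AsyncBlobContainer container, String blobName, long blobLength,
                                 Path destination, long partSize) throws Exception {
        List<CompletableFuture<Void>> parts = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(destination.toFile(), "rw")) {
            file.setLength(blobLength);
            for (long pos = 0; pos < blobLength; pos += partSize) {
                final long position = pos;
                final long length = Math.min(partSize, blobLength - pos);
                parts.add(container.readBlobAsync(blobName, position, length).thenAccept(stream -> {
                    try (InputStream in = stream) {
                        byte[] bytes = in.readAllBytes();
                        synchronized (file) { // serialize seek+write pairs
                            file.seek(position);
                            file.write(bytes);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }));
            }
            CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])).join();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] blob = "0123456789abcdef".getBytes();
        AsyncBlobContainer container = (name, position, length) -> CompletableFuture.supplyAsync(
            () -> new ByteArrayInputStream(blob, (int) position, (int) length));

        Path out = Files.createTempFile("blob", ".part");
        downloadParallel(container, "segment_1", blob.length, out, 5);
        System.out.println(new String(Files.readAllBytes(out)));
    }
}
```

Everything except the `readBlobAsync` lambda is repository-agnostic, which is the point being argued above.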
Also, sort of related: looking at the POC, in `S3BlobContainer.asyncBlobDownload` we call `getObjectAttributes` to get the total object size, but at the root of the call chain back in IndexShard we already had the object size in `RemoteSegmentStoreDirectory.UploadedSegmentMetadata`, so that seems like an unnecessary call. It seems possible to put all the parallel download and file stitching logic into a utility outside of BlobContainer.
Thanks for documenting this @kotwanikunal, and @andrross for your comments. I agree on the proposed approach. Even for encryption use cases, the repository should return a bunch of streams. This would enable the decryption library to decrypt all the part streams in parallel, which would otherwise have to be performed on one huge stream.
Regarding combining all the streams into one, we can push it to a helper/util class in org.opensearch.common.blobstore.stream.
Yes, the unnecessary call can be avoided by taking a look at the size upfront. The caller would take a size and call multipart download/single download in the remote segment store.
@gbbafna I updated to what @andrross was suggesting. I think it makes it cleaner, and the util nature of transformations can be handled by a custom listener implementation, which is here in its rough form.
I am going through the crypto logic currently to ensure we can accommodate the encryption/decryption logic using the current design.
#9178 A bunch of times :(
#9116 :(
Codecov Report

```
@@            Coverage Diff             @@
##               main    #9182   +/-   ##
==========================================
  Coverage     71.08%   71.09%
- Complexity    57428    57444     +16
==========================================
  Files          4781     4777      -4
  Lines        271184   270706    -478
  Branches      39591    39565     -26
==========================================
- Hits         192780   192446    -334
+ Misses        62122    62081     -41
+ Partials      16282    16179    -103
```
Couple minor comments, but I think this PR pretty much just introduces a new method on an interface. @kotwanikunal What do you think about folding it into your next PR? It'll be easier to validate that the interface is correct if we can see it in use.
```java
 * @param listener Async listener for {@link InputStream} object which serves the input streams and other metadata for the blob
 * @throws IOException if the input stream could not be requested or fails
 */
void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) throws IOException;
```
There's nothing "verifying" or "multi stream" about this particular method, so it probably belongs in the `BlobContainer` interface with our handy new experimental tag:

```java
@ExperimentalApi
default void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) throws IOException {
    throw new UnsupportedOperationException();
}
```
```java
@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) throws IOException {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
```
No need to create a 10-thread executor just to submit one task and shut it down :) Can probably just do:

```java
new Thread(() -> {
    try {
        listener.onResponse(readBlob(blobName, position, length));
    } catch (Exception e) {
        listener.onFailure(e);
    }
}).start();
```
```java
@Override
public void readBlobAsync(String blobName, long position, long length, ActionListener<InputStream> listener) {
    throw new UnsupportedOperationException("S3 BlobContainer currently does not support async blob downloads");
}
```
Have you evaluated using vendor-specific multipart downloads? Or are you thinking about handling block-level fetches similarly?
Signed-off-by: Kunal Kotwani <[email protected]>
```java
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
```
`throws IOException`?
An "async" method should not be doing any I/O on the calling thread, right? Any IOException should be communicated via the callback.
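To illustrate that convention, a minimal sketch with a hypothetical `Listener` type (not OpenSearch's `ActionListener`): the signature declares no checked exception, and any I/O error is delivered through the callback instead of being thrown on the calling thread.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

public class AsyncErrorSketch {
    // Minimal callback interface, standing in for ActionListener.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // No "throws IOException" on the signature: the work runs off-thread and
    // any I/O failure is routed through the listener.
    static void readBlobAsync(String blobName, Listener<String> listener) {
        CompletableFuture.runAsync(() -> {
            try {
                if (blobName.isEmpty()) {
                    throw new IOException("blob name must not be empty");
                }
                listener.onResponse("contents-of-" + blobName);
            } catch (IOException e) {
                listener.onFailure(e);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = new CompletableFuture<>();
        readBlobAsync("", new Listener<String>() {
            public void onResponse(String r) { done.complete(r); }
            public void onFailure(Exception e) { done.complete("failure: " + e.getMessage()); }
        });
        System.out.println(done.join());
    }
}
```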
```java
// 8 MB buffer for transfer
private static final int BUFFER_SIZE = 8 * 1024 * 1024;
```
Can we set this to 5 MB, which is going to be the default part size for objects up to 5 GB?
Ideally we would want to make it configurable per BlobStore, but there would be challenges in that.

```java
long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
```
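For reference, a hedged sketch of what such a calculation might look like, assuming S3-style limits (5 MB default part size, at most 10,000 parts per multipart operation). This is not the actual `calculateOptimalPartSize` implementation:

```java
public class PartSizeSketch {
    // Assumed S3-style limits (stated assumptions, not taken from this PR).
    static final long DEFAULT_PART_SIZE = 5L * 1024 * 1024; // 5 MB
    static final long MAX_PARTS = 10_000;

    // Grow the part size beyond the default only when the default would
    // exceed the part-count limit for this file size.
    static long calculateOptimalPartSize(long fileSize) {
        long minimumPartSize = (fileSize + MAX_PARTS - 1) / MAX_PARTS; // ceiling division
        return Math.max(DEFAULT_PART_SIZE, minimumPartSize);
    }

    public static void main(String[] args) {
        System.out.println(calculateOptimalPartSize(1024L * 1024 * 1024));       // 1 GB: default is fine
        System.out.println(calculateOptimalPartSize(100L * 1024 * 1024 * 1024)); // 100 GB: must grow
    }
}
```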
Description
Implementation details
The examples below use `S3BlobContainer` as an example. The same logic can be extended to other plugin implementations. The class hierarchy is as follows:
The request flow can be visualized using the following diagram:
Related Issues
Partially resolves #9031
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.