Buffered upload no longer requires length in sync client #22218
Conversation
(int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong()), false)
    : options.getDataFlux();
int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
How much data does this buffer at a time? Just chunkSize?
Each ByteBuffer will be chunkSize. It already was; I just pulled it into a variable.
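For illustration, a generate-style converter behaves roughly like the sketch below; this is not azure-core's actual implementation, and the class and method names are made up. Each emitted ByteBuffer holds at most chunkSize bytes.

```java
import reactor.core.publisher.Flux;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public final class ChunkingSketch {
    // Rough sketch of the behavior under discussion (not azure-core's real code):
    // each emitted ByteBuffer contains at most chunkSize bytes read from the stream.
    static Flux<ByteBuffer> toChunkedFlux(InputStream stream, int chunkSize) {
        return Flux.generate(sink -> {
            try {
                byte[] chunk = new byte[chunkSize];
                int read = stream.read(chunk);
                if (read == -1) {
                    sink.complete();
                } else {
                    sink.next(ByteBuffer.wrap(chunk, 0, read));
                }
            } catch (IOException e) {
                sink.error(new UncheckedIOException(e));
            }
        });
    }

    public static void main(String[] args) {
        // 10 bytes with chunkSize 4 -> buffers of 4, 4, and 2 bytes.
        toChunkedFlux(new ByteArrayInputStream(new byte[10]), 4)
            .subscribe(buf -> System.out.println(buf.remaining()));
    }
}
```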
/azp run java - storage - ci
Azure Pipelines successfully started running 1 pipeline(s).
/azp run java - storage - ci
Azure Pipelines successfully started running 1 pipeline(s).
/check-enforcer override
overrode check enforcer to deal with pipeline bugs. was green before they were introduced. discussed offline with team.
// We can only buffer up to max int due to restrictions in ByteBuffer.
(int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()), false)
    : options.getDataFlux();
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
can you add a note here that this is fine since buffered upload does not require a replayable flux?
It's not on this exact line, but there is a note in the method at the point where the decision is made that this path is buffered.
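For reference, the kind of note being requested might read roughly like this (placement and wording are only a suggestion, reusing the variables from the diff above):

```java
// This is the buffered upload path: every emitted ByteBuffer is copied into the
// upload's own staging buffers, so the Flux produced here is consumed exactly once
// and does not need to be replayable.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
Flux<ByteBuffer> data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
```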
Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
    options.getDataStream(), options.getLength(),
Flux<ByteBuffer> data = options.getDataFlux();
// no specified length: use azure.core's converter
If we have a markable stream, is the SDK smart enough to detect it and not buffer at all?
Unfortunately not. Also, though, that is only applicable when maxConcurrency = 1, so we aren't missing out on the biggest optimization.
Does this PR somewhat impair the ability to introduce such an optimization? We have customers asking about memory overhead; we should offer a way to do an unbuffered upload of a markable stream.
I don't think this PR introduces any new hurdles for such a feature that didn't already exist. I did the work for that optimization in .NET and the integration aspect was fairly simple.
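As a rough illustration of the optimization being discussed (not something this PR implements), a client could check InputStream.markSupported() and rewind the stream on retry instead of buffering it. Everything below, including the putBlob helper, is hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch, not SDK code: prefer replaying a markable stream over buffering it.
public final class MarkableUploadSketch {

    // Stand-in for a single-shot service call; a real client would issue a Put Blob here.
    static void putBlob(InputStream body, long length) throws IOException {
        byte[] chunk = new byte[8 * 1024];
        while (body.read(chunk) != -1) {
            // transfer bytes to the wire
        }
    }

    static void uploadWithReplay(InputStream dataStream, long length) throws IOException {
        if (dataStream.markSupported()) {
            // Remember the start of the body so a retry can rewind instead of re-buffering.
            dataStream.mark((int) Math.min(Integer.MAX_VALUE, length));
            try {
                putBlob(dataStream, length);
            } catch (IOException transientFailure) {
                dataStream.reset();
                putBlob(dataStream, length);
            }
        } else {
            // A real client would fall back to the buffered path here; this sketch just uploads.
            putBlob(dataStream, length);
        }
    }

    public static void main(String[] args) throws IOException {
        uploadWithReplay(new ByteArrayInputStream(new byte[1024]), 1024);
    }
}
```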
closing and opening in a new PR to get a new pipeline. this one seems permanently ruined by the bug the other day.
Apparently we're stuck with this pipeline
This pull request is protected by Check Enforcer.

What is Check Enforcer? Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass, then Check Enforcer itself will pass.

Why am I getting this message? You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines, and so Check Enforcer is correctly blocking the pull request from being merged.

What should I do now? If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla), then you could try telling Check Enforcer to evaluate your pull request again by adding a comment to this pull request.

What if I am onboarding a new service? Often, new services do not have validation pipelines associated with them; to bootstrap pipelines for a new service, you can issue a bootstrap command as a pull request comment.
/azp run java - storage - ci
Azure Pipelines successfully started running 1 pipeline(s).
// We can only buffer up to max int due to restrictions in ByteBuffer.
(int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()), false)
    : options.getDataFlux();
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
This chunkSize isn't related to block size, right? It's just for the purpose of the stream->flux conversion. I wonder if we should be using block size here.
A few things to consider (and please correct me if this is stupid, but I have a feeling this might be the reason we've seen higher-than-expected memory demand in perf tests):
- if this results in double buffering, i.e. the stream->flux conversion needs memory and then buffered upload needs it as well, then in the worst case the "uploader" might be busy uploading the last chunk (the list of ByteBuffers emitted from the converter) while the converter is trying to supply the next block - both will hold memory. I believe Reactor will make sure at least one next element is being prepared while the most recent block is being uploaded. @gapra-msft do you think this makes sense?
- allocating smaller chunks to feed the Flux might be a bit friendlier to the allocator and GC. It will have some overhead, but on the other hand it's easier to fit multiple smaller arrays into the heap than to find room for an Integer.MAX_VALUE-sized array; the latter might trigger GC/heap-defragmentation efforts. We shouldn't go crazy small here, but I think Integer.MAX_VALUE is too big. A lot of built-in components (like buffered streams and HTTP stacks) default to an 8 KB buffer, so that's the lower boundary; I'm not sure how high we should allow it to be - maybe 64 KB? Or maybe we need another knob (see the sketch after this list).
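To make the trade-off concrete, here is a minimal sketch assuming azure-core's FluxUtil.toFluxByteBuffer(InputStream, int) overload from the diff; the block size and the 8 KB figure are illustrative values, not recommendations:

```java
import com.azure.core.util.FluxUtil;
import reactor.core.publisher.Flux;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;

public final class ChunkSizeSketch {
    public static void main(String[] args) {
        InputStream dataStream = new ByteArrayInputStream(new byte[1024 * 1024]);
        long blockSize = 8L * 1024 * 1024; // stands in for parallelTransferOptions.getBlockSizeLong()

        // Approach in the PR: each emitted ByteBuffer can be as large as the block size
        // (capped at Integer.MAX_VALUE), so one emission may need one very large array.
        Flux<ByteBuffer> coarse = FluxUtil.toFluxByteBuffer(dataStream,
            (int) Math.min(Integer.MAX_VALUE, blockSize));

        // Alternative raised in review: emit small, allocator-friendly chunks (8 KB here,
        // purely as an assumed value) and let the buffered-upload stage assemble blocks.
        Flux<ByteBuffer> fine = FluxUtil.toFluxByteBuffer(
            new ByteArrayInputStream(new byte[1024 * 1024]), 8 * 1024);

        // 1 MiB source: one coarse buffer vs. 128 fine-grained buffers.
        System.out.println(coarse.count().block() + " vs " + fine.count().block());
    }
}
```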
Can you file an issue for this and assign it to me, or does it really need to be addressed now?
Yes, this can be converted to an issue and addressed outside of this PR. Would you mind doing this?
public BlobParallelUploadOptions(InputStream dataStream, long length) {
    StorageImplUtils.assertNotNull("dataStream", dataStream);
    StorageImplUtils.assertInBounds("length", length, 0, Long.MAX_VALUE);
    StorageImplUtils.assertInBounds("length", length, -1, Long.MAX_VALUE);
Should we add an overload instead of using a magic number, and use a nullable Long to represent this? Well, I just noticed the overload below. In that case I'd keep the original validation condition here and point users towards the other ctor, or maybe create an (InputStream, Long) ctor and move this logic there.
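A minimal sketch of the constructor arrangement being suggested; the class name, field names, and validation message are placeholders, not the real BlobParallelUploadOptions:

```java
import java.io.InputStream;
import java.util.Objects;

// Hypothetical illustration of the reviewer's suggestion, not SDK code.
public class UploadOptionsSketch {
    private final InputStream dataStream;
    private final Long length; // null means "length not provided"

    // Preferred ctor: no length needed now that the sync client can buffer.
    public UploadOptionsSketch(InputStream dataStream) {
        this(dataStream, (Long) null);
    }

    // Deprecated ctor keeps the original strict validation and forwards to the
    // package-private (InputStream, Long) ctor that holds the shared logic.
    @Deprecated
    public UploadOptionsSketch(InputStream dataStream, long length) {
        this(dataStream, checkLength(length));
    }

    UploadOptionsSketch(InputStream dataStream, Long length) {
        this.dataStream = Objects.requireNonNull(dataStream, "dataStream");
        this.length = length;
    }

    private static Long checkLength(long length) {
        if (length < 0) {
            throw new IllegalArgumentException("length must be between 0 and Long.MAX_VALUE");
        }
        return length;
    }
}
```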
Is the deprecation not enough to point users to the other ctor? I don't like the idea of introducing another ctor with a Long in it, because the whole point of this change is that it's meaningless to provide one. There's nothing extra bought from providing a length; we're just preserving old behavior in case someone relied on us catching a mismatched length or something like that. A Long implies it may still be of use or provide some optimization when it really doesn't.
Then we should either have a private ctor or not call one from the other. We should also deprecate getLength and replace it with something like Long getSize()?
I liked size in the context of expanding the ParallelTransferOptions back then because we could also rename the setter. But here we're not adding a new setter to complete the connection. Perhaps Long getOptionalLength()?
Either way is fine; we'll know at API review.
 */
@Deprecated
public long getLength() {
    return length;
Is it worth saving the customer from an NPE here?
Or if length is null should we throw a meaningful error?
I think this is fine; it won't throw if they haven't switched to the new code path.
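For context, the NPE in question would come from auto-unboxing a null Long in the deprecated getter. A self-contained sketch of the concern (the class, field, and getOptionalLength names are assumptions, not the SDK's):

```java
// Hypothetical sketch, not SDK code: `length` is a Long that is null when the new
// length-less ctor is used, so unboxing it in the deprecated getter throws.
public final class LengthGetterSketch {
    private final Long length;

    public LengthGetterSketch(Long length) {
        this.length = length;
    }

    @Deprecated
    public long getLength() {
        return length; // auto-unboxing: throws NullPointerException when length == null
    }

    // Null-safe alternative floated in review (the name is only a suggestion).
    public Long getOptionalLength() {
        return length;
    }

    public static void main(String[] args) {
        System.out.println(new LengthGetterSketch(42L).getLength()); // fine: prints 42
        try {
            new LengthGetterSketch(null).getLength();
        } catch (NullPointerException expected) {
            System.out.println("NPE from unboxing a null length");
        }
    }
}
```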
lgtm. Left one new comment about getLength()
Previously, Storage's InputStream -> Flux conversion method required a length so it could pregenerate a flux of set size to chunk the stream into. Since then, azure.core has introduced a converter that uses Flux.generate() to create the flux lazily as needed. They serve slightly different cases:
Since any Storage transfer method that avoids buffering in favor of seeking requires a supplied length for the REST call anyway, this PR shifts to using the azure.core method where appropriate, rather than modifying one of those converters to serve both purposes.
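As a quick sketch of the length-free conversion this PR switches to, using azure-core's FluxUtil.toFluxByteBuffer (the chunk size here is an arbitrary example value, not what the SDK picks):

```java
import com.azure.core.util.FluxUtil;
import reactor.core.publisher.Flux;

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

public final class GenerateConverterSketch {
    public static void main(String[] args) {
        // No length is supplied; the converter keeps emitting chunkSize-sized ByteBuffers
        // until the stream reports end-of-stream.
        int chunkSize = 4 * 1024 * 1024; // assumed value for illustration
        Flux<ByteBuffer> data = FluxUtil.toFluxByteBuffer(
            new ByteArrayInputStream(new byte[10 * 1024 * 1024]), chunkSize);

        // 10 MiB split into 4 MiB chunks -> 3 ByteBuffers.
        System.out.println(data.count().block());
    }
}
```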