-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce ChunkedZipResponse
#109820
Introduce ChunkedZipResponse
#109820
Conversation
Adds a utility for implementing REST APIs which construct a streaming (i.e. pretty-much-constant-memory) `.zip` file response as a (pausable) sequence of `ChunkedRestResponseBodyPart` instances, where each entry in the `.zip` file is itself a (pausable) sequence of `ChunkedRestResponseBodyPart` instances. Relates elastic#104851
Pinging @elastic/es-distributed (Team:Distributed) |
I had a brief look at the main code changes. They make sense to me. But I will need more time to read closely. For the context, do we already return zip response today? Maybe something like ML models? I assume this change does not have user visible impact? If we don't have such usage today, could you please explain a bit on the intended use cases? Thanks! |
Yes sorry, some more context. This was also part of my recent on-week project. Today we don't have any APIs that expose zip-format data, at least partly because until #104851 we didn't really have a way to create such a thing in a streaming fashion, and our usual approach of creating the whole response in-memory first would risk making the node go OOM. I have a number of possible use-cases for this in mind, all somewhat to do with supportability:
These things all to be discussed separately, but this PR introduces a common prerequisite for them. |
Gentle reminder for reviews here if you have time :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the extended delay. Honestly, this PR is a bit daunting to review. I just read it again and still trying to build a full mental model for it. I do plan to come back to it soon since otherwise I'll be forgetting about the details again. For the time being, I have left some comments and questions. Thanks!
server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java
Outdated
Show resolved
Hide resolved
server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @ywangd, quite an interesting exercise coming back to a PR like this with fresh eyes after a few months. Your questions prompted some renaming/commentary/other cleanup that I hope helps make it easier to understand.
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few more comments.
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
/** | ||
* Transfer {@link #currentEntryReleasable} into the supplied collection (i.e. add it to {@code releasables} and then clear | ||
* {@link #currentEntryReleasable}). Called when the last chunk of the last part of the current entry is serialized, so that we can | ||
* start serializing chunks of the next entry straight away whilst delaying the release of the current entry's resources until the | ||
* transmission of the chunk that is currently under construction. | ||
*/ | ||
private void transferCurrentEntryReleasable(ArrayList<Releasable> releasables) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This transfer of entry releasables is somewhat complext to follow for me. Conceptually, an entry releasable is released when the entry is completed or cancelled. That seems most intuitive. But we accumulate them for efficiency to prioritize ongoing bytes writing? Could it also be a concern that we are not releasing them in time? I wonder whether it might be simpler to release them once per entry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We're delaying the release of the releasable until the bytes are actually sent. That's by design: the entry is still consuming some memory in the network layer until we hand those bytes off to the OS, so we shouldn't consider it as completed earlier.
server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM (once @ywangd's concerns are addressed also)
clearing the approved
flag until Yang has taken a look too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few more comments. I don't expect to find anything of significance. But do plan to take yet another look. Thanks!
// request aborted, nothing more to send (queue is being cleared by queueRefs#closeInternal) | ||
isPartComplete = true; | ||
isLastPart = true; | ||
return new ReleasableBytesReference(BytesArray.EMPTY, () -> {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we throw AlreadyClosedException
similar to what enqueueEntry
does? The underlying channel should be closed at this point. So no need to be gentle here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair point, no real need for an ACE in enqueueEntry
either - see 8d73a28.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change of using Releasable
as parameter for newEntryListener
looks nice to me. In most cases, I think the caller indeed does not care about getting notified when the resource is released other than it gets released at some point. 👍
|
||
private void finishCurrentPart(ArrayList<Releasable> releasables) throws IOException { | ||
if (bodyPart.isLastPart()) { | ||
zipOutputStream.closeEntry(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment to say that we don't set isLastPart = true
here because we bridge the last part of this entry to the first part of the next entry so that the caller sees only continuous parts instead of having to be aware of entries. The fact that the networking code is unware of entries and they are for producing side only is something that I didn't immediately realize. In hindsight, it would be helpful to see this first to build the mental model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, see 9d5c060.
/** | ||
* A cache for an empty list to be used to collect the {@code Releasable} instances to be released when the next chunk has been | ||
* fully transmitted. It's a list because a call to {@link #encodeChunk} may yield a chunk that completes several entries, each of | ||
* which has its own resources to release. We cache this value across chunks because most chunks won't release anything, so we can | ||
* keep the empty list around for later to save on allocations. | ||
*/ | ||
private ArrayList<Releasable> nextReleasablesCache = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on my understanding of the other comment, it is necessary to accumulate the currentEntryReleasable
for an entry because the do loop for writeNextBytes
may write more than one entry. Until these written bytes are actually sent later by the networking layer, we must retain releaseables for all the entries. The releaseables are released each time data is sent out, i.e. each time the response is paused or finished at the end. If this sounds correct, can we please somehow incorporate it into the comments, maybe here or somewhere else more suitable?
We cache this value across chunks because most chunks won't release anything, so we can keep the empty list around for later to save on allocations.
I am not sure whether this helps the understanding. Unless my above understanding is wrong, I think the need for a list instead of a single entry is best covered by the fact that a loop of writeNextBytes
can span multiple entries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The releaseables are released each time data is sent out, i.e. each time the response is paused or finished at the end.
No, that's not right, the releasables are released when the chunk that completes the entries has been sent, but we do not wait all the way until the end (pause or finish) of the part that contains those entries.
Not sure what else to add to the comments to help here. Although the sentence about caching may not be what you're looking for, the preceding sentence explains why it's a list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry my comment was imprecise. I was meant to say
The releaseables are packaged to be released each time data is sent out, i.e. each time the response is paused or finished at the end.
Essentially it means how we pass the current list of entry releaseables each time after the do/while loop.
Not sure what else to add to the comments to help here
I think it could help to add a comment right before the do/while loop to say that writeNextBytes
can work through multiple entries and accumulate entry releaseables which are then released in a single batch once the processed bytes are fully sent out. Feels like a good compliment to the comment here about "a call ... completes serveral entries".
server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java
Outdated
Show resolved
Hide resolved
server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java
Show resolved
Hide resolved
…nkedZipResponseIT.java Co-authored-by: Yang Wang <[email protected]>
private SubscribableListener<ChunkedRestResponseBodyPart> nextAvailableChunksListener; | ||
|
||
/** | ||
* A resource to be released when the transmission of the current entry is complete. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit
* A resource to be released when the transmission of the current entry is complete. | |
* A resource to be released when the transmission of the current entry is complete. | |
* Multiple of them maybe released in a single batch if their associated entries are transmitted together. |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expanded in 5aa4c08.
|
||
@Override | ||
public void onFailure(Exception e) { | ||
Releasables.closeExpectNoException(releasable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might worth a logging message here since I don't think it should happen normally and it could lead to an unusable zip file if happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this is fine, it's covered by the docs and by ChunkedZipResponseIT#testRandomZipResponse
(see the comment about NPEs in handleZipRestRequest
). It just means no entry is sent.
* @param releasable A resource which is released when the entry has been completely processed: either fully sent, or else the request | ||
* was cancelled and the response will not be used any further. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It's also released when the entry is "skipped" (for lack of a better word). But maybe it's obvious from the code
* @param releasable A resource which is released when the entry has been completely processed: either fully sent, or else the request | |
* was cancelled and the response will not be used any further. | |
* @param releasable A resource which is released when the entry has been skipped or completely processed: either fully sent, or else the request | |
* was cancelled and the response will not be used any further. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expanded in 5aa4c08.
// request aborted, nothing more to send (queue is being cleared by queueRefs#closeInternal) | ||
isPartComplete = true; | ||
isLastPart = true; | ||
return new ReleasableBytesReference(BytesArray.EMPTY, () -> {}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change of using Releasable
as parameter for newEntryListener
looks nice to me. In most cases, I think the caller indeed does not care about getting notified when the resource is released other than it gets released at some point. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks for the iterations and the opportunity to review this work 👍
server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java
Show resolved
Hide resolved
Thanks both for the reviews! |
* upstream/main: (132 commits) Fix compile after several merges Update docs with new behavior on skip conditions (elastic#111640) Skip on any instance of node or version features being present (elastic#111268) Skip on any node capability being present (elastic#111585) [DOCS] Publishes Anthropic inference service docs. (elastic#111619) Introduce `ChunkedZipResponse` (elastic#109820) [Gradle] fix esql compile cacheability (elastic#111651) Mute org.elasticsearch.datastreams.logsdb.qa.StandardVersusLogsIndexModeChallengeRestIT testTermsQuery elastic#111666 Mute org.elasticsearch.datastreams.logsdb.qa.StandardVersusLogsIndexModeChallengeRestIT testMatchAllQuery elastic#111664 Mute org.elasticsearch.xpack.esql.analysis.VerifierTests testMatchCommand elastic#111661 Mute org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests testMatchCommandWithMultipleMatches {default} elastic#111660 Mute org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests testMatchCommand {default} elastic#111659 Mute org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests testMatchCommandWithWhereClause {default} elastic#111658 LogsDB qa tests - add specific matcher for source (elastic#111568) ESQL: Move `randomLiteral` (elastic#111647) [ESQL] Clean up UNSUPPORTED type blocks (elastic#111648) ESQL: Remove the `NESTED` DataType (elastic#111495) ESQL: Move more out of esql-core (elastic#111604) Improve MvPSeriesWeightedSum edge case and add more tests (elastic#111552) Add link to flood-stage watermark exception message (elastic#111315) ... # Conflicts: # server/src/main/java/org/elasticsearch/TransportVersions.java
Adds a utility for implementing REST APIs which construct a streaming (i.e. pretty-much-constant-memory) `.zip` file response as a (pausable) sequence of `ChunkedRestResponseBodyPart` instances, where each entry in the `.zip` file is itself a (pausable) sequence of `ChunkedRestResponseBodyPart` instances. Relates elastic#104851
Adds a utility for implementing REST APIs which construct a streaming (i.e. pretty-much-constant-memory) `.zip` file response as a (pausable) sequence of `ChunkedRestResponseBodyPart` instances, where each entry in the `.zip` file is itself a (pausable) sequence of `ChunkedRestResponseBodyPart` instances. Relates #104851
Similar to `ChunkedZipResponse` (elastic#109820) this utility allows Elasticsearch to send an `XContent`-based response constructed out of a sequence of `ChunkedToXContent` fragments, provided in a streaming and asynchronous fashion. This will enable elastic#93735 to proceed without needing to create a temporary index to hold the intermediate results.
Similar to `ChunkedZipResponse` (#109820) this utility allows Elasticsearch to send an `XContent`-based response constructed out of a sequence of `ChunkedToXContent` fragments, provided in a streaming and asynchronous fashion. This will enable #93735 to proceed without needing to create a temporary index to hold the intermediate results.
Adds a utility for implementing REST APIs which construct a streaming (i.e. pretty-much-constant-memory) `.zip` file response as a (pausable) sequence of `ChunkedRestResponseBodyPart` instances, where each entry in the `.zip` file is itself a (pausable) sequence of `ChunkedRestResponseBodyPart` instances. Relates elastic#104851
Similar to `ChunkedZipResponse` (elastic#109820) this utility allows Elasticsearch to send an `XContent`-based response constructed out of a sequence of `ChunkedToXContent` fragments, provided in a streaming and asynchronous fashion. This will enable elastic#93735 to proceed without needing to create a temporary index to hold the intermediate results.
Similar to `ChunkedZipResponse` (elastic#109820) this utility allows Elasticsearch to send an `XContent`-based response constructed out of a sequence of `ChunkedToXContent` fragments, provided in a streaming and asynchronous fashion. This will enable elastic#93735 to proceed without needing to create a temporary index to hold the intermediate results.
Adds a utility for implementing REST APIs which construct a streaming
(i.e. pretty-much-constant-memory)
.zip
file response as a (pausable)sequence of
ChunkedRestResponseBodyPart
instances, where each entry inthe
.zip
file is itself a (pausable) sequence ofChunkedRestResponseBodyPart
instances.Relates #104851