Skip to content

Commit

Permalink
Remove ACE in enqueueEntry
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Aug 6, 2024
1 parent d31a3b1 commit 8d73a28
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private static void handleZipRestRequest(
ActionRunnable.supply(
chunkedZipResponse.newEntryListener(
entry.getKey(),
ActionListener.releasing(Releasables.wrap(ref, refs.acquire()))
Releasables.wrap(ref, refs.acquire())
),
() -> entry.getValue() == null && randomBoolean() // randomBoolean() to allow some null entries to fail with NPE
? null
Expand Down
44 changes: 28 additions & 16 deletions server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void write(byte[] b, int off, int len) throws IOException {
public ChunkedZipResponse(String filename, RestChannel restChannel, Releasable onCompletion) {
this.filename = filename;
this.restChannel = restChannel;
this.listenersRefs = AbstractRefCounted.of(() -> enqueueEntry(null, NO_MORE_ENTRIES, ActionListener.releasing(onCompletion)));
this.listenersRefs = AbstractRefCounted.of(() -> enqueueEntry(null, NO_MORE_ENTRIES, onCompletion));
this.rootListenerRef = Releasables.releaseOnce(listenersRefs::decRef);
}

Expand All @@ -130,21 +130,33 @@ public void close() {
* This method may be called as long as this {@link ChunkedZipResponse} is not closed, or there is at least one other incomplete entry
* listener.
*
* @param entryName The name of the entry in the response zip file.
* @param listener A listener which is completed when the entry has been completely processed: either fully sent, or else the request
* was cancelled and the response will not be used any further. If the returned entry listener is completed
* exceptionally then the exception is passed to {@code listener}, otherwise this listener is completed successfully.
* @param entryName The name of the entry in the response zip file.
* @param releasable A resource which is released when the entry has been completely processed: either fully sent, or else the request
* was cancelled and the response will not be used any further.
*/
public ActionListener<ChunkedRestResponseBodyPart> newEntryListener(String entryName, ActionListener<Void> listener) {
public ActionListener<ChunkedRestResponseBodyPart> newEntryListener(String entryName, Releasable releasable) {
if (listenersRefs.tryIncRef()) {
final var zipEntry = new ZipEntry(filename + "/" + entryName);
return ActionListener.assertOnce(ActionListener.releaseAfter(listener.delegateFailureAndWrap((l, firstBodyPart) -> {
if (firstBodyPart == null) {
l.onResponse(null);
} else {
enqueueEntry(zipEntry, firstBodyPart, l);
return ActionListener.assertOnce(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBodyPart chunkedRestResponseBodyPart) {
if (chunkedRestResponseBodyPart == null) {
Releasables.closeExpectNoException(releasable);
} else {
enqueueEntry(zipEntry, chunkedRestResponseBodyPart, releasable);
}
}

@Override
public void onFailure(Exception e) {
Releasables.closeExpectNoException(releasable);
}

@Override
public String toString() {
return "ZipEntry[" + zipEntry.getName() + "]";
}
}), listenersRefs::decRef));
}, listenersRefs::decRef));
} else {
assert false : "already closed";
throw new AlreadyClosedException("response already closed");
Expand Down Expand Up @@ -195,12 +207,12 @@ private boolean tryAcquireQueueRef() {
*
* @param zipEntry The entry metadata.
* @param firstBodyPart The first part of the entry. Entries may comprise multiple parts, with transmission pauses in between.
* @param listener Completed when the entry has been fully transmitted.
* @param releasable Released when the entry has been fully transmitted.
*/
private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBodyPart, ActionListener<Void> listener) {
private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBodyPart, Releasable releasable) {
if (tryAcquireQueueRef()) {
try {
entryQueue.add(new ChunkedZipEntry(zipEntry, firstBodyPart, () -> listener.onResponse(null)));
entryQueue.add(new ChunkedZipEntry(zipEntry, firstBodyPart, releasable));
if (queueLength.getAndIncrement() == 0) {
// There is no active AvailableChunksZipResponseBodyPart, but there is now an entry in the queue, so we must create a
// AvailableChunksZipResponseBodyPart to process it (along with any other entries that are concurrently added to the
Expand Down Expand Up @@ -228,7 +240,7 @@ private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBo
queueRefs.decRef();
}
} else {
listener.onFailure(new AlreadyClosedException("response already closed"));
Releasables.closeExpectNoException(releasable);
}
}

Expand Down

0 comments on commit 8d73a28

Please sign in to comment.