diff --git a/server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java b/server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java index 2578e528ce726..ed2b315d8895a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/rest/ChunkedZipResponseIT.java @@ -217,7 +217,7 @@ private static void handleZipRestRequest( ActionRunnable.supply( chunkedZipResponse.newEntryListener( entry.getKey(), - ActionListener.releasing(Releasables.wrap(ref, refs.acquire())) + Releasables.wrap(ref, refs.acquire()) ), () -> entry.getValue() == null && randomBoolean() // randomBoolean() to allow some null entries to fail with NPE ? null diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java b/server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java index af4ba66374e33..b140bb3aba411 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedZipResponse.java @@ -107,7 +107,7 @@ public void write(byte[] b, int off, int len) throws IOException { public ChunkedZipResponse(String filename, RestChannel restChannel, Releasable onCompletion) { this.filename = filename; this.restChannel = restChannel; - this.listenersRefs = AbstractRefCounted.of(() -> enqueueEntry(null, NO_MORE_ENTRIES, ActionListener.releasing(onCompletion))); + this.listenersRefs = AbstractRefCounted.of(() -> enqueueEntry(null, NO_MORE_ENTRIES, onCompletion)); this.rootListenerRef = Releasables.releaseOnce(listenersRefs::decRef); } @@ -130,21 +130,33 @@ public void close() { * This method may be called as long as this {@link ChunkedZipResponse} is not closed, or there is at least one other incomplete entry * listener. * - * @param entryName The name of the entry in the response zip file. - * @param listener A listener which is completed when the entry has been completely processed: either fully sent, or else the request - * was cancelled and the response will not be used any further. If the returned entry listener is completed - * exceptionally then the exception is passed to {@code listener}, otherwise this listener is completed successfully. + * @param entryName The name of the entry in the response zip file. + * @param releasable A resource which is released when the entry has been completely processed: either fully sent, or else the request + * was cancelled and the response will not be used any further. */ - public ActionListener newEntryListener(String entryName, ActionListener listener) { + public ActionListener newEntryListener(String entryName, Releasable releasable) { if (listenersRefs.tryIncRef()) { final var zipEntry = new ZipEntry(filename + "/" + entryName); - return ActionListener.assertOnce(ActionListener.releaseAfter(listener.delegateFailureAndWrap((l, firstBodyPart) -> { - if (firstBodyPart == null) { - l.onResponse(null); - } else { - enqueueEntry(zipEntry, firstBodyPart, l); + return ActionListener.assertOnce(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(ChunkedRestResponseBodyPart chunkedRestResponseBodyPart) { + if (chunkedRestResponseBodyPart == null) { + Releasables.closeExpectNoException(releasable); + } else { + enqueueEntry(zipEntry, chunkedRestResponseBodyPart, releasable); + } + } + + @Override + public void onFailure(Exception e) { + Releasables.closeExpectNoException(releasable); + } + + @Override + public String toString() { + return "ZipEntry[" + zipEntry.getName() + "]"; } - }), listenersRefs::decRef)); + }, listenersRefs::decRef)); } else { assert false : "already closed"; throw new AlreadyClosedException("response already closed"); @@ -195,12 +207,12 @@ private boolean tryAcquireQueueRef() { * * @param zipEntry The entry metadata. * @param firstBodyPart The first part of the entry. Entries may comprise multiple parts, with transmission pauses in between. - * @param listener Completed when the entry has been fully transmitted. + * @param releasable Released when the entry has been fully transmitted. */ - private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBodyPart, ActionListener listener) { + private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBodyPart, Releasable releasable) { if (tryAcquireQueueRef()) { try { - entryQueue.add(new ChunkedZipEntry(zipEntry, firstBodyPart, () -> listener.onResponse(null))); + entryQueue.add(new ChunkedZipEntry(zipEntry, firstBodyPart, releasable)); if (queueLength.getAndIncrement() == 0) { // There is no active AvailableChunksZipResponseBodyPart, but there is now an entry in the queue, so we must create a // AvailableChunksZipResponseBodyPart to process it (along with any other entries that are concurrently added to the @@ -228,7 +240,7 @@ private void enqueueEntry(ZipEntry zipEntry, ChunkedRestResponseBodyPart firstBo queueRefs.decRef(); } } else { - listener.onFailure(new AlreadyClosedException("response already closed")); + Releasables.closeExpectNoException(releasable); } }