Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pausable chunked HTTP responses #104851

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c92a7ec
Pausable chunked HTTP responses
DaveCTurner Jan 29, 2024
f7250b8
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Jan 30, 2024
ff24f3c
Add test for the failure path
DaveCTurner Jan 29, 2024
e957bc7
TODO is done
DaveCTurner Jan 30, 2024
44a4482
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Jan 31, 2024
18e65e2
Use #104971
DaveCTurner Jan 31, 2024
1fd16fb
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 5, 2024
33fa0cd
Fixup merge
DaveCTurner Feb 5, 2024
1d927ca
Fix test race
DaveCTurner Feb 5, 2024
b0e3ff8
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 7, 2024
708fa52
Tighter visibility
DaveCTurner Feb 7, 2024
4954c8e
Assert continuations received in order
DaveCTurner Feb 7, 2024
aa52c18
Comments about cancellation
DaveCTurner Feb 7, 2024
43584be
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 7, 2024
2022d7a
Add test for client cancellation
DaveCTurner Feb 7, 2024
ffd6cac
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 8, 2024
1777bc5
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 8, 2024
befb016
Simplify leak detection in tests
DaveCTurner Feb 8, 2024
3fab7ce
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 9, 2024
bbcbf5a
Adjust tests to show cancellation behaviour & fix comment to match
DaveCTurner Feb 9, 2024
7b896e0
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 12, 2024
0b91c7f
Complete listener even on impossible path
DaveCTurner Feb 12, 2024
d5e4031
Assertions to catch invalid usage
DaveCTurner Feb 12, 2024
751ce75
isDone Javadoc
DaveCTurner Feb 12, 2024
cf74253
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 14, 2024
38d55af
scaledRandomIntBetween
DaveCTurner Feb 14, 2024
048194d
localRefs
DaveCTurner Feb 14, 2024
6fac79d
Fix netty/netty#8007 leak
DaveCTurner Feb 14, 2024
64ad627
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 21, 2024
e4a34ac
Fix up tests
DaveCTurner Feb 21, 2024
b3b7399
Better shutdown protection
DaveCTurner Feb 21, 2024
d4d1401
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 22, 2024
221b5b6
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Feb 29, 2024
b01bda4
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner Mar 19, 2024
39b71dd
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner May 20, 2024
23c1a1f
Fixup
DaveCTurner May 20, 2024
30795e0
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner May 25, 2024
4570c92
Javadocs
DaveCTurner May 25, 2024
0d923f4
Assert body done in test
DaveCTurner May 25, 2024
ee1a4a4
Revert "Assert body done in test"
DaveCTurner May 25, 2024
a2c385e
Capture parts & assert complete
DaveCTurner May 25, 2024
344f75c
Merge branch 'main' into 2024/01/29/pausable-chunked-responses
DaveCTurner May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -158,6 +159,16 @@ public boolean isDone() {
return chunkIterator.hasNext() == false;
}

/**
* This body always reports itself as the final part of the response, so no continuation is ever requested from it.
*
* @return always {@code true}
*/
@Override
public boolean isEndOfResponse() {
return true;
}

/**
* Not expected to be called, because {@code isEndOfResponse()} always returns {@code true} for this body; trips an assertion if it is.
*
* @param listener the listener that would receive the next part of the body
*/
// NOTE(review): the listener is not completed on this path, so if assertions were disabled the caller would wait forever.
@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
assert false : "no continuations";
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) {
final var page = recycler.obtain(); // just to ensure nothing is leaked
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.util.concurrent.PromiseCombiner;

import org.elasticsearch.rest.ChunkedRestResponseBody;

public final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse {
ywangd marked this conversation as resolved.
Show resolved Hide resolved
private final int sequence;
private final ChunkedRestResponseBody body;
private final PromiseCombiner combiner;

public Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) {
this.sequence = sequence;
this.body = body;
this.combiner = combiner;
}

@Override
public int getSequence() {
return sequence;
}

public ChunkedRestResponseBody body() {
return body;
}

public PromiseCombiner combiner() {
return combiner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -198,6 +200,8 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse
doWriteFullResponse(ctx, fullResponse, promise);
} else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) {
doWriteChunkedResponse(ctx, chunkedResponse, promise);
} else if (readyResponse instanceof Netty4ChunkedHttpContinuation chunkedContinuation) {
doWriteChunkedContinuation(ctx, chunkedContinuation, promise);
ywangd marked this conversation as resolved.
Show resolved Hide resolved
} else {
assert false : readyResponse.getClass().getCanonicalName();
throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName());
Expand Down Expand Up @@ -236,13 +240,54 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp
}
}

private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise)
throws IOException {
final PromiseCombiner combiner = continuation.combiner();
assert currentChunkedWrite == null;
final var responseBody = continuation.body();
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
while (ctx.channel().isWritable()) {
if (writeChunk(ctx, combiner, responseBody)) {
finishChunkedWrite();
return;
}
}
ywangd marked this conversation as resolved.
Show resolved Hide resolved
}

private void finishChunkedWrite() {
assert currentChunkedWrite != null;
assert currentChunkedWrite.responseBody().isDone();
final var finishingWrite = currentChunkedWrite;
final var endOfResponse = finishingWrite.responseBody().isEndOfResponse();
currentChunkedWrite = null;
writeSequence++;
finishingWrite.combiner.finish(finishingWrite.onDone());
if (endOfResponse) {
writeSequence++;
finishingWrite.combiner.finish(finishingWrite.onDone());
} else {
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBody continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
);
ywangd marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +265 to +268
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether similar treatment (#105306) is needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, this code runs on the event loop already so these writes will still either complete or fail properly even if we're in the process of shutting down.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it always the case even when getContinuation dispatches to another thread pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes you're quite right, thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've applied a fix in 6fac79d but it had to be a little different from #105306 since we don't have a listener to wrap any more. But then I realised we could do the same thing at the outer layer too, and extract a utility, and in fact what we do today at the outer layer is also a little questionable too. I made the relevant changes in #105486 and will migrate to that utility once it's available here.

}

@Override
public void onFailure(Exception e) {
logger.error(
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
Comment on lines +279 to +280
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly to the other PR, I wonder whether this could be safeFailPromise(finishingWrite.onDone, new ClosedChannelException())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather wait for the whole combiner to complete (and wonder whether we should do that in other places too). And channel.newFailedFuture() shouldn't be able to fail so safeFailPromise isn't necessary to fail that.

});
}
}), finishingWrite.responseBody()::getContinuation);
}
}

private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) {
Expand Down Expand Up @@ -326,7 +371,8 @@ private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner,
);
final ByteBuf content = Netty4Utils.toByteBuf(bytes);
final boolean done = body.isDone();
final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
final boolean lastChunk = done && body.isEndOfResponse();
final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
combiner.add(f);
return done;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
/**
* Super-interface for responses handled by the Netty4 HTTP transport.
*/
public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse {
public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse, Netty4ChunkedHttpContinuation {
/**
* @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined
* requests in the correct order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.netty4.NettyAllocator;

import java.io.Closeable;
Expand Down Expand Up @@ -137,9 +138,20 @@ private synchronized List<FullHttpResponse> sendRequests(final SocketAddress rem
channelFuture = clientBootstrap.connect(remoteAddress);
channelFuture.sync();

boolean needsFinalFlush = false;
for (HttpRequest request : requests) {
channelFuture.channel().writeAndFlush(request);
if (ESTestCase.randomBoolean()) {
channelFuture.channel().writeAndFlush(request);
ywangd marked this conversation as resolved.
Show resolved Hide resolved
needsFinalFlush = false;
} else {
channelFuture.channel().write(request);
needsFinalFlush = true;
}
}
if (needsFinalFlush) {
channelFuture.channel().flush();
}

if (latch.await(30L, TimeUnit.SECONDS) == false) {
fail("Failed to get all expected responses.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -511,6 +512,16 @@ public boolean isDone() {
return remaining == 0;
}

/**
* This test body is always the final part of its response.
*
* @return always {@code true}
*/
@Override
public boolean isEndOfResponse() {
return true;
}

/**
* Not expected to be called since {@code isEndOfResponse()} always returns {@code true} for this body; fails the test if it is.
*
* @param listener the listener that would receive the next part of the body; never completed here
*/
@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
fail("no continuations here");
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) {
assertThat(remaining, greaterThan(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.rest;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
Expand Down Expand Up @@ -43,6 +44,21 @@ public interface ChunkedRestResponseBody {
*/
boolean isDone();
ywangd marked this conversation as resolved.
Show resolved Hide resolved

/**
* Indicates whether this body is the final part of the response or whether further parts follow it.
*
* @return true if this is the last chunked body in the response. If false, the rest of the response is obtained by calling
*         {@link #getContinuation} once this body is done.
*/
boolean isEndOfResponse();

/**
* Asynchronously retrieves the next part of the body. Note that this is called on a transport thread, so implementations must take care
* to dispatch any nontrivial work elsewhere.
*
* @param listener Listener to complete with the next part of the body. By the point this is called we have already started to send
*                 the body of the response, so there's no good way to handle an exception here. Completing the listener exceptionally
*                 will log an error, abort sending the response, and close the HTTP connection.
*/
void getContinuation(ActionListener<ChunkedRestResponseBody> listener);
nik9000 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Serializes approximately as many bytes of the response as requested by {@code sizeHint} to a {@link ReleasableBytesReference} that
* is created from buffers backed by the given {@code recycler}.
Expand Down Expand Up @@ -102,6 +118,16 @@ public boolean isDone() {
return serialization.hasNext() == false;
}

/**
* This body is always the final part of its response; it has no continuation.
*
* @return always {@code true}
*/
@Override
public boolean isEndOfResponse() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
assert false : "no continuations";
ywangd marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
try {
Expand Down Expand Up @@ -180,6 +206,16 @@ public boolean isDone() {
return chunkIterator.hasNext() == false;
}

/**
* This body is always the final part of its response; it has no continuation.
*
* @return always {@code true}
*/
@Override
public boolean isEndOfResponse() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
assert false : "no continuations";
ywangd marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.rest;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;

Expand All @@ -30,6 +31,16 @@ public boolean isDone() {
return inner.isDone();
}

/**
* Delegates to the wrapped body.
*
* @return whatever the wrapped body's {@code isEndOfResponse()} returns
*/
@Override
public boolean isEndOfResponse() {
return inner.isEndOfResponse();
}

/**
 * Requests the continuation from the wrapped body, wrapping the result in another {@code LoggingChunkedRestResponseBody} that
 * uses the same logger stream before handing it to the given listener.
 *
 * @param listener the listener to complete with the wrapped continuation
 */
@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
    final ActionListener<ChunkedRestResponseBody> wrappingListener = listener.map(
        nextPart -> new LoggingChunkedRestResponseBody(nextPart, loggerStream)
    );
    inner.getContinuation(wrappingListener);
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
var chunk = inner.encodeChunk(sizeHint, recycler);
Expand Down
14 changes: 13 additions & 1 deletion server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -907,11 +907,23 @@ public boolean isDone() {
return delegate.isDone();
}

/**
* Delegates to the wrapped body.
*
* @return whatever the delegate's {@code isEndOfResponse()} returns
*/
@Override
public boolean isEndOfResponse() {
return delegate.isEndOfResponse();
}

/**
 * Requests the continuation from the delegate, wrapping the result in another {@code EncodedLengthTrackingChunkedRestResponseBody}
 * that shares this body's response-length recorder before handing it to the given listener.
 *
 * @param listener the listener to complete with the wrapped continuation
 */
@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
    final ActionListener<ChunkedRestResponseBody> trackingListener = listener.map(
        nextPart -> new EncodedLengthTrackingChunkedRestResponseBody(nextPart, responseLengthRecorder)
    );
    delegate.getContinuation(trackingListener);
}

@Override
public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException {
final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler);
responseLengthRecorder.addChunkLength(bytesReference.length());
if (isDone()) {
if (isDone() && isEndOfResponse()) {
responseLengthRecorder.close();
}
return bytesReference;
Expand Down
Loading