Skip to content

Commit

Permalink
Rename to ChunkedRestResponseBodyPart
Browse files Browse the repository at this point in the history
Follow-up to elastic#104851 to rename some symbols to reflect that the class
formerly known as a `ChunkedRestResponseBody` may now only be _part_ of
the whole response body.
  • Loading branch information
DaveCTurner committed May 27, 2024
1 parent 6312d49 commit 6e00966
Show file tree
Hide file tree
Showing 29 changed files with 195 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -370,31 +370,31 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public ChunkedRestResponseBody getChunkedBody() {
return getChunkBatch(0);
public ChunkedRestResponseBodyPart getFirstResponseBodyPart() {
return getResponseBodyPart(0);
}

private ChunkedRestResponseBody getChunkBatch(int batchIndex) {
private ChunkedRestResponseBodyPart getResponseBodyPart(int batchIndex) {
if (batchIndex == failIndex && randomBoolean()) {
throw new ElasticsearchException("simulated failure creating next batch");
}
return new ChunkedRestResponseBody() {
return new ChunkedRestResponseBodyPart() {

private final Iterator<String> lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n");

@Override
public boolean isDone() {
public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return batchIndex == 2;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1)));
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
executor.execute(ActionRunnable.supply(listener, () -> getResponseBodyPart(batchIndex + 1)));
}

@Override
Expand Down Expand Up @@ -486,11 +486,12 @@ public void accept(RestChannel channel) {
@Override
protected void processResponse(Response response) {
try {
final var responseBody = response.getChunkedBody(); // might fail, so do this before acquiring ref
final var responseBody = response.getFirstResponseBodyPart();
// preceding line might fail, so needs to be done before acquiring the sendResponse ref
refs.mustIncRef();
channel.sendResponse(RestResponse.chunked(RestStatus.OK, responseBody, refs::decRef));
} finally {
refs.decRef();
refs.decRef(); // release the ref acquired at the top of accept()
}
}
});
Expand Down Expand Up @@ -534,26 +535,26 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public ChunkedRestResponseBody getChunkedBody() {
return new ChunkedRestResponseBody() {
public ChunkedRestResponseBodyPart getResponseBodyPart() {
return new ChunkedRestResponseBodyPart() {
private final Iterator<String> lines = Iterators.single("infinite response\n");

@Override
public boolean isDone() {
public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return false;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getChunkedBody();
return getResponseBodyPart();
}));
}

Expand Down Expand Up @@ -628,7 +629,7 @@ public void accept(RestChannel channel) {
client.execute(TYPE, new Request(), new RestActionListener<>(channel) {
@Override
protected void processResponse(Response response) {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), () -> {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getResponseBodyPart(), () -> {
// cancellation notification only happens while processing a continuation, not while computing
// the next one; prompt cancellation requires use of something like RestCancellableNodeClient
assertFalse(response.computingContinuation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -245,19 +245,19 @@ public BytesReference next() {
private static void sendChunksResponse(RestChannel channel, Iterator<BytesReference> chunkIterator) {
final var localRefs = refs; // single volatile read
if (localRefs != null && localRefs.tryIncRef()) {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
@Override
public boolean isDone() {
public boolean isPartComplete() {
return chunkIterator.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
assert false : "no continuations";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -243,21 +243,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalArgumentException("[" + FAIL_AFTER_BYTES_PARAM + "] must be present and non-negative");
}
return channel -> randomExecutor(client.threadPool()).execute(
() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
int bytesRemaining = failAfterBytes;

@Override
public boolean isDone() {
public boolean isPartComplete() {
return false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
fail("no continuations here");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@

import io.netty.util.concurrent.PromiseCombiner;

import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;

final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse {
private final int sequence;
private final ChunkedRestResponseBody body;
private final ChunkedRestResponseBodyPart bodyPart;
private final PromiseCombiner combiner;

Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) {
Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBodyPart bodyPart, PromiseCombiner combiner) {
this.sequence = sequence;
this.body = body;
this.bodyPart = bodyPart;
this.combiner = combiner;
}

Expand All @@ -28,8 +28,8 @@ public int getSequence() {
return sequence;
}

public ChunkedRestResponseBody body() {
return body;
public ChunkedRestResponseBodyPart bodyPart() {
return bodyPart;
}

public PromiseCombiner combiner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestStatus;

/**
Expand All @@ -23,16 +23,16 @@ final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Net

private final int sequence;

private final ChunkedRestResponseBody body;
private final ChunkedRestResponseBodyPart firstBodyPart;

Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) {
Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
super(version, HttpResponseStatus.valueOf(status.getStatus()));
this.sequence = sequence;
this.body = body;
this.firstBodyPart = firstBodyPart;
}

public ChunkedRestResponseBody body() {
return body;
public ChunkedRestResponseBodyPart firstBodyPart() {
return firstBodyPart;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
Expand All @@ -58,7 +58,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
private final int maxEventsHeld;
private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;

private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {}
private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBodyPart) {}

/**
* The current {@link ChunkedWrite} if a chunked write is executed at the moment.
Expand Down Expand Up @@ -214,9 +214,9 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
final ChannelPromise first = ctx.newPromise();
combiner.add((Future<Void>) first);
final var responseBody = readyResponse.body();
final var firstBodyPart = readyResponse.firstBodyPart();
assert currentChunkedWrite == null;
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
currentChunkedWrite = new ChunkedWrite(combiner, promise, firstBodyPart);
if (enqueueWrite(ctx, readyResponse, first)) {
// We were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable.
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
Expand All @@ -232,9 +232,10 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp
private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) {
final PromiseCombiner combiner = continuation.combiner();
assert currentChunkedWrite == null;
final var responseBody = continuation.body();
assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part";
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
final var bodyPart = continuation.bodyPart();
assert bodyPart.isPartComplete() == false
: "response with continuations must have at least one (possibly-empty) chunk in each part";
currentChunkedWrite = new ChunkedWrite(combiner, promise, bodyPart);
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
while (ctx.channel().isWritable()) {
if (writeChunk(ctx, currentChunkedWrite)) {
Expand All @@ -251,17 +252,17 @@ private void finishChunkedWrite() {
}
final var finishingWrite = currentChunkedWrite;
currentChunkedWrite = null;
final var finishingWriteBody = finishingWrite.responseBody();
assert finishingWriteBody.isDone();
final var endOfResponse = finishingWriteBody.isEndOfResponse();
final var finishingWriteBodyPart = finishingWrite.responseBodyPart();
assert finishingWriteBodyPart.isPartComplete();
final var endOfResponse = finishingWriteBodyPart.isLastPart();
if (endOfResponse) {
writeSequence++;
finishingWrite.combiner().finish(finishingWrite.onDone());
} else {
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBody continuation) {
public void onResponse(ChunkedRestResponseBodyPart continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
Expand Down Expand Up @@ -296,7 +297,7 @@ private void checkShutdown() {
}
}

}), finishingWriteBody::getContinuation);
}), finishingWriteBodyPart::getNextPart);
}
}

Expand Down Expand Up @@ -374,22 +375,22 @@ private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
}

private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) {
final var body = chunkedWrite.responseBody();
final var bodyPart = chunkedWrite.responseBodyPart();
final var combiner = chunkedWrite.combiner();
assert body.isDone() == false : "should not continue to try and serialize once done";
assert bodyPart.isPartComplete() == false : "should not continue to try and serialize once done";
final ReleasableBytesReference bytes;
try {
bytes = body.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler());
bytes = bodyPart.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler());
} catch (Exception e) {
return handleChunkingFailure(ctx, chunkedWrite, e);
}
final ByteBuf content = Netty4Utils.toByteBuf(bytes);
final boolean done = body.isDone();
final boolean lastChunk = done && body.isEndOfResponse();
final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
final boolean isPartComplete = bodyPart.isPartComplete();
final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart();
final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
combiner.add(f);
return done;
return isPartComplete;
}

private boolean handleChunkingFailure(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;
Expand Down Expand Up @@ -176,8 +176,8 @@ public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference c
}

@Override
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content);
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
}

@Override
Expand Down
Loading

0 comments on commit 6e00966

Please sign in to comment.