Rename to ChunkedRestResponseBodyPart
Follow-up to elastic#104851 to rename some symbols to reflect that the class
formerly known as a `ChunkedRestResponseBody` may now only be _part_ of
the whole response body.
DaveCTurner committed May 25, 2024
1 parent a2c385e commit 1526ead
Showing 29 changed files with 161 additions and 160 deletions.
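Before the per-file diffs, here is a minimal, runnable sketch of the renamed contract as this commit exercises it. It is an illustration only: `BodyPart`, `ChunkedResponseSketch` and `batch(...)` are made-up stand-ins (plain strings and a synchronous callback instead of `ReleasableBytesReference` and `ActionListener`), but the three renamed methods — `isPartComplete()`, `isLastPart()` and `getNextPart(...)` — play the same roles they do in the real `ChunkedRestResponseBodyPart`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for ChunkedRestResponseBodyPart: real parts emit bytes via
// encodeChunk(...) and hand their continuation to an ActionListener, both simplified here.
interface BodyPart {
    boolean isPartComplete();                       // no more chunks in *this* part
    boolean isLastPart();                           // no further parts after this one
    void getNextPart(Consumer<BodyPart> listener);  // supply the next part (asynchronously in the real code)
    String encodeChunk();                           // next chunk of the current part
}

public class ChunkedResponseSketch {

    // One part per batch, three chunks per part, mirroring the test changed in this commit.
    static BodyPart batch(int index, int lastIndex) {
        return new BodyPart() {
            private final Iterator<String> lines = List.of(
                "batch-" + index + "-chunk-0\n",
                "batch-" + index + "-chunk-1\n",
                "batch-" + index + "-chunk-2\n"
            ).iterator();

            @Override public boolean isPartComplete() { return lines.hasNext() == false; }

            @Override public boolean isLastPart() { return index == lastIndex; }

            @Override public void getNextPart(Consumer<BodyPart> listener) {
                listener.accept(batch(index + 1, lastIndex)); // synchronous here; an executor in the real tests
            }

            @Override public String encodeChunk() { return lines.next(); }
        };
    }

    public static void main(String[] args) {
        BodyPart part = batch(0, 2);
        StringBuilder response = new StringBuilder();
        while (true) {
            while (part.isPartComplete() == false) {
                response.append(part.encodeChunk()); // drain the current part chunk by chunk
            }
            if (part.isLastPart()) {
                break; // the completed part was the final one: the response is done
            }
            BodyPart[] next = new BodyPart[1];
            part.getNextPart(p -> next[0] = p); // otherwise ask for the continuation
            part = next[0];
        }
        System.out.print(response);
    }
}
```

The driver loop in `main` mirrors what the Netty pipelining handler below does with real channel promises: drain the current part chunk by chunk, and only once that part is complete either finish the response or request its continuation.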
@@ -60,7 +60,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
@@ -370,30 +370,30 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

-public ChunkedRestResponseBody getChunkedBody() {
+public ChunkedRestResponseBodyPart getChunkedBody() {
return getChunkBatch(0);
}

-private ChunkedRestResponseBody getChunkBatch(int batchIndex) {
+private ChunkedRestResponseBodyPart getChunkBatch(int batchIndex) {
if (batchIndex == failIndex && randomBoolean()) {
throw new ElasticsearchException("simulated failure creating next batch");
}
-return new ChunkedRestResponseBody() {
+return new ChunkedRestResponseBodyPart() {

private final Iterator<String> lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n");

@Override
-public boolean isDone() {
+public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
-public boolean isEndOfResponse() {
+public boolean isLastPart() {
return batchIndex == 2;
}

@Override
-public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
+public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1)));
}

@@ -534,22 +534,22 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

-public ChunkedRestResponseBody getChunkedBody() {
-return new ChunkedRestResponseBody() {
+public ChunkedRestResponseBodyPart getChunkedBody() {
+return new ChunkedRestResponseBodyPart() {
private final Iterator<String> lines = Iterators.single("infinite response\n");

@Override
-public boolean isDone() {
+public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
-public boolean isEndOfResponse() {
+public boolean isLastPart() {
return false;
}

@Override
-public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
+public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
@@ -37,7 +37,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
@@ -245,19 +245,19 @@ public BytesReference next() {
private static void sendChunksResponse(RestChannel channel, Iterator<BytesReference> chunkIterator) {
final var localRefs = refs; // single volatile read
if (localRefs != null && localRefs.tryIncRef()) {
-channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
+channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
@Override
-public boolean isDone() {
+public boolean isPartComplete() {
return chunkIterator.hasNext() == false;
}

@Override
-public boolean isEndOfResponse() {
+public boolean isLastPart() {
return true;
}

@Override
-public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
+public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
assert false : "no continuations";
}

@@ -34,7 +34,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
@@ -243,21 +243,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalArgumentException("[" + FAIL_AFTER_BYTES_PARAM + "] must be present and non-negative");
}
return channel -> randomExecutor(client.threadPool()).execute(
-() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
+() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
int bytesRemaining = failAfterBytes;

@Override
-public boolean isDone() {
+public boolean isPartComplete() {
return false;
}

@Override
-public boolean isEndOfResponse() {
+public boolean isLastPart() {
return true;
}

@Override
-public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
+public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
fail("no continuations here");
}

@@ -10,14 +10,14 @@

import io.netty.util.concurrent.PromiseCombiner;

-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;

final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse {
private final int sequence;
-private final ChunkedRestResponseBody body;
+private final ChunkedRestResponseBodyPart body;
private final PromiseCombiner combiner;

-Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) {
+Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBodyPart body, PromiseCombiner combiner) {
this.sequence = sequence;
this.body = body;
this.combiner = combiner;
@@ -28,7 +28,7 @@ public int getSequence() {
return sequence;
}

-public ChunkedRestResponseBody body() {
+public ChunkedRestResponseBodyPart body() {
return body;
}

@@ -13,7 +13,7 @@
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.http.HttpResponse;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestStatus;

/**
@@ -23,15 +23,15 @@ final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Net

private final int sequence;

-private final ChunkedRestResponseBody body;
+private final ChunkedRestResponseBodyPart body;

-Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) {
+Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBodyPart body) {
super(version, HttpResponseStatus.valueOf(status.getStatus()));
this.sequence = sequence;
this.body = body;
}

-public ChunkedRestResponseBody body() {
+public ChunkedRestResponseBodyPart body() {
return body;
}

@@ -34,7 +34,7 @@
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
@@ -58,7 +58,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
private final int maxEventsHeld;
private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;

-private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {}
+private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBody) {}

/**
* The current {@link ChunkedWrite} if a chunked write is executed at the moment.
@@ -233,7 +233,8 @@ private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4Chunked
final PromiseCombiner combiner = continuation.combiner();
assert currentChunkedWrite == null;
final var responseBody = continuation.body();
-assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part";
+assert responseBody.isPartComplete() == false
+    : "response with continuations must have at least one (possibly-empty) chunk in each part";
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
while (ctx.channel().isWritable()) {
Expand All @@ -252,16 +253,16 @@ private void finishChunkedWrite() {
final var finishingWrite = currentChunkedWrite;
currentChunkedWrite = null;
final var finishingWriteBody = finishingWrite.responseBody();
-assert finishingWriteBody.isDone();
-final var endOfResponse = finishingWriteBody.isEndOfResponse();
+assert finishingWriteBody.isPartComplete();
+final var endOfResponse = finishingWriteBody.isLastPart();
if (endOfResponse) {
writeSequence++;
finishingWrite.combiner().finish(finishingWrite.onDone());
} else {
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
-public void onResponse(ChunkedRestResponseBody continuation) {
+public void onResponse(ChunkedRestResponseBodyPart continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
Expand Down Expand Up @@ -296,7 +297,7 @@ private void checkShutdown() {
}
}

-}), finishingWriteBody::getContinuation);
+}), finishingWriteBody::getNextPart);
}
}

@@ -376,16 +377,16 @@ private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) {
final var body = chunkedWrite.responseBody();
final var combiner = chunkedWrite.combiner();
-assert body.isDone() == false : "should not continue to try and serialize once done";
+assert body.isPartComplete() == false : "should not continue to try and serialize once done";
final ReleasableBytesReference bytes;
try {
bytes = body.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler());
} catch (Exception e) {
return handleChunkingFailure(ctx, chunkedWrite, e);
}
final ByteBuf content = Netty4Utils.toByteBuf(bytes);
-final boolean done = body.isDone();
-final boolean lastChunk = done && body.isEndOfResponse();
+final boolean done = body.isPartComplete();
+final boolean lastChunk = done && body.isLastPart();
final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
combiner.add(f);
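The `done`/`lastChunk` distinction above is the crux of the split between the two completion methods: a part that is complete only terminates the HTTP response if it is also the last part. A small, self-contained illustration of that decision, assuming Netty 4 on the classpath (`LastChunkSketch` and `toHttpContent` are hypothetical helpers; the promise and combiner bookkeeping of the real handler is omitted):

```java
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;

import java.nio.charset.StandardCharsets;

// Only the final chunk of the final part may be a DefaultLastHttpContent,
// because that frame ends the chunked HTTP response on the wire.
public class LastChunkSketch {

    static HttpContent toHttpContent(String chunk, boolean partComplete, boolean lastPart) {
        var content = Unpooled.copiedBuffer(chunk, StandardCharsets.UTF_8);
        boolean lastChunk = partComplete && lastPart;
        return lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content);
    }

    public static void main(String[] args) {
        // A completed part with more parts to come: an ordinary content frame.
        System.out.println(toHttpContent("batch-0\n", true, false).getClass().getSimpleName());
        // The completed final part: the terminal frame.
        System.out.println(toHttpContent("batch-1\n", true, true).getClass().getSimpleName());
    }
}
```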
@@ -22,7 +22,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -176,7 +176,7 @@ public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference c
}

@Override
-public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
+public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart content) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content);
}

@@ -36,7 +36,7 @@
import org.elasticsearch.common.bytes.ZeroBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.http.HttpResponse;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -502,23 +502,23 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
};
}

-private static ChunkedRestResponseBody getRepeatedChunkResponseBody(int chunkCount, BytesReference chunk) {
-return new ChunkedRestResponseBody() {
+private static ChunkedRestResponseBodyPart getRepeatedChunkResponseBody(int chunkCount, BytesReference chunk) {
+return new ChunkedRestResponseBodyPart() {

private int remaining = chunkCount;

@Override
-public boolean isDone() {
+public boolean isPartComplete() {
return remaining == 0;
}

@Override
-public boolean isEndOfResponse() {
+public boolean isLastPart() {
return true;
}

@Override
-public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
+public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
fail("no continuations here");
}

@@ -71,7 +71,7 @@
import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
import org.elasticsearch.http.netty4.internal.HttpValidator;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
@@ -692,7 +692,7 @@ public void testHeadRequestToChunkedApi() throws InterruptedException {
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
try {
channel.sendResponse(
-RestResponse.chunked(OK, ChunkedRestResponseBody.fromXContent(ignored -> Iterators.single((builder, params) -> {
+RestResponse.chunked(OK, ChunkedRestResponseBodyPart.fromXContent(ignored -> Iterators.single((builder, params) -> {
throw new AssertionError("should not be called for HEAD REQUEST");
}), ToXContent.EMPTY_PARAMS, channel), null)
);
@@ -1048,7 +1048,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel
assertEquals(request.uri(), url);
final var response = RestResponse.chunked(
OK,
-ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()),
+ChunkedRestResponseBodyPart.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()),
responseReleasedLatch::countDown
);
transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response)));
@@ -82,7 +82,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> {
final var response = RestResponse.chunked(
RestStatus.OK,
-ChunkedRestResponseBody.fromXContent(
+ChunkedRestResponseBodyPart.fromXContent(
params -> Iterators.single((b, p) -> b.startObject().endObject()),
request,
channel
@@ -21,8 +21,8 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.rest.AbstractRestChannel;
-import org.elasticsearch.rest.ChunkedRestResponseBody;
-import org.elasticsearch.rest.LoggingChunkedRestResponseBody;
+import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
+import org.elasticsearch.rest.LoggingChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
@@ -113,7 +113,7 @@ public void sendResponse(RestResponse restResponse) {
try {
final HttpResponse httpResponse;
if (isHeadRequest == false && restResponse.isChunked()) {
-ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent();
+ChunkedRestResponseBodyPart chunkedContent = restResponse.chunkedContent();
if (httpLogger != null && httpLogger.isBodyTracerEnabled()) {
final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId());
toClose.add(() -> {
Expand All @@ -123,7 +123,7 @@ public void sendResponse(RestResponse restResponse) {
assert false : e; // nothing much to go wrong here
}
});
-chunkedContent = new LoggingChunkedRestResponseBody(chunkedContent, loggerStream);
+chunkedContent = new LoggingChunkedRestResponseBodyPart(chunkedContent, loggerStream);
}

httpResponse = httpRequest.createResponse(restResponse.status(), chunkedContent);
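In the body-tracing branch above, the chunked content is wrapped in a `LoggingChunkedRestResponseBodyPart` so that each chunk is also copied to the HTTP body trace log. A rough sketch of that decorator shape (simplified stand-in types, not the real interface; the real class also has to delegate `isLastPart()` and `getNextPart(...)`, which this sketch skips):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// Rough sketch of the decorator shape: every chunk produced by the wrapped body
// is also copied to a trace stream before being handed on to the channel.
public class LoggingBodyPartSketch {

    interface SimpleBodyPart {
        boolean isPartComplete();
        String encodeChunk();
    }

    static final class LoggingBodyPart implements SimpleBodyPart {
        private final SimpleBodyPart inner;
        private final OutputStream traceStream;

        LoggingBodyPart(SimpleBodyPart inner, OutputStream traceStream) {
            this.inner = inner;
            this.traceStream = traceStream;
        }

        @Override
        public boolean isPartComplete() {
            return inner.isPartComplete();
        }

        @Override
        public String encodeChunk() {
            String chunk = inner.encodeChunk();
            try {
                traceStream.write(chunk.getBytes(StandardCharsets.UTF_8)); // mirror the chunk into the trace log
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return chunk;
        }
    }

    public static void main(String[] args) {
        Iterator<String> lines = List.of("one\n", "two\n").iterator();
        SimpleBodyPart plain = new SimpleBodyPart() {
            @Override public boolean isPartComplete() { return lines.hasNext() == false; }
            @Override public String encodeChunk() { return lines.next(); }
        };

        ByteArrayOutputStream trace = new ByteArrayOutputStream();
        SimpleBodyPart logged = new LoggingBodyPart(plain, trace);
        while (logged.isPartComplete() == false) {
            logged.encodeChunk(); // chunks flow onward as usual...
        }
        System.out.print(trace.toString(StandardCharsets.UTF_8)); // ...and a copy lands in the trace stream
    }
}
```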