Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ChunkedRestResponseBody extend Releasable #99871

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return "application/octet-stream";
}

@Override
public void close() {}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void sendResponse(RestResponse restResponse) {
final HttpResponse httpResponse;
if (isHeadRequest == false && restResponse.isChunked()) {
ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent();
toClose.add(chunkedContent);
if (httpLogger != null && httpLogger.isBodyTracerEnabled()) {
final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId());
toClose.add(() -> {
Expand All @@ -132,8 +133,10 @@ public void sendResponse(RestResponse restResponse) {
httpResponse = httpRequest.createResponse(restResponse.status(), chunkedContent);
} else {
final BytesReference content = restResponse.content();
if (content instanceof Releasable) {
toClose.add((Releasable) content);
if (content instanceof Releasable releasable) {
toClose.add(releasable);
} else if (restResponse.isChunked()) {
toClose.add(restResponse.chunkedContent());
}
toClose.add(this::releaseOutputBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
Expand All @@ -32,7 +34,7 @@
* The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
* instead serialize only as much of the response as can be flushed to the network right away.
*/
public interface ChunkedRestResponseBody {
public interface ChunkedRestResponseBody extends Releasable {

/**
* @return true once this response has been written fully.
Expand Down Expand Up @@ -62,9 +64,29 @@ public interface ChunkedRestResponseBody {
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @return chunked rest response body
* @deprecated Use {@link #fromXContent(ChunkedToXContent, ToXContent.Params, RestChannel, Releasable)} instead.
*/
@Deprecated(forRemoval = true)
static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel)
throws IOException {
return fromXContent(chunkedToXContent, params, channel, null);
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}.
*
* @param chunkedToXContent chunked x-content instance to serialize
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @param releasable resource to release when the response is fully sent, or {@code null} if nothing to release
* @return chunked rest response body
*/
static ChunkedRestResponseBody fromXContent(
ChunkedToXContent chunkedToXContent,
ToXContent.Params params,
RestChannel channel,
@Nullable Releasable releasable
) throws IOException {

return new ChunkedRestResponseBody() {

Expand Down Expand Up @@ -132,14 +154,34 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return builder.getResponseContentTypeString();
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*
* @deprecated Use {@link #fromTextChunks(String, Iterator, Releasable)} instead.
*/
@Deprecated(forRemoval = true)
static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator<CheckedConsumer<Writer, IOException>> chunkIterator) {
return fromTextChunks(contentType, chunkIterator, null);
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*/
static ChunkedRestResponseBody fromTextChunks(
String contentType,
Iterator<CheckedConsumer<Writer, IOException>> chunkIterator,
@Nullable Releasable releasable
) {
return new ChunkedRestResponseBody() {
private RecyclerBytesStreamOutput currentOutput;
private final Writer writer = new OutputStreamWriter(new OutputStream() {
Expand Down Expand Up @@ -209,6 +251,11 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return contentType;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return inner.getResponseContentTypeString();
}

@Override
public void close() {
inner.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
Expand Down Expand Up @@ -81,7 +82,11 @@ public RestResponse(RestStatus status, String responseMediaType, BytesReference

public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content) {
if (content.isDone()) {
return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY);
return new RestResponse(
restStatus,
content.getResponseContentTypeString(),
new ReleasableBytesReference(BytesArray.EMPTY, content)
);
} else {
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
Expand Down Expand Up @@ -525,6 +526,7 @@ public void testHandleHeadRequest() {
}
{
// chunked response
final var isClosed = new AtomicBoolean();
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {

@Override
Expand All @@ -541,11 +543,28 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}));
verify(httpChannel, times(2)).sendResponse(requestCaptor.capture(), any());
@SuppressWarnings("unchecked")
Class<ActionListener<Void>> listenerClass = (Class<ActionListener<Void>>) (Class<?>) ActionListener.class;
ArgumentCaptor<ActionListener<Void>> listenerCaptor = ArgumentCaptor.forClass(listenerClass);
verify(httpChannel, times(2)).sendResponse(requestCaptor.capture(), listenerCaptor.capture());
HttpResponse response = requestCaptor.getValue();
assertThat(response, instanceOf(TestHttpResponse.class));
assertThat(((TestHttpResponse) response).content().length(), equalTo(0));

ActionListener<Void> listener = listenerCaptor.getValue();
assertFalse(isClosed.get());
if (randomBoolean()) {
listener.onResponse(null);
} else {
listener.onFailure(new ClosedChannelException());
}
assertTrue(isClosed.get());
}
}

Expand Down Expand Up @@ -703,6 +722,7 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co
)
);

final var isClosed = new AtomicBoolean();
assertEquals(
responseBody,
ChunkedLoggingStreamTests.getDecodedLoggedBody(
Expand Down Expand Up @@ -730,10 +750,16 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}))
)
);

assertTrue(isClosed.get());
}

private TestHttpResponse executeRequest(final Settings settings, final String host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChunkedRestResponseBodyTests extends ESTestCase {

Expand All @@ -50,40 +51,59 @@ public void testEncodesChunkedXContentCorrectly() throws IOException {
}
final var bytesDirect = BytesReference.bytes(builderDirect);

final var chunkedResponse = ChunkedRestResponseBody.fromXContent(
chunkedToXContent,
ToXContent.EMPTY_PARAMS,
new FakeRestChannel(
new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(),
randomBoolean(),
1
final var isClosed = new AtomicBoolean();
try (
var chunkedResponse = ChunkedRestResponseBody.fromXContent(
chunkedToXContent,
ToXContent.EMPTY_PARAMS,
new FakeRestChannel(
new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(),
randomBoolean(),
1
),
() -> assertTrue(isClosed.compareAndSet(false, true))
)
);
) {

final List<BytesReference> refsGenerated = new ArrayList<>();
while (chunkedResponse.isDone() == false) {
refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final List<BytesReference> refsGenerated = new ArrayList<>();
while (chunkedResponse.isDone() == false) {
refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}

assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])));
assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])));
assertFalse(isClosed.get());
}
assertTrue(isClosed.get());
}

public void testFromTextChunks() throws IOException {
final var chunks = randomList(1000, () -> randomUnicodeOfLengthBetween(1, 100));
final var body = ChunkedRestResponseBody.fromTextChunks("text/plain", Iterators.map(chunks.iterator(), s -> w -> w.write(s)));

final List<BytesReference> refsGenerated = new ArrayList<>();
while (body.isDone() == false) {
refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]));
final var isClosed = new AtomicBoolean();
try (
var body = ChunkedRestResponseBody.fromTextChunks(
"text/plain",
Iterators.map(chunks.iterator(), s -> w -> w.write(s)),
() -> assertTrue(isClosed.compareAndSet(false, true))
)
) {
final List<BytesReference> refsGenerated = new ArrayList<>();
while (body.isDone() == false) {
refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]));

try (var outputStream = new ByteArrayOutputStream(); var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
for (final var chunk : chunks) {
writer.write(chunk);
try (
var outputStream = new ByteArrayOutputStream();
var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)
) {
for (final var chunk : chunks) {
writer.write(chunk);
}
writer.flush();
assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes);
}
writer.flush();
assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes);
assertFalse(isClosed.get());
}
assertTrue(isClosed.get());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

This is almost exactly what I have in my branch.

}
}