Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RestResponse releasable #104752

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return "application/octet-stream";
}

@Override
public void close() {}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel
channel.sendResponse(
RestResponse.chunked(OK, ChunkedRestResponseBody.fromXContent(ignored -> Iterators.single((builder, params) -> {
throw new AssertionError("should not be called for HEAD REQUEST");
}), ToXContent.EMPTY_PARAMS, channel, null))
}), ToXContent.EMPTY_PARAMS, channel), null)
);
} catch (IOException e) {
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void sendResponse(RestResponse restResponse) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
toClose.add(() -> tracer.stopTrace(request));
toClose.add(restResponse);

boolean success = false;
String opaque = null;
Expand All @@ -113,7 +114,6 @@ public void sendResponse(RestResponse restResponse) {
final HttpResponse httpResponse;
if (isHeadRequest == false && restResponse.isChunked()) {
ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent();
toClose.add(chunkedContent);
if (httpLogger != null && httpLogger.isBodyTracerEnabled()) {
final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId());
toClose.add(() -> {
Expand All @@ -131,8 +131,6 @@ public void sendResponse(RestResponse restResponse) {
final BytesReference content = restResponse.content();
if (content instanceof Releasable releasable) {
toClose.add(releasable);
} else if (restResponse.isChunked()) {
toClose.add(restResponse.chunkedContent());
}
toClose.add(this::releaseOutputBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
Expand All @@ -36,7 +34,7 @@
* The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
* instead serialize only as much of the response as can be flushed to the network right away.
*/
public interface ChunkedRestResponseBody extends Releasable {
public interface ChunkedRestResponseBody {

Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class);

Expand Down Expand Up @@ -67,15 +65,10 @@ public interface ChunkedRestResponseBody extends Releasable {
* @param chunkedToXContent chunked x-content instance to serialize
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @param releasable resource to release when the response is fully sent, or {@code null} if nothing to release
* @return chunked rest response body
*/
static ChunkedRestResponseBody fromXContent(
ChunkedToXContent chunkedToXContent,
ToXContent.Params params,
RestChannel channel,
@Nullable Releasable releasable
) throws IOException {
static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel)
throws IOException {

return new ChunkedRestResponseBody() {

Expand Down Expand Up @@ -146,23 +139,14 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return builder.getResponseContentTypeString();
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*/
static ChunkedRestResponseBody fromTextChunks(
String contentType,
Iterator<CheckedConsumer<Writer, IOException>> chunkIterator,
@Nullable Releasable releasable
) {
static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator<CheckedConsumer<Writer, IOException>> chunkIterator) {
return new ChunkedRestResponseBody() {
private RecyclerBytesStreamOutput currentOutput;
private final Writer writer = new OutputStreamWriter(new OutputStream() {
Expand Down Expand Up @@ -235,11 +219,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return contentType;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,4 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return inner.getResponseContentTypeString();
}

@Override
public void close() {
inner.close();
}
}
11 changes: 5 additions & 6 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -824,10 +826,8 @@ public void sendResponse(RestResponse response) {
if (response.isChunked() == false) {
methodHandlers.addResponseStats(response.content().length());
} else {
response = RestResponse.chunked(
response.status(),
new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers)
);
final var wrapped = new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers);
response = RestResponse.chunked(response.status(), wrapped, Releasables.wrap(wrapped, response));
}
delegate.sendResponse(response);
success = true;
Expand All @@ -851,7 +851,7 @@ private void close() {
}
}

private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody {
private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody, Releasable {

private final ChunkedRestResponseBody delegate;
private final RunOnce onCompletion;
Expand Down Expand Up @@ -884,7 +884,6 @@ public String getResponseContentTypeString() {

@Override
public void close() {
delegate.close();
// the client might close the connection before we send the last chunk, in which case we won't have recorded the response in the
// stats yet, so we do it now:
onCompletion.run();
Expand Down
34 changes: 23 additions & 11 deletions server/src/main/java/org/elasticsearch/rest/RestResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand All @@ -33,7 +34,7 @@
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
import static org.elasticsearch.rest.RestController.ELASTIC_PRODUCT_HTTP_HEADER;

public final class RestResponse {
public final class RestResponse implements Releasable {

public static final String TEXT_CONTENT_TYPE = "text/plain; charset=UTF-8";

Expand All @@ -51,6 +52,9 @@ public final class RestResponse {
private final String responseMediaType;
private Map<String, List<String>> customHeaders;

@Nullable
private final Releasable releasable;

/**
* Creates a new response based on {@link XContentBuilder}.
*/
Expand All @@ -73,18 +77,18 @@ public RestResponse(RestStatus status, String responseMediaType, String content)
}

public RestResponse(RestStatus status, String responseMediaType, BytesReference content) {
this(status, responseMediaType, content, null);
this(status, responseMediaType, content, null, null);
}

private RestResponse(RestStatus status, String responseMediaType, BytesReference content, @Nullable Releasable releasable) {
this(status, responseMediaType, content, null, releasable);
}

public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content) {
public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) {
if (content.isDone()) {
return new RestResponse(
restStatus,
content.getResponseContentTypeString(),
new ReleasableBytesReference(BytesArray.EMPTY, content)
);
return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable);
} else {
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content);
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable);
}
}

Expand All @@ -95,12 +99,14 @@ private RestResponse(
RestStatus status,
String responseMediaType,
@Nullable BytesReference content,
@Nullable ChunkedRestResponseBody chunkedResponseBody
@Nullable ChunkedRestResponseBody chunkedResponseBody,
@Nullable Releasable releasable
) {
this.status = status;
this.content = content;
this.responseMediaType = responseMediaType;
this.chunkedResponseBody = chunkedResponseBody;
this.releasable = releasable;
assert (content == null) != (chunkedResponseBody == null);
}

Expand Down Expand Up @@ -142,6 +148,7 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws
copyHeaders(((ElasticsearchException) e));
}
this.chunkedResponseBody = null;
this.releasable = null;
}

public String contentType() {
Expand Down Expand Up @@ -224,4 +231,9 @@ public Map<String, List<String>> filterHeaders(Map<String, List<String>> headers
}
return headers;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ protected void processResponse(Response response) throws IOException {
channel.sendResponse(
RestResponse.chunked(
getRestStatus(response),
ChunkedRestResponseBody.fromXContent(response, params, channel, releasableFromResponse(response))
ChunkedRestResponseBody.fromXContent(response, params, channel),
releasableFromResponse(response)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
@Override
public RestResponse buildResponse(NodesHotThreadsResponse response) {
response.mustIncRef();
return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks(), response::decRef));
return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks()), response::decRef);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
Iterators.single((builder, params) -> builder.endArray())
),
ToXContent.EMPTY_PARAMS,
channel,
null
)
channel
),
null
);
}

Expand Down Expand Up @@ -127,9 +127,9 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
writer.append("\n");
})
),
null
)
)
),
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ public RestResponse buildResponse(NodesStatsResponse response) throws Exception
ChunkedToXContentHelper.endObject()
),
EMPTY_PARAMS,
channel,
null
)
channel
),
null
);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,12 +543,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}));
}, () -> assertTrue(isClosed.compareAndSet(false, true))));
@SuppressWarnings("unchecked")
Class<ActionListener<Void>> listenerClass = (Class<ActionListener<Void>>) (Class<?>) ActionListener.class;
ArgumentCaptor<ActionListener<Void>> listenerCaptor = ArgumentCaptor.forClass(listenerClass);
Expand Down Expand Up @@ -750,12 +745,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}))
}, () -> assertTrue(isClosed.compareAndSet(false, true))))
)
);

Expand Down
Loading