Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into fix_blob_cache_time_d…
Browse files Browse the repository at this point in the history
…ependency
  • Loading branch information
henningandersen committed Jan 25, 2024
2 parents adcb884 + 2a5cd78 commit 02462e3
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@
---
"Testing require_data_stream in bulk requests":
- skip:
version: " - 8.12.99"
reason: "require_data_stream was introduced in 8.13.0"
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/104774"
#version: " - 8.12.99"
#reason: "require_data_stream was introduced in 8.13.0"
features: allowed_warnings

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return "application/octet-stream";
}

@Override
public void close() {}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel
channel.sendResponse(
RestResponse.chunked(OK, ChunkedRestResponseBody.fromXContent(ignored -> Iterators.single((builder, params) -> {
throw new AssertionError("should not be called for HEAD REQUEST");
}), ToXContent.EMPTY_PARAMS, channel, null))
}), ToXContent.EMPTY_PARAMS, channel), null)
);
} catch (IOException e) {
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void sendResponse(RestResponse restResponse) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}
toClose.add(() -> tracer.stopTrace(request));
toClose.add(restResponse);

boolean success = false;
String opaque = null;
Expand All @@ -113,7 +114,6 @@ public void sendResponse(RestResponse restResponse) {
final HttpResponse httpResponse;
if (isHeadRequest == false && restResponse.isChunked()) {
ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent();
toClose.add(chunkedContent);
if (httpLogger != null && httpLogger.isBodyTracerEnabled()) {
final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId());
toClose.add(() -> {
Expand All @@ -131,8 +131,6 @@ public void sendResponse(RestResponse restResponse) {
final BytesReference content = restResponse.content();
if (content instanceof Releasable releasable) {
toClose.add(releasable);
} else if (restResponse.isChunked()) {
toClose.add(restResponse.chunkedContent());
}
toClose.add(this::releaseOutputBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
Expand All @@ -36,7 +34,7 @@
* The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
* instead serialize only as much of the response as can be flushed to the network right away.
*/
public interface ChunkedRestResponseBody extends Releasable {
public interface ChunkedRestResponseBody {

Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class);

Expand Down Expand Up @@ -67,15 +65,10 @@ public interface ChunkedRestResponseBody extends Releasable {
* @param chunkedToXContent chunked x-content instance to serialize
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @param releasable resource to release when the response is fully sent, or {@code null} if nothing to release
* @return chunked rest response body
*/
static ChunkedRestResponseBody fromXContent(
ChunkedToXContent chunkedToXContent,
ToXContent.Params params,
RestChannel channel,
@Nullable Releasable releasable
) throws IOException {
static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel)
throws IOException {

return new ChunkedRestResponseBody() {

Expand Down Expand Up @@ -146,23 +139,14 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return builder.getResponseContentTypeString();
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*/
static ChunkedRestResponseBody fromTextChunks(
String contentType,
Iterator<CheckedConsumer<Writer, IOException>> chunkIterator,
@Nullable Releasable releasable
) {
static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator<CheckedConsumer<Writer, IOException>> chunkIterator) {
return new ChunkedRestResponseBody() {
private RecyclerBytesStreamOutput currentOutput;
private final Writer writer = new OutputStreamWriter(new OutputStream() {
Expand Down Expand Up @@ -235,11 +219,6 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return contentType;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,4 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return inner.getResponseContentTypeString();
}

@Override
public void close() {
inner.close();
}
}
11 changes: 5 additions & 6 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -824,10 +826,8 @@ public void sendResponse(RestResponse response) {
if (response.isChunked() == false) {
methodHandlers.addResponseStats(response.content().length());
} else {
response = RestResponse.chunked(
response.status(),
new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers)
);
final var wrapped = new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), methodHandlers);
response = RestResponse.chunked(response.status(), wrapped, Releasables.wrap(wrapped, response));
}
delegate.sendResponse(response);
success = true;
Expand All @@ -851,7 +851,7 @@ private void close() {
}
}

private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody {
private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody, Releasable {

private final ChunkedRestResponseBody delegate;
private final RunOnce onCompletion;
Expand Down Expand Up @@ -884,7 +884,6 @@ public String getResponseContentTypeString() {

@Override
public void close() {
delegate.close();
// the client might close the connection before we send the last chunk, in which case we won't have recorded the response in the
// stats yet, so we do it now:
onCompletion.run();
Expand Down
34 changes: 23 additions & 11 deletions server/src/main/java/org/elasticsearch/rest/RestResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand All @@ -33,7 +34,7 @@
import static org.elasticsearch.ElasticsearchException.REST_EXCEPTION_SKIP_STACK_TRACE;
import static org.elasticsearch.rest.RestController.ELASTIC_PRODUCT_HTTP_HEADER;

public final class RestResponse {
public final class RestResponse implements Releasable {

public static final String TEXT_CONTENT_TYPE = "text/plain; charset=UTF-8";

Expand All @@ -51,6 +52,9 @@ public final class RestResponse {
private final String responseMediaType;
private Map<String, List<String>> customHeaders;

@Nullable
private final Releasable releasable;

/**
* Creates a new response based on {@link XContentBuilder}.
*/
Expand All @@ -73,18 +77,18 @@ public RestResponse(RestStatus status, String responseMediaType, String content)
}

public RestResponse(RestStatus status, String responseMediaType, BytesReference content) {
this(status, responseMediaType, content, null);
this(status, responseMediaType, content, null, null);
}

private RestResponse(RestStatus status, String responseMediaType, BytesReference content, @Nullable Releasable releasable) {
this(status, responseMediaType, content, null, releasable);
}

public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content) {
public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) {
if (content.isDone()) {
return new RestResponse(
restStatus,
content.getResponseContentTypeString(),
new ReleasableBytesReference(BytesArray.EMPTY, content)
);
return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable);
} else {
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content);
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable);
}
}

Expand All @@ -95,12 +99,14 @@ private RestResponse(
RestStatus status,
String responseMediaType,
@Nullable BytesReference content,
@Nullable ChunkedRestResponseBody chunkedResponseBody
@Nullable ChunkedRestResponseBody chunkedResponseBody,
@Nullable Releasable releasable
) {
this.status = status;
this.content = content;
this.responseMediaType = responseMediaType;
this.chunkedResponseBody = chunkedResponseBody;
this.releasable = releasable;
assert (content == null) != (chunkedResponseBody == null);
}

Expand Down Expand Up @@ -142,6 +148,7 @@ public RestResponse(RestChannel channel, RestStatus status, Exception e) throws
copyHeaders(((ElasticsearchException) e));
}
this.chunkedResponseBody = null;
this.releasable = null;
}

public String contentType() {
Expand Down Expand Up @@ -224,4 +231,9 @@ public Map<String, List<String>> filterHeaders(Map<String, List<String>> headers
}
return headers;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ protected void processResponse(Response response) throws IOException {
channel.sendResponse(
RestResponse.chunked(
getRestStatus(response),
ChunkedRestResponseBody.fromXContent(response, params, channel, releasableFromResponse(response))
ChunkedRestResponseBody.fromXContent(response, params, channel),
releasableFromResponse(response)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
@Override
public RestResponse buildResponse(NodesHotThreadsResponse response) {
response.mustIncRef();
return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks(), response::decRef));
return RestResponse.chunked(RestStatus.OK, fromTextChunks(TEXT_CONTENT_TYPE, response.getTextChunks()), response::decRef);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel
Iterators.single((builder, params) -> builder.endArray())
),
ToXContent.EMPTY_PARAMS,
channel,
null
)
channel
),
null
);
}

Expand Down Expand Up @@ -127,9 +127,9 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann
}
writer.append("\n");
})
),
null
)
)
),
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ public RestResponse buildResponse(NodesStatsResponse response) throws Exception
ChunkedToXContentHelper.endObject()
),
EMPTY_PARAMS,
channel,
null
)
channel
),
null
);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,12 +543,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}));
}, () -> assertTrue(isClosed.compareAndSet(false, true))));
@SuppressWarnings("unchecked")
Class<ActionListener<Void>> listenerClass = (Class<ActionListener<Void>>) (Class<?>) ActionListener.class;
ArgumentCaptor<ActionListener<Void>> listenerCaptor = ArgumentCaptor.forClass(listenerClass);
Expand Down Expand Up @@ -750,12 +745,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}))
}, () -> assertTrue(isClosed.compareAndSet(false, true))))
)
);

Expand Down
Loading

0 comments on commit 02462e3

Please sign in to comment.