Skip to content

Commit

Permalink
Remove HTTP content copies (#117303)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b authored Nov 26, 2024
1 parent b22d185 commit 1866299
Show file tree
Hide file tree
Showing 32 changed files with 141 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,3 @@ org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.Str
@defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
java.lang.Thread#<init>(java.lang.Runnable)
java.lang.Thread#<init>(java.lang.ThreadGroup, java.lang.Runnable)

org.elasticsearch.common.bytes.BytesReference#copyBytes(org.elasticsearch.common.bytes.BytesReference) @ This method is a subject for removal. Copying bytes is prone to performance regressions and unnecessary allocations.
5 changes: 5 additions & 0 deletions docs/changelog/117303.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117303
summary: Remove HTTP content copies
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
var content = request.releasableContent();
var content = request.content();
var iter = content.iterator();
return (chan) -> {
request.getHttpRequest().release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

package org.elasticsearch.system.indices;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.node.NodeClient;
Expand Down Expand Up @@ -177,12 +178,12 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
var content = request.requiredContent();
IndexRequest indexRequest = new IndexRequest(".net-new-system-index-primary");
indexRequest.source(request.requiredContent(), request.getXContentType());
indexRequest.source(content, request.getXContentType());
indexRequest.id(request.param("id"));
indexRequest.setRefreshPolicy(request.param("refresh"));

return channel -> client.index(indexRequest, new RestToXContentListener<>(channel));
return channel -> client.index(indexRequest, ActionListener.withRef(new RestToXContentListener<>(channel), content));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,4 +475,12 @@ static <T, R extends AutoCloseable> void runWithResource(
ActionListener.run(ActionListener.runBefore(listener, resource::close), l -> action.accept(l, resource));
}

/**
* Increments ref count and returns a listener that will decrement ref count on listener completion.
*/
static <Response> ActionListener<Response> withRef(ActionListener<Response> listener, RefCounted ref) {
ref.mustIncRef();
return releaseAfter(listener, ref::decRef);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,6 @@ static ByteBuffer[] toByteBuffers(BytesReference reference) {
}
}

/**
* Allocates new buffer and copy bytes from given BytesReference.
*
* @deprecated copying bytes is a right place for performance regression and unnecessary allocations.
* This method exists to serve very few places that struggle to handle reference counted buffers.
*/
@Deprecated(forRemoval = true)
static BytesReference copyBytes(BytesReference bytesReference) {
byte[] arr = new byte[bytesReference.length()];
int offset = 0;
final BytesRefIterator iterator = bytesReference.iterator();
try {
BytesRef slice;
while ((slice = iterator.next()) != null) {
System.arraycopy(slice.bytes, slice.offset, arr, offset, slice.length);
offset += slice.length;
}
return new BytesArray(arr);
} catch (IOException e) {
throw new AssertionError(e);
}
}

/**
* Returns BytesReference composed of the provided ByteBuffers.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) {

private void logFullContent(RestRequest restRequest) {
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
restRequest.releasableContent().writeTo(stream);
restRequest.content().writeTo(stream);
} catch (Exception e2) {
assert false : e2; // no real IO here
}
Expand Down
43 changes: 10 additions & 33 deletions server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.HttpBody;
Expand Down Expand Up @@ -303,22 +302,13 @@ public boolean isFullContent() {
return httpRequest.body().isFull();
}

/**
* Returns a copy of HTTP content. The copy is GC-managed and does not require reference counting.
* Please use {@link #releasableContent()} to avoid content copy.
*/
@SuppressForbidden(reason = "temporarily support content copy while migrating RestHandlers to ref counted pooled buffers")
public BytesReference content() {
return BytesReference.copyBytes(releasableContent());
}

/**
* Returns a direct reference to the network buffer containing the request body. The HTTP layers will release their references to this
* buffer as soon as they have finished the synchronous steps of processing the request on the network thread, which will by default
* release the buffer back to the pool where it may be re-used for another request. If you need to keep the buffer alive past the end of
* these synchronous steps, acquire your own reference to this buffer and release it once it's no longer needed.
*/
public ReleasableBytesReference releasableContent() {
public ReleasableBytesReference content() {
this.contentConsumed = true;
var bytes = httpRequest.body().asFull().bytes();
if (bytes.hasReferences() == false) {
Expand All @@ -338,32 +328,19 @@ public HttpBody.Stream contentStream() {
return httpRequest.body().asStream();
}

private void ensureContent() {
/**
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
* See {@link #content()}.
*/
public ReleasableBytesReference requiredContent() {
if (hasContent() == false) {
throw new ElasticsearchParseException("request body is required");
} else if (xContentType.get() == null) {
throwValidationException("unknown content type");
}
}

/**
* @return copy of the request body or throw an exception if the body or content type is missing.
* See {@link #content()}. Please use {@link #requiredReleasableContent()} to avoid content copy.
*/
public final BytesReference requiredContent() {
ensureContent();
return content();
}

/**
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
* See {@link #releasableContent()}. It's a recommended method to handle HTTP content without copying it.
*/
public ReleasableBytesReference requiredReleasableContent() {
ensureContent();
return releasableContent();
}

private static void throwValidationException(String msg) {
ValidationException unknownContentType = new ValidationException();
unknownContentType.addValidationError(msg);
Expand Down Expand Up @@ -596,7 +573,7 @@ public final boolean hasContentOrSourceParam() {
* if you need to handle the absence request content gracefully.
*/
public final XContentParser contentOrSourceParamParser() throws IOException {
Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
Tuple<XContentType, ReleasableBytesReference> tuple = contentOrSourceParam();
return XContentHelper.createParserNotCompressed(parserConfig, tuple.v2(), tuple.v1().xContent().type());
}

Expand All @@ -607,7 +584,7 @@ public final XContentParser contentOrSourceParamParser() throws IOException {
*/
public final void withContentOrSourceParamParserOrNull(CheckedConsumer<XContentParser, IOException> withParser) throws IOException {
if (hasContentOrSourceParam()) {
Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
Tuple<XContentType, ReleasableBytesReference> tuple = contentOrSourceParam();
try (XContentParser parser = XContentHelper.createParserNotCompressed(parserConfig, tuple.v2(), tuple.v1())) {
withParser.accept(parser);
}
Expand All @@ -620,7 +597,7 @@ public final void withContentOrSourceParamParserOrNull(CheckedConsumer<XContentP
* Get the content of the request or the contents of the {@code source} param or throw an exception if both are missing.
* Prefer {@link #contentOrSourceParamParser()} or {@link #withContentOrSourceParamParserOrNull(CheckedConsumer)} if you need a parser.
*/
public final Tuple<XContentType, BytesReference> contentOrSourceParam() {
public final Tuple<XContentType, ReleasableBytesReference> contentOrSourceParam() {
if (hasContentOrSourceParam() == false) {
throw new ElasticsearchParseException("request body or source parameter is required");
} else if (hasContent()) {
Expand All @@ -636,7 +613,7 @@ public final Tuple<XContentType, BytesReference> contentOrSourceParam() {
if (xContentType == null) {
throwValidationException("Unknown value for source_content_type [" + typeParam + "]");
}
return new Tuple<>(xContentType, bytes);
return new Tuple<>(xContentType, ReleasableBytesReference.wrap(bytes));
}

public ParsedMediaType getParsedAccept() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public boolean hasContent() {
}

@Override
public ReleasableBytesReference releasableContent() {
public ReleasableBytesReference content() {
if (filteredBytes == null) {
Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(
restRequest.requiredReleasableContent(),
restRequest.requiredContent(),
true,
restRequest.getXContentType()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.elasticsearch.rest.action.admin.cluster;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
import org.elasticsearch.client.internal.node.NodeClient;
Expand Down Expand Up @@ -57,6 +58,10 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
request.getXContentType(),
StoredScriptSource.parse(content, xContentType)
);
return channel -> client.execute(TransportPutStoredScriptAction.TYPE, putRequest, new RestToXContentListener<>(channel));
return channel -> client.execute(
TransportPutStoredScriptAction.TYPE,
putRequest,
ActionListener.withRef(new RestToXContentListener<>(channel), content)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
ReleasableBytesReference content = request.requiredReleasableContent();
ReleasableBytesReference content = request.requiredContent();

try {
bulkRequest.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, final NodeClient

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ReleasableBytesReference source = request.requiredReleasableContent();
ReleasableBytesReference source = request.requiredContent();
IndexRequest indexRequest = new IndexRequest(request.param("index"));
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

package org.elasticsearch.rest.action.ingest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -56,15 +57,20 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl
}
}

Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
var content = sourceTuple.v2();
final var request = new PutPipelineRequest(
getMasterNodeTimeout(restRequest),
getAckTimeout(restRequest),
restRequest.param("id"),
sourceTuple.v2(),
content,
sourceTuple.v1(),
ifVersion
);
return channel -> client.execute(PutPipelineTransportAction.TYPE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(
PutPipelineTransportAction.TYPE,
request,
ActionListener.withRef(new RestToXContentListener<>(channel), content)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.bulk.SimulateBulkRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand Down Expand Up @@ -72,7 +73,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String defaultIndex = request.param("index");
FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
String defaultPipeline = request.param("pipeline");
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
Tuple<XContentType, ReleasableBytesReference> sourceTuple = request.contentOrSourceParam();
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
Map<String, Map<String, Object>> pipelineSubstitutions = (Map<String, Map<String, Object>>) sourceMap.remove(
"pipeline_substitutions"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

package org.elasticsearch.rest.action.ingest;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -46,10 +47,13 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
var content = sourceTuple.v2();
SimulatePipelineRequest request = new SimulatePipelineRequest(sourceTuple.v2(), sourceTuple.v1(), restRequest.getRestApiVersion());
request.setId(restRequest.param("id"));
request.setVerbose(restRequest.paramAsBoolean("verbose", false));
return channel -> client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel));
return channel -> client.admin()
.cluster()
.simulatePipeline(request, ActionListener.withRef(new RestToXContentListener<>(channel), content));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.features.NodeFeature;
Expand Down Expand Up @@ -184,9 +184,9 @@ public static void parseMultiLineRequest(
boolean ccsMinimizeRoundtrips = request.paramAsBoolean("ccs_minimize_roundtrips", true);
String routing = request.param("routing");

final Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
final Tuple<XContentType, ReleasableBytesReference> sourceTuple = request.contentOrSourceParam();
final XContent xContent = sourceTuple.v1().xContent();
final BytesReference data = sourceTuple.v2();
final ReleasableBytesReference data = sourceTuple.v2();
MultiSearchRequest.readMultiLineFormat(
xContent,
request.contentParserConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,4 @@ public void testGetDoubleLE() {
assertThat(e.getMessage(), equalTo("Index 9 out of bounds for length 9"));
}

public void testCopyBytes() {
var data = randomByteArrayOfLength(between(1024, 1024 * 1024 * 50));
var copy = BytesReference.copyBytes(new BytesArray(data));
assertArrayEquals(data, BytesReference.toBytes(copy));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.http.HttpChannel;
Expand Down Expand Up @@ -321,7 +321,7 @@ public String uri() {
}

@Override
public BytesReference content() {
public ReleasableBytesReference content() {
return restRequest.content();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected final BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest r
// We need to consume parameters and content from the REST request in order to bypass unrecognized param errors
// and return a license error.
request.params().keySet().forEach(key -> request.param(key, ""));
request.releasableContent();
request.content();
return channel -> channel.sendResponse(
new RestResponse(channel, LicenseUtils.newComplianceException(this.licenseState, this.product))
);
Expand Down
Loading

0 comments on commit 1866299

Please sign in to comment.