Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce garbage for requests with unpooled buffers #32228

Closed
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,31 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
final FullHttpRequest request = httpRequest(msg);
final Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
}

try {
// package private for unit-testing
static FullHttpRequest httpRequest(HttpPipelinedRequest<FullHttpRequest> msg) {
FullHttpRequest request = msg.getRequest();
if (Netty4Utils.isUnpooled(request.content())) {
assert Netty4Utils.isBufferHierarchyUnpooled(request.content()) : "request body contains unpooled and pooled buffers";
// if the buffer is unpooled its lifecycle is managed by the garbage collector instead of Netty's internal
// memory pool. Thus we we can avoiding copying the request content buffer and use the original request instead.
return request;
} else {
final FullHttpRequest copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
Expand All @@ -52,23 +74,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<Full
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());

Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
} finally {
// As we have copied the buffer, we can release the request
request.release();
return copiedRequest;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.NettyRuntime;
Expand Down Expand Up @@ -161,6 +162,38 @@ public static void closeChannels(final Collection<Channel> channels) throws IOEx
}
}

/**
* @param buffer A byte buffer instance. Must not be null.
* @return <code>true</code> iff this byte buffer has been allocated outside of Netty's buffer pool.
*/
public static boolean isUnpooled(final ByteBuf buffer) {
return buffer.alloc() instanceof UnpooledByteBufAllocator;
}

/**
* Contrary to {@link #isUnpooled(ByteBuf)} which assumes that the top-level buffer's allocator is used for any associated buffers,
* this implementation does a more thorough check by inspecting all components of a <code>CompositeByteBuf</code>.
*
* @param buffer A byte buffer instance. Must not be null.
* @return <code>true</code> iff this byte buffer and all its components have been allocated outside of Netty's buffer pool.
*/
public static boolean isBufferHierarchyUnpooled(final ByteBuf buffer) {
if (isUnpooled(buffer)) {
if (buffer instanceof CompositeByteBuf) {
CompositeByteBuf compositeBuffer = (CompositeByteBuf) buffer;
for(int i = 0; i < compositeBuffer.numComponents(); i++) {
// access the internal component to avoid duplicating the buffer (see CompositeByteBuf#component(int))
if (isBufferHierarchyUnpooled(compositeBuffer.internalComponent(i)) == false) {
return false;
}
}
}
return true;
} else {
return false;
}
}

/**
* If the specified cause is an unrecoverable error, this method will rethrow the cause on a separate thread so that it can not be
* caught and bubbles up to the uncaught exception handler.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.test.ESTestCase;

public class Netty4HttpRequestHandlerTests extends ESTestCase {
public void testExtractHttpRequestWithUnpooledContent() {
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = createPipelinedRequest(false);
FullHttpRequest extractedRequest = Netty4HttpRequestHandler.httpRequest(pipelinedRequest);
assertSame(extractedRequest, pipelinedRequest.getRequest());
}

public void testExtractHttpRequestWithPooledContent() {
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = createPipelinedRequest(true);
assertEquals(1, pipelinedRequest.getRequest().refCnt());
FullHttpRequest extractedRequest = Netty4HttpRequestHandler.httpRequest(pipelinedRequest);
assertNotSame(extractedRequest, pipelinedRequest.getRequest());
assertEquals(0, pipelinedRequest.getRequest().refCnt());
}

private HttpPipelinedRequest<FullHttpRequest> createPipelinedRequest(boolean usePooledAllocator) {
ByteBufAllocator alloc = usePooledAllocator ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT;
ByteBuf content = alloc.buffer(5);
ByteBufUtil.writeAscii(content, randomAlphaOfLength(content.capacity()));
final FullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", content);
HttpUtil.setContentLength(nettyRequest, content.capacity());
return new HttpPipelinedRequest<>(0, nettyRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -30,13 +32,31 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;

public class Netty4UtilsTests extends ESTestCase {

private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
private ByteBuf unpooledBuffer;
private ByteBuf pooledBuffer;

@Before
public void setUp() throws Exception {
super.setUp();
pooledBuffer = PooledByteBufAllocator.DEFAULT.buffer(1);
unpooledBuffer = UnpooledByteBufAllocator.DEFAULT.buffer(1);
}

@After
public void tearDown() throws Exception {
pooledBuffer.release();
unpooledBuffer.release();
super.tearDown();
}

public void testToChannelBufferWithEmptyRef() throws IOException {
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));
Expand Down Expand Up @@ -95,4 +115,8 @@ private BytesReference getRandomizedBytesReference(int length) throws IOExceptio
}
}

public void testClassifiesBuffersAsPooledOrUnpooled() {
assertTrue(Netty4Utils.isUnpooled(unpooledBuffer));
assertFalse(Netty4Utils.isUnpooled(pooledBuffer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -63,6 +64,38 @@ static ByteBuf toByteBuf(final BytesReference reference) {
}
}

/**
* @param buffer A byte buffer instance. Must not be null.
* @return <code>true</code> iff this byte buffer has been allocated outside of Netty's buffer pool.
*/
static boolean isUnpooled(final ByteBuf buffer) {
return buffer.alloc() instanceof UnpooledByteBufAllocator;
}

/**
* Contrary to {@link #isUnpooled(ByteBuf)} which assumes that the top-level buffer's allocator is used for any associated buffers,
* this implementation does a more thorough check by inspecting all components of a <code>CompositeByteBuf</code>.
*
* @param buffer A byte buffer instance. Must not be null.
* @return <code>true</code> iff this byte buffer and all its components have been allocated outside of Netty's buffer pool.
*/
static boolean isBufferHierarchyUnpooled(final ByteBuf buffer) {
if (isUnpooled(buffer)) {
if (buffer instanceof CompositeByteBuf) {
CompositeByteBuf compositeBuffer = (CompositeByteBuf) buffer;
for(int i = 0; i < compositeBuffer.numComponents(); i++) {
// access the internal component to avoid duplicating the buffer (see CompositeByteBuf#component(int))
if (isBufferHierarchyUnpooled(compositeBuffer.internalComponent(i)) == false) {
return false;
}
}
}
return true;
} else {
return false;
}
}

static BytesReference toBytesReference(final ByteBuf buffer) {
return new ByteBufBytesReference(buffer, buffer.readableBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,31 @@ public void close() throws IOException {
@SuppressWarnings("unchecked")
private void handleRequest(Object msg) {
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
FullHttpRequest request = pipelinedRequest.getRequest();
final FullHttpRequest request = httpRequest(pipelinedRequest);
final NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
}
} else {
transport.incomingRequest(httpRequest, nioHttpChannel);
}
}

try {
// package private for unit-testing
static FullHttpRequest httpRequest(HttpPipelinedRequest<FullHttpRequest> pipelinedRequest) {
FullHttpRequest request = pipelinedRequest.getRequest();
if (ByteBufUtils.isUnpooled(request.content())) {
assert ByteBufUtils.isBufferHierarchyUnpooled(request.content()) : "request body contains unpooled and pooled buffers";
// if the buffer is unpooled its lifecycle is managed by the garbage collector instead of Netty's internal
// memory pool. Thus we we can avoiding copying the request content buffer and use the original request instead.
return request;
} else {
final FullHttpRequest copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
Expand All @@ -133,23 +155,10 @@ private void handleRequest(Object msg) {
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());

NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.dieOnError(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
}
} else {
transport.incomingRequest(httpRequest, nioHttpChannel);
}
} finally {
// As we have copied the buffer, we can release the request
request.release();
return copiedRequest;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.http.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

public class ByteBufUtilsTests extends ESTestCase {
private ByteBuf unpooledBuffer;
private ByteBuf pooledBuffer;

@Before
public void setUp() throws Exception {
super.setUp();
pooledBuffer = PooledByteBufAllocator.DEFAULT.buffer(1);
unpooledBuffer = UnpooledByteBufAllocator.DEFAULT.buffer(1);
}

@After
public void tearDown() throws Exception {
pooledBuffer.release();
unpooledBuffer.release();
super.tearDown();
}

public void testClassifiesBuffersAsPooledOrUnpooled() {
assertTrue(ByteBufUtils.isUnpooled(unpooledBuffer));
assertFalse(ByteBufUtils.isUnpooled(pooledBuffer));
}
}
Loading