From 00294c154c12788e8e9a340d202246cf57943a25 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 25 Jul 2018 21:03:15 +0200 Subject: [PATCH 1/2] TESTS: Fix Buf Leaks in HttpReadWriteHandlerTests * Release all ref counted things that weren't getting properly released * Mannually force channel promise to be completed because mock channel doesn't do it and it prevents one `release` call in `io.netty.channel.ChannelOutboundHandlerAdapter#write` from firing * Now passes a few thousand iterations with leak detection set to paranoid --- .../http/nio/HttpReadWriteHandlerTests.java | 199 ++++++++++++------ 1 file changed, 129 insertions(+), 70 deletions(-) diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index 0a09b6b8789f7..78d7dda7af528 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; @@ -33,6 +34,7 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.ReferenceCountUtil; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -116,21 +118,27 @@ public void testSuccessfulDecodeHttpRequest() throws IOException { ByteBuf buf = requestEncoder.encode(httpRequest); int slicePoint = randomInt(buf.writerIndex() - 1); - ByteBuf slicedBuf = buf.retainedSlice(0, slicePoint); ByteBuf slicedBuf2 = buf.retainedSlice(slicePoint, buf.writerIndex()); - handler.consumeReads(toChannelBuffer(slicedBuf)); + try { + handler.consumeReads(toChannelBuffer(slicedBuf)); - verify(transport, times(0)).incomingRequest(any(HttpRequest.class), any(NioHttpChannel.class)); + verify(transport, times(0)).incomingRequest(any(HttpRequest.class), any(NioHttpChannel.class)); - handler.consumeReads(toChannelBuffer(slicedBuf2)); + handler.consumeReads(toChannelBuffer(slicedBuf2)); - ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); - verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class)); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class)); - HttpRequest nioHttpRequest = requestCaptor.getValue(); - assertEquals(HttpRequest.HttpVersion.HTTP_1_1, nioHttpRequest.protocolVersion()); - assertEquals(RestRequest.Method.GET, nioHttpRequest.method()); + HttpRequest nioHttpRequest = requestCaptor.getValue(); + assertEquals(HttpRequest.HttpVersion.HTTP_1_1, nioHttpRequest.protocolVersion()); + assertEquals(RestRequest.Method.GET, nioHttpRequest.method()); + } finally { + handler.close(); + buf.release(); + slicedBuf.release(); + slicedBuf2.release(); + } } public void testDecodeHttpRequestError() throws IOException { @@ -138,16 +146,20 @@ public void testDecodeHttpRequestError() throws IOException { io.netty.handler.codec.http.HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri); ByteBuf buf = requestEncoder.encode(httpRequest); - buf.setByte(0, ' '); - buf.setByte(1, ' '); - buf.setByte(2, ' '); + try { + buf.setByte(0, ' '); + buf.setByte(1, ' '); + buf.setByte(2, ' '); - handler.consumeReads(toChannelBuffer(buf)); + handler.consumeReads(toChannelBuffer(buf)); - ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); - verify(transport).incomingRequestError(any(HttpRequest.class), any(NioHttpChannel.class), exceptionCaptor.capture()); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(transport).incomingRequestError(any(HttpRequest.class), any(NioHttpChannel.class), exceptionCaptor.capture()); - assertTrue(exceptionCaptor.getValue() instanceof IllegalArgumentException); + assertTrue(exceptionCaptor.getValue() instanceof IllegalArgumentException); + } finally { + buf.release(); + } } public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() throws IOException { @@ -157,9 +169,11 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t HttpUtil.setKeepAlive(httpRequest, false); ByteBuf buf = requestEncoder.encode(httpRequest); - - handler.consumeReads(toChannelBuffer(buf)); - + try { + handler.consumeReads(toChannelBuffer(buf)); + } finally { + buf.release(); + } verify(transport, times(0)).incomingRequestError(any(), any(), any()); verify(transport, times(0)).incomingRequest(any(), any()); @@ -168,13 +182,17 @@ public void testDecodeHttpRequestContentLengthToLongGeneratesOutboundMessage() t FlushOperation flushOperation = flushOperations.get(0); FullHttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(flushOperation.getBuffersToWrite())); - assertEquals(HttpVersion.HTTP_1_1, response.protocolVersion()); - assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.status()); - - flushOperation.getListener().accept(null, null); - // Since we have keep-alive set to false, we should close the channel after the response has been - // flushed - verify(nioHttpChannel).close(); + try { + assertEquals(HttpVersion.HTTP_1_1, response.protocolVersion()); + assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.status()); + + flushOperation.getListener().accept(null, null); + // Since we have keep-alive set to false, we should close the channel after the response has been + // flushed + verify(nioHttpChannel).close(); + } finally { + response.release(); + } } @SuppressWarnings("unchecked") @@ -189,11 +207,15 @@ public void testEncodeHttpResponse() throws IOException { SocketChannelContext context = mock(SocketChannelContext.class); HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class)); List flushOperations = handler.writeToBytes(writeOperation); - - FullHttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(flushOperations.get(0).getBuffersToWrite())); - - assertEquals(HttpResponseStatus.OK, response.status()); - assertEquals(HttpVersion.HTTP_1_1, response.protocolVersion()); + FlushOperation operation = flushOperations.get(0); + FullHttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(operation.getBuffersToWrite())); + ((ChannelPromise) operation.getListener()).setSuccess(); + try { + assertEquals(HttpResponseStatus.OK, response.status()); + assertEquals(HttpVersion.HTTP_1_1, response.protocolVersion()); + } finally { + response.release(); + } } public void testCorsEnabledWithoutAllowOrigins() throws IOException { @@ -201,9 +223,13 @@ public void testCorsEnabledWithoutAllowOrigins() throws IOException { Settings settings = Settings.builder() .put(HttpTransportSettings.SETTING_CORS_ENABLED.getKey(), true) .build(); - io.netty.handler.codec.http.HttpResponse response = executeCorsRequest(settings, "remote-host", "request-host"); - // inspect response and validate - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), nullValue()); + FullHttpResponse response = executeCorsRequest(settings, "remote-host", "request-host"); + try { + // inspect response and validate + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), nullValue()); + } finally { + response.release(); + } } public void testCorsEnabledWithAllowOrigins() throws IOException { @@ -213,11 +239,15 @@ public void testCorsEnabledWithAllowOrigins() throws IOException { .put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), originValue) .build(); - io.netty.handler.codec.http.HttpResponse response = executeCorsRequest(settings, originValue, "request-host"); - // inspect response and validate - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); + FullHttpResponse response = executeCorsRequest(settings, originValue, "request-host"); + try { + // inspect response and validate + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + } finally { + response.release(); + } } public void testCorsAllowOriginWithSameHost() throws IOException { @@ -228,29 +258,44 @@ public void testCorsAllowOriginWithSameHost() throws IOException { .put(SETTING_CORS_ENABLED.getKey(), true) .build(); FullHttpResponse response = executeCorsRequest(settings, originValue, host); - // inspect response and validate - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); - + String allowedOrigins; + try { + // inspect response and validate + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + } finally { + response.release(); + } originValue = "http://" + originValue; response = executeCorsRequest(settings, originValue, host); - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); + try { + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + } finally { + response.release(); + } originValue = originValue + ":5555"; host = host + ":5555"; response = executeCorsRequest(settings, originValue, host); - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); - + try { + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + } finally { + response.release(); + } originValue = originValue.replace("http", "https"); response = executeCorsRequest(settings, originValue, host); - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); + try { + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + } finally { + response.release(); + } } public void testThatStringLiteralWorksOnMatch() throws IOException { @@ -261,12 +306,16 @@ public void testThatStringLiteralWorksOnMatch() throws IOException { .put(SETTING_CORS_ALLOW_METHODS.getKey(), "get, options, post") .put(SETTING_CORS_ALLOW_CREDENTIALS.getKey(), true) .build(); - io.netty.handler.codec.http.HttpResponse response = executeCorsRequest(settings, originValue, "request-host"); - // inspect response and validate - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS), equalTo("true")); + FullHttpResponse response = executeCorsRequest(settings, originValue, "request-host"); + try { + // inspect response and validate + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS), equalTo("true")); + } finally { + response.release(); + } } public void testThatAnyOriginWorks() throws IOException { @@ -275,12 +324,16 @@ public void testThatAnyOriginWorks() throws IOException { .put(SETTING_CORS_ENABLED.getKey(), true) .put(SETTING_CORS_ALLOW_ORIGIN.getKey(), originValue) .build(); - io.netty.handler.codec.http.HttpResponse response = executeCorsRequest(settings, originValue, "request-host"); - // inspect response and validate - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); - String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); - assertThat(allowedOrigins, is(originValue)); - assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS), nullValue()); + FullHttpResponse response = executeCorsRequest(settings, originValue, "request-host"); + try { + // inspect response and validate + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN), notNullValue()); + String allowedOrigins = response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN); + assertThat(allowedOrigins, is(originValue)); + assertThat(response.headers().get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS), nullValue()); + } finally { + response.release(); + } } private FullHttpResponse executeCorsRequest(final Settings settings, final String originValue, final String host) throws IOException { @@ -299,9 +352,12 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin response.addHeader("Content-Length", Integer.toString(content.length())); SocketChannelContext context = mock(SocketChannelContext.class); - List flushOperations = handler.writeToBytes(handler.createWriteOperation(context, response, (v, e) -> {})); - + List flushOperations = handler.writeToBytes(handler.createWriteOperation(context, response, (v, e) -> { + ReferenceCountUtil.release(v); + })); + handler.close(); FlushOperation flushOperation = flushOperations.get(0); + ((ChannelPromise) flushOperation.getListener()).setSuccess(); return responseDecoder.decode(Unpooled.wrappedBuffer(flushOperation.getBuffersToWrite())); } @@ -314,8 +370,11 @@ private NioHttpRequest prepareHandlerForResponse(HttpReadWriteHandler handler) t io.netty.handler.codec.http.HttpRequest request = new DefaultFullHttpRequest(version, method, uri); ByteBuf buf = requestEncoder.encode(request); - - handler.consumeReads(toChannelBuffer(buf)); + try { + handler.consumeReads(toChannelBuffer(buf)); + } finally { + buf.release(); + } ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(NioHttpRequest.class); verify(transport, atLeastOnce()).incomingRequest(requestCaptor.capture(), any(HttpChannel.class)); From 8ecc45caec287c40dd4f49ba61933dbf084a5176 Mon Sep 17 00:00:00 2001 From: Armin Date: Wed, 25 Jul 2018 22:29:20 +0200 Subject: [PATCH 2/2] CR: Revert noop release call --- .../elasticsearch/http/nio/HttpReadWriteHandlerTests.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index 78d7dda7af528..62bf845a77058 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -34,7 +34,6 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; -import io.netty.util.ReferenceCountUtil; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -352,9 +351,7 @@ private FullHttpResponse executeCorsRequest(final Settings settings, final Strin response.addHeader("Content-Length", Integer.toString(content.length())); SocketChannelContext context = mock(SocketChannelContext.class); - List flushOperations = handler.writeToBytes(handler.createWriteOperation(context, response, (v, e) -> { - ReferenceCountUtil.release(v); - })); + List flushOperations = handler.writeToBytes(handler.createWriteOperation(context, response, (v, e) -> {})); handler.close(); FlushOperation flushOperation = flushOperations.get(0); ((ChannelPromise) flushOperation.getListener()).setSuccess();