diff --git a/docs/changelog/107355.yaml b/docs/changelog/107355.yaml new file mode 100644 index 0000000000000..1d4813b877e58 --- /dev/null +++ b/docs/changelog/107355.yaml @@ -0,0 +1,6 @@ +pr: 107355 +summary: Handle exceptions thrown by HTTP header validation +area: Network +type: bug +issues: + - 107338 diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java index 91471863e620f..ad322503b0d06 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidator.java @@ -107,24 +107,39 @@ private void requestStart(ChannelHandlerContext ctx) { if (httpRequest == null) { // this looks like a malformed request and will forward without validation - ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx)); + ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); } else { - Transports.assertDefaultThreadContext(threadContext); - // this prevents thread-context changes to propagate to the validation listener - // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, - // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor - ContextPreservingActionListener contextPreservingActionListener = new ContextPreservingActionListener<>( - threadContext.wrapRestorable(threadContext.newStoredContext()), - ActionListener.wrap(aVoid -> - // Always use "Submit" to prevent reentrancy concerns if we are still on event loop - ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx)), - e -> ctx.channel().eventLoop().submit(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e)) - ) + assert Transports.assertDefaultThreadContext(threadContext); + ActionListener.run( + // this prevents thread-context changes to propagate to the validation listener + // atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context, + // so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor + ActionListener.assertOnce( + new ContextPreservingActionListener( + threadContext.wrapRestorable(threadContext.newStoredContext()), + // Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + assert Transports.assertDefaultThreadContext(threadContext); + ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx)); + } + + @Override + public void onFailure(Exception e) { + assert Transports.assertDefaultThreadContext(threadContext); + ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e)); + } + } + ) + ), + listener -> { + // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused + try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { + validator.validate(httpRequest, ctx.channel(), listener); + } + } ); - // this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused - try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) { - validator.validate(httpRequest, ctx.channel(), contextPreservingActionListener); - } } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java index a2c034acdcb8d..e8622f2c95c2c 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpHeaderValidatorTests.java @@ -52,6 +52,7 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase { private final AtomicReference> listener = new AtomicReference<>(); private EmbeddedChannel channel; private Netty4HttpHeaderValidator netty4HttpHeaderValidator; + private final AtomicReference validationException = new AtomicReference<>(); @Override public void setUp() throws Exception { @@ -63,8 +64,13 @@ private void reset() { channel = new EmbeddedChannel(); header.set(null); listener.set(null); + validationException.set(null); HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> { header.set(httpRequest); + final var exception = validationException.get(); + if (exception != null) { + throw exception; + } listener.set(validationCompleteListener); }; netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY)); @@ -253,6 +259,7 @@ public void testValidationErrorForwardsAsDecoderErrorMessage() { final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(request); channel.writeInbound(content); @@ -285,6 +292,43 @@ public void testValidationErrorForwardsAsDecoderErrorMessage() { } } + public void testValidationExceptionForwardsAsDecoderErrorMessage() { + final var exception = new ElasticsearchException("Failure"); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + + final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); + + validationException.set(exception); + channel.writeInbound(request); + + assertThat(header.get(), sameInstance(request)); + assertThat(listener.get(), nullValue()); + + channel.runPendingTasks(); + assertTrue(channel.config().isAutoRead()); + DefaultHttpRequest failed = channel.readInbound(); + assertThat(failed, sameInstance(request)); + assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue()); + assertTrue(failed.decoderResult().isFailure()); + Exception cause = (Exception) failed.decoderResult().cause(); + assertThat(cause, equalTo(exception)); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST)); + + final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4)); + channel.writeInbound(content); + + assertThat(channel.readInbound(), nullValue()); + assertThat(content.refCnt(), equalTo(0)); + + DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4)); + channel.writeInbound(lastContent); + assertTrue(channel.config().isAutoRead()); + assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START)); + assertThat(channel.readInbound(), nullValue()); + assertThat(lastContent.refCnt(), equalTo(0)); + } + public void testValidationHandlesMultipleQueuedUpMessages() { assertTrue(channel.config().isAutoRead()); assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));