Skip to content

Commit

Permalink
Handle exceptions thrown by HTTP header validation (elastic#107355) (e…
Browse files Browse the repository at this point in the history
…lastic#107454)

Today if the HTTP header validation throws an exception (rather than
calling `listener.onFailure()`) then we treat this as a server-side
error, record it in the logs, and close the connection abruptly without
sending a response. In practice such an exception is more likely a
client-side error, so with this commit we catch it and marshal it back
to the client instead.

Closes elastic#107338
  • Loading branch information
DaveCTurner authored Apr 15, 2024
1 parent 1657ddd commit 3b02039
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 16 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/107355.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 107355
summary: Handle exceptions thrown by HTTP header validation
area: Network
type: bug
issues:
- 107338
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,39 @@ private void requestStart(ChannelHandlerContext ctx) {

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx));
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
} else {
Transports.assertDefaultThreadContext(threadContext);
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ContextPreservingActionListener<Void> contextPreservingActionListener = new ContextPreservingActionListener<>(
threadContext.wrapRestorable(threadContext.newStoredContext()),
ActionListener.wrap(aVoid ->
// Always use "Submit" to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx)),
e -> ctx.channel().eventLoop().submit(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e))
)
assert Transports.assertDefaultThreadContext(threadContext);
ActionListener.run(
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ActionListener.assertOnce(
new ContextPreservingActionListener<Void>(
threadContext.wrapRestorable(threadContext.newStoredContext()),
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
assert Transports.assertDefaultThreadContext(threadContext);
ctx.channel().eventLoop().execute(() -> forwardFullRequest(ctx));
}

@Override
public void onFailure(Exception e) {
assert Transports.assertDefaultThreadContext(threadContext);
ctx.channel().eventLoop().execute(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e));
}
}
)
),
listener -> {
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(httpRequest, ctx.channel(), listener);
}
}
);
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(httpRequest, ctx.channel(), contextPreservingActionListener);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class Netty4HttpHeaderValidatorTests extends ESTestCase {
private final AtomicReference<ActionListener<Void>> listener = new AtomicReference<>();
private EmbeddedChannel channel;
private Netty4HttpHeaderValidator netty4HttpHeaderValidator;
private final AtomicReference<RuntimeException> validationException = new AtomicReference<>();

@Override
public void setUp() throws Exception {
Expand All @@ -63,8 +64,13 @@ private void reset() {
channel = new EmbeddedChannel();
header.set(null);
listener.set(null);
validationException.set(null);
HttpValidator validator = (httpRequest, channel, validationCompleteListener) -> {
header.set(httpRequest);
final var exception = validationException.get();
if (exception != null) {
throw exception;
}
listener.set(validationCompleteListener);
};
netty4HttpHeaderValidator = new Netty4HttpHeaderValidator(validator, new ThreadContext(Settings.EMPTY));
Expand Down Expand Up @@ -253,6 +259,7 @@ public void testValidationErrorForwardsAsDecoderErrorMessage() {

final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");
final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));

channel.writeInbound(request);
channel.writeInbound(content);

Expand Down Expand Up @@ -285,6 +292,43 @@ public void testValidationErrorForwardsAsDecoderErrorMessage() {
}
}

public void testValidationExceptionForwardsAsDecoderErrorMessage() {
final var exception = new ElasticsearchException("Failure");
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));

final DefaultHttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri");

validationException.set(exception);
channel.writeInbound(request);

assertThat(header.get(), sameInstance(request));
assertThat(listener.get(), nullValue());

channel.runPendingTasks();
assertTrue(channel.config().isAutoRead());
DefaultHttpRequest failed = channel.readInbound();
assertThat(failed, sameInstance(request));
assertThat(failed.headers().get(HttpHeaderNames.CONNECTION), nullValue());
assertTrue(failed.decoderResult().isFailure());
Exception cause = (Exception) failed.decoderResult().cause();
assertThat(cause, equalTo(exception));
assertThat(netty4HttpHeaderValidator.getState(), equalTo(DROPPING_DATA_UNTIL_NEXT_REQUEST));

final DefaultHttpContent content = new DefaultHttpContent(Unpooled.buffer(4));
channel.writeInbound(content);

assertThat(channel.readInbound(), nullValue());
assertThat(content.refCnt(), equalTo(0));

DefaultLastHttpContent lastContent = new DefaultLastHttpContent(Unpooled.buffer(4));
channel.writeInbound(lastContent);
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
assertThat(channel.readInbound(), nullValue());
assertThat(lastContent.refCnt(), equalTo(0));
}

public void testValidationHandlesMultipleQueuedUpMessages() {
assertTrue(channel.config().isAutoRead());
assertThat(netty4HttpHeaderValidator.getState(), equalTo(WAITING_TO_START));
Expand Down

0 comments on commit 3b02039

Please sign in to comment.