From 1ecb00ee15e1c570245943caa9cb5a2b80abfebd Mon Sep 17 00:00:00 2001 From: yawkat Date: Thu, 8 Jun 2023 12:20:56 +0200 Subject: [PATCH] Fix out-of-order write in HttpStreamsHandler HttpStreamsHandler delays writing the first HttpContent to when the request has been fully written. The second HttpContent was not delayed however, so when writing the request took time (apparently with TLS?), the second HttpContent was written before the first. This patch delays all further writes (normal, complete and error) until the first content has been written. Should fix https://github.com/micronaut-projects/micronaut-tracing/issues/316 --- .../http/netty/stream/HttpStreamsHandler.java | 31 ++++++++- .../HttpStreamsClientHandlerSpec.groovy | 66 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 http-netty/src/test/groovy/io/micronaut/http/netty/stream/HttpStreamsClientHandlerSpec.groovy diff --git a/http-netty/src/main/java/io/micronaut/http/netty/stream/HttpStreamsHandler.java b/http-netty/src/main/java/io/micronaut/http/netty/stream/HttpStreamsHandler.java index 27acf66b930..58a5f6fd1fe 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/stream/HttpStreamsHandler.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/stream/HttpStreamsHandler.java @@ -21,6 +21,7 @@ import io.micronaut.http.netty.reactive.HandlerPublisher; import io.micronaut.http.netty.reactive.HandlerSubscriber; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -391,20 +392,39 @@ protected void unbufferedWrite(final ChannelHandlerContext ctx, final Out messag StreamedHttpMessage streamed = (StreamedHttpMessage) message; HandlerSubscriber subscriber = new HandlerSubscriber(ctx.executor()) { AtomicBoolean messageWritten = new AtomicBoolean(); + ChannelFuture delayWrites; @Override public void onNext(HttpContent httpContent) { + if (delayWrites != null && !delayWrites.isDone()) { + delayWrites.addListener(future -> onNext(httpContent)); + return; + } + if (messageWritten.compareAndSet(false, true)) { ChannelPromise messageWritePromise = ctx.newPromise(); //if oncomplete gets called before the message is written the promise //set to lastWriteFuture shouldn't complete until the first content is written lastWriteFuture = messageWritePromise; - ctx.writeAndFlush(message).addListener(f -> super.onNext(httpContent, messageWritePromise)); + delayWrites = ctx.writeAndFlush(message); + delayWrites.addListener(f -> { + delayWrites = null; + super.onNext(httpContent, messageWritePromise); + }); } else { super.onNext(httpContent); } } + @Override + public void onError(Throwable error) { + if (delayWrites != null && !delayWrites.isDone()) { + delayWrites.addListener(future -> onError(error)); + return; + } + super.onError(error); + } + @Override protected void error(Throwable error) { try { @@ -424,6 +444,15 @@ protected void error(Throwable error) { } } + @Override + public void onComplete() { + if (delayWrites != null && !delayWrites.isDone()) { + delayWrites.addListener(future -> onComplete()); + return; + } + super.onComplete(); + } + @Override protected void complete() { if (messageWritten.compareAndSet(false, true)) { diff --git a/http-netty/src/test/groovy/io/micronaut/http/netty/stream/HttpStreamsClientHandlerSpec.groovy b/http-netty/src/test/groovy/io/micronaut/http/netty/stream/HttpStreamsClientHandlerSpec.groovy new file mode 100644 index 00000000000..0cca173eced --- /dev/null +++ b/http-netty/src/test/groovy/io/micronaut/http/netty/stream/HttpStreamsClientHandlerSpec.groovy @@ -0,0 +1,66 @@ +package io.micronaut.http.netty.stream + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelOutboundHandlerAdapter +import io.netty.channel.ChannelPromise +import io.netty.channel.embedded.EmbeddedChannel +import io.netty.handler.codec.http.DefaultHttpContent +import io.netty.handler.codec.http.DefaultHttpRequest +import io.netty.handler.codec.http.HttpContent +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpRequest +import io.netty.handler.codec.http.HttpVersion +import io.netty.handler.codec.http.LastHttpContent +import reactor.core.publisher.Flux +import spock.lang.Issue +import spock.lang.Specification + +import java.nio.charset.StandardCharsets + +class HttpStreamsClientHandlerSpec extends Specification { + @Issue('https://github.com/micronaut-projects/micronaut-tracing/issues/316') + def 'out of order write'() { + given: + ChannelPromise firstWritePromise = null + def channel = new EmbeddedChannel( + new ChannelOutboundHandlerAdapter() { + @Override + void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + // delay the completion of the write of the HttpRequest + if (firstWritePromise == null) { + firstWritePromise = promise + ctx.write(msg, ctx.voidPromise()) + return + } + super.write(ctx, msg, promise) + } + }, + new HttpStreamsClientHandler() + ) + def msg = new DelegateStreamedHttpRequest( + new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"), + JsonSubscriber.lift(Flux.just(new DefaultHttpContent(Unpooled.wrappedBuffer("\"foo\"".getBytes(StandardCharsets.UTF_8))))) + ) + + when: + channel.writeOutbound(msg) + firstWritePromise.trySuccess() + channel.flushOutbound() + then: + channel.readOutbound() instanceof HttpRequest + when: + ByteBuf combined = Unpooled.buffer() + while (true) { + HttpContent h = channel.readOutbound() + combined.writeBytes(h.content()) + if (h instanceof LastHttpContent) { + break + } + } + then: + combined.toString(StandardCharsets.UTF_8) == "[\"foo\"]" + !channel.finish() + } +}