diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java index 65b83984262..4580789e94c 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/DefaultHttpClient.java @@ -49,6 +49,7 @@ import io.micronaut.http.bind.RequestBinderRegistry; import io.micronaut.http.body.ByteBody; import io.micronaut.http.body.ChunkedMessageBodyReader; +import io.micronaut.http.body.CloseableAvailableByteBody; import io.micronaut.http.body.CloseableByteBody; import io.micronaut.http.body.ContextlessMessageBodyHandlerRegistry; import io.micronaut.http.body.InternalByteBody; @@ -88,13 +89,11 @@ import io.micronaut.http.filter.HttpClientFilterResolver; import io.micronaut.http.filter.HttpFilterResolver; import io.micronaut.http.multipart.MultipartException; -import io.micronaut.http.netty.EventLoopFlow; import io.micronaut.http.netty.NettyHttpHeaders; import io.micronaut.http.netty.NettyHttpRequestBuilder; import io.micronaut.http.netty.NettyHttpResponseBuilder; import io.micronaut.http.netty.body.AvailableNettyByteBody; import io.micronaut.http.netty.body.BodySizeLimits; -import io.micronaut.http.netty.body.BufferConsumer; import io.micronaut.http.netty.body.NettyBodyAdapter; import io.micronaut.http.netty.body.NettyByteBody; import io.micronaut.http.netty.body.NettyByteBufMessageBodyHandler; @@ -130,9 +129,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; @@ -157,7 +154,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; import io.netty.handler.codec.http.multipart.FileUpload; import io.netty.handler.codec.http.multipart.HttpData; @@ -172,8 +168,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.FastThreadLocalThread; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.Promise; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -205,9 +199,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; +import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.UnaryOperator; /** * Default implementation of the {@link HttpClient} interface based on Netty. @@ -617,8 +611,7 @@ public HttpResponse exchange(io.micronaut.http.HttpRequest reque here."""); } BlockHint blockHint = BlockHint.willBlockThisThread(); - return Flux.from(DefaultHttpClient.this.exchange(request, bodyType, errorType, blockHint)) - .blockFirst(); + return DefaultHttpClient.this.exchange(request, bodyType, errorType, blockHint).block(); // We don't have to release client response buffer } @@ -867,16 +860,103 @@ public Publisher jsonStream(@NonNull io.micronaut.http.HttpRequest @Override public Publisher> exchange(@NonNull io.micronaut.http.HttpRequest request, @NonNull Argument bodyType, @NonNull Argument errorType) { - return exchange(request, bodyType, errorType, null); + return exchange(request, bodyType, errorType, null) + // some tests expect flux... + .flux(); } @NonNull - private Flux> exchange(io.micronaut.http.HttpRequest request, Argument bodyType, Argument errorType, @Nullable BlockHint blockHint) { + private Mono> exchange(io.micronaut.http.HttpRequest request, Argument bodyType, Argument errorType, @Nullable BlockHint blockHint) { setupConversionService(request); final io.micronaut.http.HttpRequest parentRequest = ServerRequestContext.currentRequest().orElse(null); - Publisher uriPublisher = resolveRequestURI(request); - return Flux.from(uriPublisher) - .switchMap(uri -> (Publisher) exchangeImpl(uri, parentRequest, toMutableRequest(request), bodyType, errorType, blockHint)); + Mono> mono = resolveRequestURI(request).flatMap(uri -> { + MutableHttpRequest mutableRequest = toMutableRequest(request).uri(uri); + //noinspection unchecked + return sendRequestWithRedirects( + parentRequest, + blockHint, + mutableRequest, + (req, resp) -> Mono.>from(ReactiveExecutionFlow.fromFlow(InternalByteBody.bufferFlow(resp.byteBody()) + .onErrorResume(t -> ExecutionFlow.error(handleResponseError(mutableRequest, t))) + .flatMap(av -> handleExchangeResponse(bodyType, errorType, resp, av))).toPublisher()) + ).map(r -> (HttpResponse) r); + }); + + Duration requestTimeout = configuration.getRequestTimeout(); + if (requestTimeout == null) { + // for compatibility + requestTimeout = configuration.getReadTimeout() + .filter(d -> !d.isNegative()) + .map(d -> d.plusSeconds(1)).orElse(null); + } + if (requestTimeout != null) { + if (!requestTimeout.isNegative()) { + mono = mono.timeout(requestTimeout) + .onErrorResume(throwable -> { + if (throwable instanceof TimeoutException) { + return Mono.error(ReadTimeoutException.TIMEOUT_EXCEPTION); + } + return Mono.error(throwable); + }); + } + } + return mono; + } + + private @NonNull ExecutionFlow> handleExchangeResponse(Argument bodyType, Argument errorType, NettyClientByteBodyResponse resp, CloseableAvailableByteBody av) { + ByteBuf buf = AvailableNettyByteBody.toByteBuf(av); + DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse( + resp.nettyResponse.protocolVersion(), + resp.nettyResponse.status(), + buf, + resp.nettyResponse.headers(), + EmptyHttpHeaders.INSTANCE + ); + + try { + if (log.isTraceEnabled()) { + traceBody("Response", fullHttpResponse.content()); + } + + boolean convertBodyWithBodyType = shouldConvertWithBodyType(fullHttpResponse, this.configuration, bodyType, errorType); + FullNettyClientHttpResponse response = new FullNettyClientHttpResponse<>(fullHttpResponse, handlerRegistry, bodyType, convertBodyWithBodyType, conversionService); + + if (convertBodyWithBodyType) { + return ExecutionFlow.just(response); + } else { // error flow + try { + return ExecutionFlow.error(makeErrorFromRequestBody(errorType, fullHttpResponse.status(), response)); + } catch (HttpClientResponseException t) { + return ExecutionFlow.error(t); + } catch (Exception t) { + return ExecutionFlow.error(makeErrorBodyParseError(fullHttpResponse, t)); + } + } + } catch (HttpClientResponseException t) { + return ExecutionFlow.error(t); + } catch (Exception t) { + FullNettyClientHttpResponse response = new FullNettyClientHttpResponse<>( + fullHttpResponse, + handlerRegistry, + null, + false, + conversionService + ); + HttpClientResponseException clientResponseError = decorate(new HttpClientResponseException( + "Error decoding HTTP response body: " + t.getMessage(), + t, + response, + new HttpClientErrorDecoder() { + @Override + public Argument getErrorType(MediaType mediaType) { + return errorType; + } + } + )); + return ExecutionFlow.error(clientResponseError); + } finally { + fullHttpResponse.release(); + } } @Override @@ -1047,18 +1127,47 @@ private Publisher> buildStreamExchange( @NonNull MutableHttpRequest request, @NonNull URI requestURI, @Nullable Argument errorType) { + return this.sendRequestWithRedirects( + parentRequest, + null, + request.uri(requestURI), + (req, resp) -> { + ByteBody bb = resp.byteBody(); + Publisher body; + if (!hasBody(resp)) { + resp.close(); + body = Flux.empty(); + } else { + if (isAcceptEvents(req)) { + if (bb instanceof AvailableNettyByteBody anbb) { + // same semantics as the streaming branch, but this is eager so it's more + // lax wrt unclosed responses. + ByteBuf single = AvailableNettyByteBody.toByteBuf(anbb); + List parts = SseSplitter.split(single); + parts.get(parts.size() - 1).release(); + body = Flux.fromIterable(parts.subList(0, parts.size() - 1)).map(DefaultHttpContent::new); + } else { + body = SseSplitter.split(NettyByteBody.toByteBufs(bb), sizeLimits()).map(DefaultHttpContent::new); + } + } else { + body = NettyByteBody.toByteBufs(bb).map(DefaultHttpContent::new); + } + } - AtomicReference> requestWrapper = new AtomicReference<>(request); - Flux> streamResponsePublisher = connectAndStream(parentRequest, request, requestURI, requestWrapper, false, true); - - streamResponsePublisher = readBodyOnError(errorType, streamResponsePublisher); - - // apply filters - streamResponsePublisher = Flux.from( - applyFilterToResponsePublisher(parentRequest, request, requestURI, streamResponsePublisher) + return readBodyOnError(errorType, Mono.>just(toStreamingResponse(resp, body)) + .flatMap(r -> handleStreamHttpError(r, true))); + } ); + } - return streamResponsePublisher; + private MutableHttpResponse toStreamingResponse(NettyClientByteBodyResponse resp, Publisher content) { + DefaultStreamedHttpResponse nettyResponse = new DefaultStreamedHttpResponse( + resp.nettyResponse.protocolVersion(), + resp.nettyResponse.status(), + resp.getHeaders().getNettyHeaders(), + content + ); + return new NettyStreamedHttpResponse<>(nettyResponse, conversionService); } @Override @@ -1070,27 +1179,32 @@ public Publisher> proxy(@NonNull io.micronaut.http.HttpRe public Publisher> proxy(@NonNull io.micronaut.http.HttpRequest request, @NonNull ProxyRequestOptions options) { Objects.requireNonNull(options, "options"); setupConversionService(request); - return Flux.from(resolveRequestURI(request)) + return resolveRequestURI(request) .flatMap(requestURI -> { MutableHttpRequest httpRequest = toMutableRequest(request); if (!options.isRetainHostHeader()) { httpRequest.headers(headers -> headers.remove(HttpHeaderNames.HOST)); } - AtomicReference> requestWrapper = new AtomicReference<>(httpRequest); - Flux> proxyResponsePublisher = connectAndStream(request, request, requestURI, requestWrapper, true, false); - // apply filters - //noinspection - proxyResponsePublisher = Flux.from( - applyFilterToResponsePublisher( - request, - requestWrapper.get(), - requestURI, - proxyResponsePublisher - ) + return this.sendRequestWithRedirects( + request, + null, + httpRequest.uri(requestURI), + (req, resp) -> { + Publisher body; + if (!hasBody(resp)) { + resp.close(); + body = Flux.empty(); + } else { + body = NettyByteBody.toByteBufs(resp.byteBody()).map(DefaultHttpContent::new); + } + + return Mono.>just(toStreamingResponse(resp, body)) + .flatMap(r -> handleStreamHttpError(r, false)); + } ); - return proxyResponsePublisher.map(HttpResponse::toMutableResponse); - }); + }) + .map(HttpResponse::toMutableResponse); } private void setupConversionService(io.micronaut.http.HttpRequest httpRequest) { @@ -1099,103 +1213,12 @@ private void setupConversionService(io.micronaut.http.HttpRequest httpRequest } } - private Flux> connectAndStream( - io.micronaut.http.HttpRequest parentRequest, - io.micronaut.http.HttpRequest request, - URI requestURI, - AtomicReference> requestWrapper, - boolean isProxy, - boolean failOnError - ) { - RequestKey requestKey; - try { - requestKey = new RequestKey(this, requestURI); - } catch (Exception e) { - return Flux.error(e); - } - return connectionManager.connect(requestKey, null).flatMapMany(poolHandle -> { - request.setAttribute(NettyClientHttpRequest.CHANNEL, poolHandle.channel); - - boolean sse = !isProxy && isAcceptEvents(request); - return this.streamRequestThroughChannel( - parentRequest, - requestWrapper.get(), - poolHandle, - failOnError, - sse - ); - }); - } - - /** - * Implementation of {@link #exchange(io.micronaut.http.HttpRequest, Argument, Argument)} (after URI resolution). - */ - private Publisher> exchangeImpl( - URI requestURI, - io.micronaut.http.HttpRequest parentRequest, - MutableHttpRequest request, - @NonNull Argument bodyType, - @NonNull Argument errorType, - @Nullable BlockHint blockHint) { - AtomicReference> requestWrapper = new AtomicReference<>(request); - - RequestKey requestKey; - try { - requestKey = new RequestKey(this, requestURI); - } catch (HttpClientException e) { - return Flux.error(e); - } - - Mono handlePublisher = connectionManager.connect(requestKey, blockHint); - - Flux> responsePublisher = handlePublisher.flatMapMany(poolHandle -> Flux.create(emitter -> { - try { - sendRequestThroughChannel( - requestWrapper.get(), - bodyType, - errorType, - emitter, - poolHandle - ); - } catch (Exception e) { - emitter.error(e); - } - })); - - Publisher> finalPublisher = applyFilterToResponsePublisher( - parentRequest, - request, - requestURI, - responsePublisher - ); - Flux> finalReactiveSequence = Flux.from(finalPublisher); - Duration requestTimeout = configuration.getRequestTimeout(); - if (requestTimeout == null) { - // for compatibility - requestTimeout = configuration.getReadTimeout() - .filter(d -> !d.isNegative()) - .map(d -> d.plusSeconds(1)).orElse(null); - } - if (requestTimeout != null) { - if (!requestTimeout.isNegative()) { - finalReactiveSequence = finalReactiveSequence.timeout(requestTimeout) - .onErrorResume(throwable -> { - if (throwable instanceof TimeoutException) { - return Flux.error(ReadTimeoutException.TIMEOUT_EXCEPTION); - } - return Flux.error(throwable); - }); - } - } - return finalReactiveSequence; - } - /** * @param request The request * @param The input type * @return A {@link Publisher} with the resolved URI */ - protected Publisher resolveRequestURI(io.micronaut.http.HttpRequest request) { + protected Mono resolveRequestURI(io.micronaut.http.HttpRequest request) { return resolveRequestURI(request, true); } @@ -1205,11 +1228,11 @@ protected Publisher resolveRequestURI(io.micronaut.http.HttpRequest * @param The input type * @return A {@link Publisher} with the resolved URI */ - protected Publisher resolveRequestURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { + protected Mono resolveRequestURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { URI requestURI = request.getUri(); if (requestURI.getScheme() != null) { // if the request URI includes a scheme then it is fully qualified so use the direct server - return Flux.just(requestURI); + return Mono.just(requestURI); } else { return resolveURI(request, includeContextPath); } @@ -1221,11 +1244,11 @@ protected Publisher resolveRequestURI(io.micronaut.http.HttpRequest * @param The input type * @return A {@link Publisher} with the resolved URI */ - protected Publisher resolveRedirectURI(io.micronaut.http.HttpRequest parentRequest, io.micronaut.http.HttpRequest request) { + protected Mono resolveRedirectURI(io.micronaut.http.HttpRequest parentRequest, io.micronaut.http.HttpRequest request) { URI requestURI = request.getUri(); if (requestURI.getScheme() != null) { // if the request URI includes a scheme then it is fully qualified so use the direct server - return Flux.just(requestURI); + return Mono.just(requestURI); } else { if (parentRequest == null || parentRequest.getUri().getHost() == null) { return resolveURI(request, false); @@ -1236,7 +1259,7 @@ protected Publisher resolveRedirectURI(io.micronaut.http.HttpRequest .userInfo(parentURI.getUserInfo()) .host(parentURI.getHost()) .port(parentURI.getPort()); - return Flux.just(uriBuilder.build()); + return Mono.just(uriBuilder.build()); } } } @@ -1248,66 +1271,11 @@ protected Object getLoadBalancerDiscriminator() { return null; } - private Publisher> applyFilterToResponsePublisher( - io.micronaut.http.HttpRequest parentRequest, - io.micronaut.http.HttpRequest request, - URI requestURI, - Publisher> responsePublisher) { - - if (!(request instanceof MutableHttpRequest mutRequest)) { - return responsePublisher; - } - - mutRequest.uri(requestURI); - if (informationalServiceId != null && mutRequest.getAttribute(HttpAttributes.SERVICE_ID).isEmpty()) { - - mutRequest.setAttribute(HttpAttributes.SERVICE_ID, informationalServiceId); - } - - List filters = - filterResolver.resolveFilters(request, clientFilterEntries); - if (parentRequest != null) { - // todo: migrate to new filter - filters.add( - GenericHttpFilter.createLegacyFilter(new ClientServerContextFilter(parentRequest), new FilterOrder.Fixed(Ordered.HIGHEST_PRECEDENCE)) - ); - } - - FilterRunner.sortReverse(filters); - - FilterRunner runner = new FilterRunner(filters) { - @Override - protected ExecutionFlow> provideResponse(io.micronaut.http.HttpRequest request, PropagatedContext propagatedContext) { - try { - try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { - return ReactiveExecutionFlow.fromPublisher(responsePublisher); - } - } catch (Throwable e) { - return ExecutionFlow.error(e); - } - } - }; - Mono> responseMono = Mono.from(ReactiveExecutionFlow.fromFlow(runner.run(request)).toPublisher()); - if (parentRequest != null) { - responseMono = responseMono.contextWrite(c -> { - // existing entry takes precedence. The parentRequest is derived from a thread - // local, and is more likely to be wrong than any reactive context we are fed. - if (c.hasKey(ServerRequestContext.KEY)) { - return c; - } else { - return c.put(ServerRequestContext.KEY, parentRequest); - } - }); - } - return responseMono; - } - /** * @param request The request * @param requestURI The URI of the request * @param requestContentType The request content type * @param permitsBody Whether permits body - * @param onError Called when the body publisher encounters an error * @return The body * @throws HttpPostRequestEncoder.ErrorDataEncoderException if there is an encoder exception */ @@ -1316,7 +1284,6 @@ private NettyByteBody buildNettyRequest( URI requestURI, MediaType requestContentType, boolean permitsBody, - Consumer onError, EventLoop eventLoop) throws HttpPostRequestEncoder.ErrorDataEncoderException { if (!request.getHeaders().contains(io.micronaut.http.HttpHeaders.HOST)) { @@ -1334,9 +1301,9 @@ private NettyByteBody buildNettyRequest( } NettyHttpRequestBuilder nettyRequestBuilder = NettyHttpRequestBuilder.asBuilder(request); - Optional direct = nettyRequestBuilder.byteBodyDirect(); - if (direct.isPresent()) { - return NettyBodyAdapter.adapt(direct.get(), eventLoop); + ByteBody direct = nettyRequestBuilder.byteBodyDirect(); + if (direct != null) { + return NettyBodyAdapter.adapt(direct, eventLoop); } if (permitsBody) { @@ -1374,8 +1341,6 @@ private NettyByteBody buildNettyRequest( requestBodyPublisher = JsonSubscriber.lift(requestBodyPublisher); } - requestBodyPublisher = requestBodyPublisher.doOnError(onError); - return NettyBodyAdapter.adapt(requestBodyPublisher.map(ByteBufHolder::content), eventLoop, nettyRequestBuilder.toHttpRequestWithoutBody().headers(), null); } else if (bodyValue instanceof CharSequence sequence) { bodyContent = charSequenceToByteBuf(sequence, requestContentType); @@ -1411,7 +1376,7 @@ private static boolean permitsRequestBody(HttpMethod method) { ); } - private Flux> readBodyOnError(@Nullable Argument errorType, @NonNull Flux> publisher) { + private Mono> readBodyOnError(@Nullable Argument errorType, @NonNull Mono> publisher) { if (errorType != null && errorType != HttpClient.DEFAULT_ERROR_TYPE) { return publisher.onErrorResume(clientException -> { if (clientException instanceof HttpClientResponseException exception) { @@ -1470,13 +1435,13 @@ public Argument getErrorType(MediaType mediaType) { return publisher; } - private Publisher resolveURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { + private Mono resolveURI(io.micronaut.http.HttpRequest request, boolean includeContextPath) { URI requestURI = request.getUri(); if (loadBalancer == null) { - return Flux.error(decorate(new NoHostException("Request URI specifies no host to connect to"))); + return Mono.error(decorate(new NoHostException("Request URI specifies no host to connect to"))); } - return Flux.from(loadBalancer.select(getLoadBalancerDiscriminator())).map(server -> { + return Mono.from(loadBalancer.select(getLoadBalancerDiscriminator())).map(server -> { Optional authInfo = server.getMetadata().get(io.micronaut.http.HttpHeaders.AUTHORIZATION_INFO, String.class); if (request instanceof MutableHttpRequest httpRequest && authInfo.isPresent()) { httpRequest.getHeaders().auth(authInfo.get()); @@ -1491,175 +1456,354 @@ private Publisher resolveURI(io.micronaut.http.HttpRequest request, ); } - private void sendRequestThroughChannel( - io.micronaut.http.HttpRequest finalRequest, - Argument bodyType, - Argument errorType, - FluxSink> emitter, - ConnectionManager.PoolHandle poolHandle) throws HttpPostRequestEncoder.ErrorDataEncoderException { - URI requestURI = finalRequest.getUri(); - MediaType requestContentType = finalRequest - .getContentType() - .orElse(MediaType.APPLICATION_JSON_TYPE); - - boolean permitsBody = io.micronaut.http.HttpMethod.permitsRequestBody(finalRequest.getMethod()); - - NettyByteBody bytes = buildNettyRequest( - (MutableHttpRequest) finalRequest, - requestURI, - requestContentType, - permitsBody, - throwable -> { - if (!emitter.isCancelled()) { - emitter.error(throwable); - } - }, - poolHandle.channel.eventLoop() - ); - String newUri = requestURI.getRawPath(); - if (requestURI.getRawQuery() != null) { - newUri += "?" + requestURI.getRawQuery(); - } - HttpRequest nettyRequest = NettyHttpRequestBuilder.asBuilder(finalRequest).toHttpRequestWithoutBody().setUri(newUri); - - if (log.isDebugEnabled()) { - debugRequest(requestURI, nettyRequest); - } - - if (log.isTraceEnabled()) { - traceRequest(finalRequest, nettyRequest); - } - - Promise> responsePromise = poolHandle.channel.eventLoop().newPromise(); - poolHandle.channel.pipeline().addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, - new Http1ResponseHandler(new FullHttpResponseListener<>(responsePromise, poolHandle, finalRequest, bodyType, errorType))); - poolHandle.notifyRequestPipelineBuilt(); - Publisher> publisher = new NettyFuturePublisher<>(responsePromise, true); - publisher.subscribe(new ForwardingSubscriber<>(emitter)); - - new ByteBodyRequestWriter(nettyRequest, bytes).write(poolHandle); - } - - private Flux> streamRequestThroughChannel( - io.micronaut.http.HttpRequest parentRequest, - MutableHttpRequest request, - ConnectionManager.PoolHandle poolHandle, - boolean failOnError, - boolean sse) { - return Flux.>create(sink -> { - try { - streamRequestThroughChannel0(parentRequest, request, sink, poolHandle, sse); - } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) { - sink.error(e); - } - }).flatMap(resp -> handleStreamHttpError(resp, failOnError)); - } - - private > Flux handleStreamHttpError( + private > Mono handleStreamHttpError( R response, boolean failOnError ) { boolean errorStatus = response.code() >= 400; if (errorStatus && failOnError) { // todo: close response properly - return Flux.error(decorate(new HttpClientResponseException(response.reason(), response))); + return Mono.error(decorate(new HttpClientResponseException(response.reason(), response))); } else { - return Flux.just(response); + return Mono.just(response); } } - private void streamRequestThroughChannel0( + /** + * This is the high-level request method. It sits above {@link #sendRawRequest} and handles + * things like filters, error handling, response parsing, request writing. + * + * @param parentRequest The parent server request from {@link ServerRequestContext}, for context propagation + * @param blockHint The optional block hint + * @param request The request to send. Must have resolved absolute URI (see {@link #resolveURI}) + * @param readResponse Function that reads the response from the raw + * {@link NettyClientByteBodyResponse} representation. This is run exactly + * once, but if there is a redirect, it potentially runs with a different + * request than the original (which is why it has a request parameter) + * @return A mono containing the response + */ + private Mono> sendRequestWithRedirects( io.micronaut.http.HttpRequest parentRequest, + @Nullable BlockHint blockHint, MutableHttpRequest request, - FluxSink> emitter, - ConnectionManager.PoolHandle poolHandle, - boolean sse) throws HttpPostRequestEncoder.ErrorDataEncoderException { - - URI requestURI = request.getUri(); - boolean permitsBody = io.micronaut.http.HttpMethod.permitsRequestBody(request.getMethod()); - NettyByteBody byteBody = buildNettyRequest( - request, - requestURI, - request - .getContentType() - .orElse(MediaType.APPLICATION_JSON_TYPE), - permitsBody, - throwable -> { - if (!emitter.isCancelled()) { - emitter.error(throwable); - } - }, - poolHandle.channel.eventLoop() - ); - String newUri = requestURI.getRawPath(); - if (requestURI.getRawQuery() != null) { - newUri += "?" + requestURI.getRawQuery(); + BiFunction, NettyClientByteBodyResponse, ? extends Mono>> readResponse + ) { + if (informationalServiceId != null && request.getAttribute(HttpAttributes.SERVICE_ID).isEmpty()) { + request.setAttribute(HttpAttributes.SERVICE_ID, informationalServiceId); } - HttpRequest nettyRequest = NettyHttpRequestBuilder.asBuilder(request).toHttpRequestWithoutBody().setUri(newUri); - - Promise> responsePromise = poolHandle.channel.eventLoop().newPromise(); - ChannelPipeline pipeline = poolHandle.channel.pipeline(); - pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, new Http1ResponseHandler(new StreamHttpResponseListener(responsePromise, parentRequest, request, poolHandle, sse))); - poolHandle.notifyRequestPipelineBuilt(); - if (log.isDebugEnabled()) { - debugRequest(request.getUri(), nettyRequest); + List filters = + filterResolver.resolveFilters(request, clientFilterEntries); + if (parentRequest != null) { + // todo: migrate to new filter + filters.add( + GenericHttpFilter.createLegacyFilter(new ClientServerContextFilter(parentRequest), new FilterOrder.Fixed(Ordered.HIGHEST_PRECEDENCE)) + ); } - if (log.isTraceEnabled()) { - traceRequest(request, nettyRequest); - } + FilterRunner.sortReverse(filters); - new ByteBodyRequestWriter(nettyRequest, byteBody).write(poolHandle); - responsePromise.addListener((Future> future) -> { - if (future.isSuccess()) { - emitter.next(future.getNow()); - emitter.complete(); - } else { - emitter.error(future.cause()); + FilterRunner runner = new FilterRunner(filters) { + @Override + protected ExecutionFlow> provideResponse(io.micronaut.http.HttpRequest request, PropagatedContext propagatedContext) { + try { + try (PropagatedContext.Scope ignore = propagatedContext.propagate()) { + return ReactiveExecutionFlow.fromPublisher(Mono.from(sendRequestWithRedirectsNoFilter( + parentRequest, + blockHint, + MutableHttpRequestWrapper.wrapIfNecessary(conversionService, request), + readResponse + ))); + } + } catch (Throwable e) { + return ExecutionFlow.error(e); + } } - }); - } - - private ByteBuf charSequenceToByteBuf(CharSequence bodyValue, MediaType requestContentType) { - return byteBufferFactory.copiedBuffer( - bodyValue.toString().getBytes( - requestContentType.getCharset().orElse(defaultCharset) - ) - ).asNativeBuffer(); - } - - private String getHostHeader(URI requestURI) { - RequestKey requestKey = new RequestKey(this, requestURI); - StringBuilder host = new StringBuilder(requestKey.getHost()); - int port = requestKey.getPort(); - if (port > -1 && port != 80 && port != 443) { - host.append(":").append(port); + }; + Mono> responseMono = Mono.from(ReactiveExecutionFlow.fromFlow(runner.run(request)).toPublisher()); + if (parentRequest != null) { + responseMono = responseMono.contextWrite(c -> { + // existing entry takes precedence. The parentRequest is derived from a thread + // local, and is more likely to be wrong than any reactive context we are fed. + if (c.hasKey(ServerRequestContext.KEY)) { + return c; + } else { + return c.put(ServerRequestContext.KEY, parentRequest); + } + }); } - return host.toString(); + return responseMono; } - private NettyByteBody buildFormRequest( + private Mono> sendRequestWithRedirectsNoFilter( + io.micronaut.http.HttpRequest parentRequest, + @Nullable BlockHint blockHint, MutableHttpRequest request, - EventLoop eventLoop, - ThrowingFunction buildMethod - ) throws HttpPostRequestEncoder.ErrorDataEncoderException { - // this function acts like a wrapper around HttpPostRequestEncoder. HttpPostRequestEncoder - // takes a request + form data and transforms it to a request + bytes. Because we only want - // the bytes, we need to copy the data from the netty request back to the original - // MutableHttpRequest. This is just the Content-Type header, which sometimes gets an extra - // boundary specifier that we need. - - // build the mock netty request (only the content-type matters) - HttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - List relevantHeaders = List.of(HttpHeaderNames.CONTENT_TYPE); - for (AsciiString header : relevantHeaders) { - nettyRequest.headers().add(header, request.getHeaders().getAll(header)); + BiFunction, NettyClientByteBodyResponse, ? extends Mono>> readResponse + ) { + RequestKey requestKey; + try { + requestKey = new RequestKey(this, request.getUri()); + } catch (Exception e) { + return Mono.error(e); } + // first: connect + return connectionManager.connect(requestKey, blockHint) + .flatMap(poolHandle -> { + // build the raw request + request.setAttribute(NettyClientHttpRequest.CHANNEL, poolHandle.channel); - HttpPostRequestEncoder encoder = buildMethod.apply(nettyRequest); - HttpRequest finalized = encoder.finalizeRequest(); - // copy back the content-type + URI requestURI = request.getUri(); + boolean permitsBody = io.micronaut.http.HttpMethod.permitsRequestBody(request.getMethod()); + NettyByteBody byteBody; + try { + byteBody = buildNettyRequest( + request, + requestURI, + request + .getContentType() + .orElse(MediaType.APPLICATION_JSON_TYPE), + permitsBody, + poolHandle.channel.eventLoop() + ); + } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) { + poolHandle.release(); + return Mono.error(e); + } + + // send the raw request + return sendRawRequest(poolHandle, request, byteBody, c -> handleResponseError(request, c)); + }) + .flatMap(byteBodyResponse -> { + // handle redirects or map the response bytes + + int code = byteBodyResponse.code(); + HttpHeaders nettyHeaders = byteBodyResponse.getHeaders().getNettyHeaders(); + if (code > 300 && code < 400 && configuration.isFollowRedirects() && nettyHeaders.contains(HttpHeaderNames.LOCATION)) { + byteBodyResponse.close(); + String location = nettyHeaders.get(HttpHeaderNames.LOCATION); + + MutableHttpRequest redirectRequest; + if (code == 307 || code == 308) { + redirectRequest = io.micronaut.http.HttpRequest.create(request.getMethod(), location); + request.getBody().ifPresent(redirectRequest::body); + } else { + redirectRequest = io.micronaut.http.HttpRequest.GET(location); + } + + setRedirectHeaders(request, redirectRequest); + return resolveRedirectURI(request, redirectRequest) + .flatMap(uri -> sendRequestWithRedirects(parentRequest, blockHint, redirectRequest.uri(uri), readResponse)); + } else { + io.micronaut.http.HttpHeaders headers = byteBodyResponse.getHeaders(); + if (log.isTraceEnabled()) { + log.trace("HTTP Client Response Received ({}) for Request: {} {}", byteBodyResponse.code(), request.getMethodName(), request.getUri()); + HttpHeadersUtil.trace(log, headers.names(), headers::getAll); + } + return readResponse.apply(request, byteBodyResponse); + } + }); + } + + /** + * This is the low-level request method, without redirect handling and with raw body bytes. + * + * @param poolHandle The pool handle to send the request on + * @param request The request to send + * @param byteBody The request body + * @param mapConnectionError Operator used to map connection errors (e.g. when the connection + * is closed by the remote). Other errors are not mapped + * @return A mono containing the response + */ + private Mono sendRawRequest( + ConnectionManager.PoolHandle poolHandle, + io.micronaut.http.HttpRequest request, + NettyByteBody byteBody, + UnaryOperator mapConnectionError + ) { + URI uri = request.getUri(); + String uriWithoutHost = uri.getRawPath(); + if (uri.getRawQuery() != null) { + uriWithoutHost += "?" + uri.getRawQuery(); + } + HttpRequest nettyRequest = NettyHttpRequestBuilder.asBuilder(request) + .toHttpRequestWithoutBody() + .setUri(uriWithoutHost); + + return Mono.create(sink -> { + if (log.isDebugEnabled()) { + log.debug("Sending HTTP {} to {}", request.getMethodName(), request.getUri()); + } + + boolean expectContinue = HttpUtil.is100ContinueExpected(nettyRequest); + ChannelPipeline pipeline = poolHandle.channel.pipeline(); + + // if the body is streamed, we have a StreamWriter, otherwise we have a ByteBuf. + StreamWriter streamWriter; + ByteBuf byteBuf; + if (byteBody instanceof AvailableNettyByteBody available) { + byteBuf = AvailableNettyByteBody.toByteBuf(available); + streamWriter = null; + } else { + streamWriter = new StreamWriter((StreamingNettyByteBody) byteBody, e -> { + poolHandle.taint(); + sink.error(e); + }); + pipeline.addLast(streamWriter); + byteBuf = null; + } + + if (log.isTraceEnabled()) { + HttpHeadersUtil.trace(log, nettyRequest.headers().names(), nettyRequest.headers()::getAll); + if (byteBuf != null) { + traceBody("Request", byteBuf); + } + } + + pipeline.addLast(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, new Http1ResponseHandler(new Http1ResponseHandler.ResponseListener() { + boolean stillExpectingContinue = expectContinue; + + @Override + public void fail(ChannelHandlerContext ctx, Throwable cause) { + poolHandle.taint(); + sink.error(mapConnectionError.apply(cause)); + } + + @Override + public void continueReceived(ChannelHandlerContext ctx) { + if (stillExpectingContinue) { + stillExpectingContinue = false; + if (streamWriter == null) { + ctx.writeAndFlush(new DefaultLastHttpContent(byteBuf), ctx.voidPromise()); + } else { + streamWriter.startWriting(); + } + } + } + + @Override + public void complete(io.netty.handler.codec.http.HttpResponse response, CloseableByteBody body) { + if (!HttpUtil.isKeepAlive(response)) { + poolHandle.taint(); + } + + sink.success(new NettyClientByteBodyResponse(response, body, conversionService)); + } + + @Override + public BodySizeLimits sizeLimits() { + return DefaultHttpClient.this.sizeLimits(); + } + + @Override + public boolean isHeadResponse() { + return nettyRequest.method().equals(HttpMethod.HEAD); + } + + @Override + public void finish(ChannelHandlerContext ctx) { + ctx.pipeline().remove(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE); + if (streamWriter != null) { + if (!streamWriter.isCompleted()) { + // if there was an error, and we didn't fully write the request yet, the + // connection cannot be reused + poolHandle.taint(); + } + ctx.pipeline().remove(streamWriter); + } + if (stillExpectingContinue && byteBuf != null) { + byteBuf.release(); + } + poolHandle.release(); + } + })); + poolHandle.notifyRequestPipelineBuilt(); + + HttpHeaders headers = nettyRequest.headers(); + OptionalLong length = byteBody.expectedLength(); + if (length.isPresent()) { + headers.remove(HttpHeaderNames.TRANSFER_ENCODING); + if (length.getAsLong() != 0 || permitsRequestBody(nettyRequest.method())) { + headers.set(HttpHeaderNames.CONTENT_LENGTH, length.getAsLong()); + } + } else { + headers.remove(HttpHeaderNames.CONTENT_LENGTH); + headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + } + + if (!poolHandle.http2) { + if (poolHandle.canReturn()) { + nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + } else { + nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + } + } + + Channel channel = poolHandle.channel(); + if (streamWriter == null) { + if (!expectContinue) { + // it's a bit more efficient to use a full request for HTTP/2 + channel.writeAndFlush(new DefaultFullHttpRequest( + nettyRequest.protocolVersion(), + nettyRequest.method(), + nettyRequest.uri(), + byteBuf, + nettyRequest.headers(), + EmptyHttpHeaders.INSTANCE + ), channel.voidPromise()); + } else { + channel.writeAndFlush(nettyRequest, channel.voidPromise()); + } + } else { + channel.writeAndFlush(nettyRequest, channel.voidPromise()); + if (!expectContinue) { + streamWriter.startWriting(); + } + } + + // need to run the create() on the event loop so that pipeline modification happens synchronously + }).subscribeOn(Schedulers.fromExecutor(poolHandle.channel.eventLoop())); + } + + private ByteBuf charSequenceToByteBuf(CharSequence bodyValue, MediaType requestContentType) { + return byteBufferFactory.copiedBuffer( + bodyValue.toString().getBytes( + requestContentType.getCharset().orElse(defaultCharset) + ) + ).asNativeBuffer(); + } + + private String getHostHeader(URI requestURI) { + RequestKey requestKey = new RequestKey(this, requestURI); + StringBuilder host = new StringBuilder(requestKey.getHost()); + int port = requestKey.getPort(); + if (port > -1 && port != 80 && port != 443) { + host.append(":").append(port); + } + return host.toString(); + } + + private NettyByteBody buildFormRequest( + MutableHttpRequest request, + EventLoop eventLoop, + ThrowingFunction buildMethod + ) throws HttpPostRequestEncoder.ErrorDataEncoderException { + // this function acts like a wrapper around HttpPostRequestEncoder. HttpPostRequestEncoder + // takes a request + form data and transforms it to a request + bytes. Because we only want + // the bytes, we need to copy the data from the netty request back to the original + // MutableHttpRequest. This is just the Content-Type header, which sometimes gets an extra + // boundary specifier that we need. + + // build the mock netty request (only the content-type matters) + HttpRequest nettyRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + List relevantHeaders = List.of(HttpHeaderNames.CONTENT_TYPE); + for (AsciiString header : relevantHeaders) { + nettyRequest.headers().add(header, request.getHeaders().getAll(header)); + } + + HttpPostRequestEncoder encoder = buildMethod.apply(nettyRequest); + HttpRequest finalized = encoder.finalizeRequest(); + // copy back the content-type for (AsciiString header : relevantHeaders) { request.getHeaders().remove(header); for (String value : finalized.headers().getAll(header)) { @@ -1702,6 +1846,7 @@ private HttpPostRequestEncoder buildFormDataRequest(HttpRequest baseRequest, Obj Map formData; if (bodyValue instanceof Map) { + //noinspection unchecked formData = (Map) bodyValue; } else { formData = BeanMap.of(bodyValue); @@ -1781,23 +1926,6 @@ public void setContent(InterfaceHttpData fileUploadObject, Object content) throw return postRequestEncoder; } - private void debugRequest(URI requestURI, HttpRequest nettyRequest) { - log.debug("Sending HTTP {} to {}", - nettyRequest.method(), - requestURI.toString()); - } - - private void traceRequest(io.micronaut.http.HttpRequest request, HttpRequest nettyRequest) { - HttpHeaders headers = nettyRequest.headers(); - HttpHeadersUtil.trace(log, headers.names(), headers::getAll); - if (io.micronaut.http.HttpMethod.permitsRequestBody(request.getMethod()) && request.getBody().isPresent() && nettyRequest instanceof FullHttpRequest fullHttpRequest) { - ByteBuf content = fullHttpRequest.content(); - if (log.isTraceEnabled()) { - traceBody("Request", content); - } - } - } - private void traceBody(String type, ByteBuf content) { log.trace("{} Body", type); log.trace("----"); @@ -1844,6 +1972,116 @@ private E decorate(E exc) { return HttpClientExceptionUtils.populateServiceId(exc, informationalServiceId, configuration); } + private @NonNull HttpClientException handleResponseError(io.micronaut.http.HttpRequest finalRequest, Throwable cause) { + String message = cause.getMessage(); + if (message == null) { + message = cause.getClass().getSimpleName(); + } + if (log.isTraceEnabled()) { + log.trace("HTTP Client exception ({}) occurred for request : {} {}", + message, finalRequest.getMethodName(), finalRequest.getUri()); + } + + HttpClientException result; + if (cause instanceof io.micronaut.http.exceptions.ContentLengthExceededException clee) { + result = decorate(new ContentLengthExceededException(clee.getMessage())); + } else if (cause instanceof io.micronaut.http.exceptions.BufferLengthExceededException blee) { + result = decorate(new ContentLengthExceededException(blee.getAdvertisedLength(), blee.getReceivedLength())); + } else if (cause instanceof io.netty.handler.timeout.ReadTimeoutException) { + result = ReadTimeoutException.TIMEOUT_EXCEPTION; + } else if (cause instanceof HttpClientException hce) { + result = decorate(hce); + } else { + result = decorate(new HttpClientException("Error occurred reading HTTP response: " + message, cause)); + } + return result; + } + + private static void setRedirectHeaders(@Nullable io.micronaut.http.HttpRequest request, MutableHttpRequest redirectRequest) { + if (request != null) { + for (Map.Entry> originalHeader : request.getHeaders()) { + if (!REDIRECT_HEADER_BLOCKLIST.contains(originalHeader.getKey())) { + final List originalHeaderValue = originalHeader.getValue(); + if (originalHeaderValue != null && !originalHeaderValue.isEmpty()) { + for (String value : originalHeaderValue) { + if (value != null) { + redirectRequest.header(originalHeader.getKey(), value); + } + } + } + } + } + } + } + + private BodySizeLimits sizeLimits() { + return new BodySizeLimits(Long.MAX_VALUE, configuration.getMaxContentLength()); + } + + private static boolean shouldConvertWithBodyType(io.netty.handler.codec.http.HttpResponse msg, + HttpClientConfiguration configuration, + Argument bodyType, + Argument errorType) { + if (msg.status().code() < 400) { + return true; + } + return !configuration.isExceptionOnErrorStatus() && bodyType.equalsType(errorType); + } + + /** + * Create a {@link HttpClientResponseException} if parsing of the HTTP error body failed. + */ + private HttpClientResponseException makeErrorBodyParseError(FullHttpResponse fullResponse, Throwable t) { + FullNettyClientHttpResponse errorResponse = new FullNettyClientHttpResponse<>( + fullResponse, + handlerRegistry, + null, + false, + conversionService + ); + return decorate(new HttpClientResponseException( + "Error decoding HTTP error response body: " + t.getMessage(), + t, + errorResponse, + null + )); + } + + /** + * Create a {@link HttpClientResponseException} from a response with a failed HTTP status. + */ + private HttpClientResponseException makeErrorFromRequestBody(Argument errorType, HttpResponseStatus status, FullNettyClientHttpResponse response) { + if (errorType != null && errorType != HttpClient.DEFAULT_ERROR_TYPE) { + return decorate(new HttpClientResponseException( + status.reasonPhrase(), + null, + response, + new HttpClientErrorDecoder() { + @Override + public Argument getErrorType(MediaType mediaType) { + return errorType; + } + } + )); + } else { + return decorate(new HttpClientResponseException(status.reasonPhrase(), response)); + } + } + + private static boolean hasBody(HttpResponse response) { + if (response.code() >= HttpStatus.CONTINUE.getCode() && response.code() < HttpStatus.OK.getCode()) { + return false; + } + + if (response.code() == HttpResponseStatus.NO_CONTENT.code() || + response.code() == HttpResponseStatus.NOT_MODIFIED.code()) { + return false; + } + + OptionalLong contentLength = response.getHeaders().contentLength(); + return contentLength.isEmpty() || contentLength.getAsLong() != 0; + } + /** * Key used for connection pooling and determining host/port. */ @@ -1927,200 +2165,6 @@ private E decorate(DefaultHttpClient ctx, E exc) } } - private abstract static class ContinueHandler extends ChannelInboundHandlerAdapter { - private boolean continued; - - @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (!continued) { - discard(); - } - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (!continued) { - io.netty.handler.codec.http.HttpResponse response = (io.netty.handler.codec.http.HttpResponse) msg; - if (response.status() == HttpResponseStatus.CONTINUE) { - continued = true; - continueBody(ctx); - } else { - ctx.pipeline().remove(this); - ctx.fireChannelRead(msg); - } - } else { - ((HttpContent) msg).release(); - if (msg instanceof LastHttpContent) { - ctx.pipeline().remove(this); - } - } - } - - protected abstract void discard(); - - protected abstract void continueBody(ChannelHandlerContext ctx); - } - - private record ByteBodyRequestWriter(HttpRequest nettyRequest, NettyByteBody byteBody) { - ByteBodyRequestWriter { - HttpHeaders headers = nettyRequest.headers(); - OptionalLong length = byteBody.expectedLength(); - if (length.isPresent()) { - headers.remove(HttpHeaderNames.TRANSFER_ENCODING); - if (length.getAsLong() != 0 || permitsRequestBody(nettyRequest.method())) { - headers.set(HttpHeaderNames.CONTENT_LENGTH, length.getAsLong()); - } - } else { - headers.remove(HttpHeaderNames.CONTENT_LENGTH); - headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); - } - } - - public void write(ConnectionManager.PoolHandle poolHandle) { - if (!poolHandle.http2) { - if (poolHandle.canReturn()) { - nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } else { - nettyRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); - } - } - - Channel channel = poolHandle.channel(); - if (byteBody instanceof AvailableNettyByteBody available) { - ByteBuf byteBuf = AvailableNettyByteBody.toByteBuf(available); - if (HttpUtil.is100ContinueExpected(nettyRequest)) { - channel.writeAndFlush(nettyRequest, channel.voidPromise()); - channel.pipeline().addBefore(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, "continue-handler", new ContinueHandler() { - @Override - protected void discard() { - byteBuf.release(); - } - - @Override - protected void continueBody(ChannelHandlerContext ctx) { - ctx.writeAndFlush(new DefaultLastHttpContent(byteBuf), ctx.voidPromise()); - } - }); - } else { - // it's a bit more efficient to use a full request for HTTP/2 - channel.writeAndFlush(new DefaultFullHttpRequest( - nettyRequest.protocolVersion(), - nettyRequest.method(), - nettyRequest.uri(), - byteBuf, - nettyRequest.headers(), - EmptyHttpHeaders.INSTANCE - ), channel.voidPromise()); - } - } else { - StreamWriter streamWriter = new StreamWriter(); - streamWriter.upstream = ((StreamingNettyByteBody) byteBody).primary(streamWriter); - - channel.writeAndFlush(nettyRequest, channel.voidPromise()); - if (HttpUtil.is100ContinueExpected(nettyRequest)) { - channel.pipeline().addBefore(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE, "continue-handler", new ContinueHandler() { - @Override - protected void discard() { - streamWriter.upstream.allowDiscard(); - streamWriter.upstream.disregardBackpressure(); - } - - @Override - protected void continueBody(ChannelHandlerContext ctx) { - channel.pipeline().addLast(streamWriter); - } - }); - } else { - channel.pipeline().addLast(streamWriter); - } - } - } - } - - private static class StreamWriter extends ChannelInboundHandlerAdapter implements BufferConsumer { - ChannelHandlerContext ctx; - EventLoopFlow flow; - Upstream upstream; - long unwritten = 0; - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - this.flow = new EventLoopFlow(ctx.channel().eventLoop()); - - upstream.start(); - } - - @Override - public void add(ByteBuf buf) { - if (flow.executeNow(() -> add0(buf))) { - add0(buf); - } - } - - private void add0(ByteBuf buf) { - if (ctx == null) { - // discarded - buf.release(); - return; - } - - int readable = buf.readableBytes(); - ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> { - assert ctx.executor().inEventLoop(); - if (future.isSuccess()) { - if (ctx.channel().isWritable()) { - upstream.onBytesConsumed(readable); - } else { - unwritten += readable; - } - } - }); - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - long unwritten = this.unwritten; - if (ctx.channel().isWritable() && unwritten != 0) { - this.unwritten = 0; - upstream.onBytesConsumed(unwritten); - } - super.channelWritabilityChanged(ctx); - } - - @Override - public void complete() { - if (flow.executeNow(this::complete0)) { - complete0(); - } - } - - private void complete0() { - if (ctx == null) { - // discarded - return; - } - - ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise()); - ctx.pipeline().remove(ctx.name()); - } - - @Override - public void discard() { - } - - @Override - public void error(Throwable e) { - if (ctx == null) { - // discarded - return; - } - - ctx.fireExceptionCaught(e); - ctx.pipeline().remove(ctx.name()); - } - } - /** * Used as a holder for the current SSE event. */ @@ -2130,354 +2174,4 @@ private static class CurrentEvent { String name; Duration retry; } - - private abstract class BaseHttpResponseListener implements Http1ResponseHandler.ResponseListener { - final Promise responsePromise; - final io.micronaut.http.HttpRequest parentRequest; - final io.micronaut.http.HttpRequest finalRequest; - - public BaseHttpResponseListener(Promise responsePromise, io.micronaut.http.HttpRequest parentRequest, io.micronaut.http.HttpRequest finalRequest) { - this.responsePromise = responsePromise; - this.parentRequest = parentRequest; - this.finalRequest = finalRequest; - } - - private static void setRedirectHeaders(@Nullable io.micronaut.http.HttpRequest request, MutableHttpRequest redirectRequest) { - if (request != null) { - for (Map.Entry> originalHeader : request.getHeaders()) { - if (!REDIRECT_HEADER_BLOCKLIST.contains(originalHeader.getKey())) { - final List originalHeaderValue = originalHeader.getValue(); - if (originalHeaderValue != null && !originalHeaderValue.isEmpty()) { - for (String value : originalHeaderValue) { - if (value != null) { - redirectRequest.header(originalHeader.getKey(), value); - } - } - } - } - } - } - } - - @Override - public final void fail(ChannelHandlerContext ctx, Throwable cause) { - fail(cause); - } - - protected void fail(Throwable cause) { - String message = cause.getMessage(); - if (message == null) { - message = cause.getClass().getSimpleName(); - } - if (log.isTraceEnabled()) { - log.trace("HTTP Client exception ({}) occurred for request : {} {}", - message, finalRequest.getMethodName(), finalRequest.getUri()); - } - - HttpClientException result; - if (cause instanceof io.micronaut.http.exceptions.ContentLengthExceededException clee) { - result = decorate(new ContentLengthExceededException(clee.getMessage())); - } else if (cause instanceof io.micronaut.http.exceptions.BufferLengthExceededException blee) { - result = decorate(new ContentLengthExceededException(blee.getAdvertisedLength(), blee.getReceivedLength())); - } else if (cause instanceof io.netty.handler.timeout.ReadTimeoutException) { - result = ReadTimeoutException.TIMEOUT_EXCEPTION; - } else if (cause instanceof HttpClientException hce) { - result = decorate(hce); - } else { - result = decorate(new HttpClientException("Error occurred reading HTTP response: " + message, cause)); - } - responsePromise.tryFailure(result); - } - - @Override - public void continueReceived(ChannelHandlerContext ctx) { - // TODO: move continue implementation here from its own handler - throw new UnsupportedOperationException(); - } - - @Override - public void complete(io.netty.handler.codec.http.HttpResponse response, CloseableByteBody body) { - int code = response.status().code(); - HttpHeaders headers1 = response.headers(); - if (code > 300 && code < 400 && configuration.isFollowRedirects() && headers1.contains(HttpHeaderNames.LOCATION)) { - body.close(); - String location = headers1.get(HttpHeaderNames.LOCATION); - - MutableHttpRequest redirectRequest; - if (code == 307 || code == 308) { - redirectRequest = io.micronaut.http.HttpRequest.create(finalRequest.getMethod(), location); - finalRequest.getBody().ifPresent(redirectRequest::body); - } else { - redirectRequest = io.micronaut.http.HttpRequest.GET(location); - } - - setRedirectHeaders(finalRequest, redirectRequest); - Flux.from(resolveRedirectURI(parentRequest, redirectRequest)) - .flatMap(makeRedirectHandler(parentRequest, redirectRequest)) - .subscribe(new NettyPromiseSubscriber<>(responsePromise)); - } else { - HttpHeaders headers = response.headers(); - if (log.isTraceEnabled()) { - log.trace("HTTP Client Response Received ({}) for Request: {} {}", response.status(), finalRequest.getMethodName(), finalRequest.getUri()); - HttpHeadersUtil.trace(log, headers.names(), headers::getAll); - } - complete0(response, body); - } - } - - @Override - public BodySizeLimits sizeLimits() { - return new BodySizeLimits(Long.MAX_VALUE, configuration.getMaxContentLength()); - } - - @Override - public boolean isHeadResponse() { - return finalRequest.getMethod() == io.micronaut.http.HttpMethod.HEAD; - } - - protected abstract void complete0(io.netty.handler.codec.http.HttpResponse response, CloseableByteBody body); - - protected abstract Function> makeRedirectHandler(io.micronaut.http.HttpRequest parentRequest, MutableHttpRequest redirectRequest); - } - - private final class FullHttpResponseListener extends BaseHttpResponseListener> { - private final Argument bodyType; - private final Argument errorType; - private final ConnectionManager.PoolHandle poolHandle; - - public FullHttpResponseListener( - Promise> responsePromise, - ConnectionManager.PoolHandle poolHandle, - io.micronaut.http.HttpRequest request, - Argument bodyType, - Argument errorType) { - super(responsePromise, request, request); - this.bodyType = bodyType; - this.errorType = errorType; - this.poolHandle = poolHandle; - } - - private static boolean shouldConvertWithBodyType(io.netty.handler.codec.http.HttpResponse msg, - HttpClientConfiguration configuration, - Argument bodyType, - Argument errorType) { - if (msg.status().code() < 400) { - return true; - } - return !configuration.isExceptionOnErrorStatus() && bodyType.equalsType(errorType); - - } - - @Override - protected void complete0(io.netty.handler.codec.http.HttpResponse r, CloseableByteBody body) { - if (!HttpUtil.isKeepAlive(r)) { - poolHandle.taint(); - } - - InternalByteBody.bufferFlow(body).onComplete((av, err) -> { - if (err != null) { - fail(err); - return; - } - - DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse( - r.protocolVersion(), - r.status(), - AvailableNettyByteBody.toByteBuf(av), - r.headers(), - EmptyHttpHeaders.INSTANCE - ); - - try { - if (log.isTraceEnabled()) { - traceBody("Response", fullHttpResponse.content()); - } - - boolean convertBodyWithBodyType = shouldConvertWithBodyType(r, DefaultHttpClient.this.configuration, bodyType, errorType); - FullNettyClientHttpResponse response = new FullNettyClientHttpResponse<>(fullHttpResponse, handlerRegistry, bodyType, convertBodyWithBodyType, conversionService); - - if (convertBodyWithBodyType) { - responsePromise.trySuccess(response); - } else { // error flow - try { - responsePromise.tryFailure(makeErrorFromRequestBody(r.status(), response)); - } catch (HttpClientResponseException t) { - responsePromise.tryFailure(t); - } catch (Exception t) { - responsePromise.tryFailure(makeErrorBodyParseError(fullHttpResponse, t)); - } - } - } catch (HttpClientResponseException t) { - responsePromise.tryFailure(t); - } catch (Exception t) { - makeNormalBodyParseError(fullHttpResponse, t, cause -> { - if (!responsePromise.tryFailure(cause) && log.isWarnEnabled()) { - log.warn("Exception fired after handler completed: {}", t.getMessage(), t); - } - }); - } finally { - fullHttpResponse.release(); - } - }); - } - - @Override - protected void fail(Throwable cause) { - poolHandle.taint(); - super.fail(cause); - } - - @Override - public void finish(ChannelHandlerContext ctx) { - ctx.pipeline().remove(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE); - poolHandle.release(); - } - - @Override - protected Function>> makeRedirectHandler(io.micronaut.http.HttpRequest parentRequest, MutableHttpRequest redirectRequest) { - return uri -> (Publisher) exchangeImpl(uri, parentRequest, redirectRequest, bodyType, errorType, null); - } - - /** - * Create a {@link HttpClientResponseException} from a response with a failed HTTP status. - */ - private HttpClientResponseException makeErrorFromRequestBody(HttpResponseStatus status, FullNettyClientHttpResponse response) { - if (errorType != null && errorType != HttpClient.DEFAULT_ERROR_TYPE) { - return decorate(new HttpClientResponseException( - status.reasonPhrase(), - null, - response, - new HttpClientErrorDecoder() { - @Override - public Argument getErrorType(MediaType mediaType) { - return errorType; - } - } - )); - } else { - return decorate(new HttpClientResponseException(status.reasonPhrase(), response)); - } - } - - /** - * Create a {@link HttpClientResponseException} if parsing of the HTTP error body failed. - */ - private HttpClientResponseException makeErrorBodyParseError(FullHttpResponse fullResponse, Throwable t) { - FullNettyClientHttpResponse errorResponse = new FullNettyClientHttpResponse<>( - fullResponse, - handlerRegistry, - null, - false, - conversionService - ); - return decorate(new HttpClientResponseException( - "Error decoding HTTP error response body: " + t.getMessage(), - t, - errorResponse, - null - )); - } - - private void makeNormalBodyParseError(FullHttpResponse fullResponse, Throwable t, Consumer forward) { - FullNettyClientHttpResponse response = new FullNettyClientHttpResponse<>( - fullResponse, - handlerRegistry, - null, - false, - conversionService - ); - HttpClientResponseException clientResponseError = decorate(new HttpClientResponseException( - "Error decoding HTTP response body: " + t.getMessage(), - t, - response, - new HttpClientErrorDecoder() { - @Override - public Argument getErrorType(MediaType mediaType) { - return errorType; - } - } - )); - forward.accept(clientResponseError); - } - } - - private final class StreamHttpResponseListener extends BaseHttpResponseListener> { - private final ConnectionManager.PoolHandle poolHandle; - private final boolean sse; - - public StreamHttpResponseListener( - Promise> responsePromise, - io.micronaut.http.HttpRequest parentRequest, - io.micronaut.http.HttpRequest finalRequest, ConnectionManager.PoolHandle poolHandle, - boolean sse) { - super(responsePromise, parentRequest, finalRequest); - this.poolHandle = poolHandle; - this.sse = sse; - } - - private static boolean hasBody(io.netty.handler.codec.http.HttpResponse response) { - if (response.status().code() >= HttpStatus.CONTINUE.getCode() && response.status().code() < HttpStatus.OK.getCode()) { - return false; - } - - if (response.status().equals(HttpResponseStatus.NO_CONTENT) || - response.status().equals(HttpResponseStatus.NOT_MODIFIED)) { - return false; - } - - if (HttpUtil.isTransferEncodingChunked(response)) { - return true; - } - - if (HttpUtil.isContentLengthSet(response)) { - return HttpUtil.getContentLength(response) > 0; - } - - return true; - } - - @Override - protected void complete0(io.netty.handler.codec.http.HttpResponse msg, CloseableByteBody bb) { - Publisher body; - if (!hasBody(msg)) { - bb.close(); - body = Publishers.empty(); - } else { - if (sse) { - if (bb instanceof AvailableNettyByteBody anbb) { - // same semantics as the streaming branch, but this is eager so it's more - // lax wrt unclosed responses. - ByteBuf single = AvailableNettyByteBody.toByteBuf(anbb); - List parts = SseSplitter.split(single); - parts.get(parts.size() - 1).release(); - body = Flux.fromIterable(parts.subList(0, parts.size() - 1)).map(DefaultHttpContent::new); - } else { - body = SseSplitter.split(NettyByteBody.toByteBufs(bb), sizeLimits()).map(DefaultHttpContent::new); - } - } else { - body = NettyByteBody.toByteBufs(bb).map(DefaultHttpContent::new); - } - } - - DefaultStreamedHttpResponse nettyResponse = new DefaultStreamedHttpResponse(msg.protocolVersion(), msg.status(), msg.headers(), body); - responsePromise.trySuccess(new NettyStreamedHttpResponse<>(nettyResponse, conversionService)); - } - - @Override - protected Function>> makeRedirectHandler(io.micronaut.http.HttpRequest parentRequest, MutableHttpRequest redirectRequest) { - return uri -> Mono.from(buildStreamExchange(parentRequest, redirectRequest, uri, null)).map(HttpResponse::toMutableResponse); - } - - @Override - protected void fail(Throwable cause) { - poolHandle.taint(); - super.fail(cause); - } - - @Override - public void finish(ChannelHandlerContext ctx) { - ctx.pipeline().remove(ChannelPipelineCustomizer.HANDLER_MICRONAUT_HTTP_RESPONSE); - poolHandle.release(); - } - } } diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/ForwardingSubscriber.java b/http-client/src/main/java/io/micronaut/http/client/netty/ForwardingSubscriber.java deleted file mode 100644 index 3732d3086b8..00000000000 --- a/http-client/src/main/java/io/micronaut/http/client/netty/ForwardingSubscriber.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2017-2022 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.client.netty; - -import io.micronaut.core.annotation.Internal; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.publisher.FluxSink; - -/** - * {@link Subscriber} implementation that forwards items into a {@link FluxSink}. - * - * @since 3.5.0 - * @author yawkat - * @param The message type - */ -@Internal -final class ForwardingSubscriber implements Subscriber { - private final FluxSink sink; - - ForwardingSubscriber(FluxSink sink) { - this.sink = sink; - } - - @Override - public void onSubscribe(Subscription s) { - sink.onRequest(s::request); - } - - @Override - public void onNext(T t) { - sink.next(t); - } - - @Override - public void onError(Throwable t) { - sink.error(t); - } - - @Override - public void onComplete() { - sink.complete(); - } -} diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/NettyClientByteBodyResponse.java b/http-client/src/main/java/io/micronaut/http/client/netty/NettyClientByteBodyResponse.java new file mode 100644 index 00000000000..0444d3cd433 --- /dev/null +++ b/http-client/src/main/java/io/micronaut/http/client/netty/NettyClientByteBodyResponse.java @@ -0,0 +1,87 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.convert.ConversionService; +import io.micronaut.core.convert.value.MutableConvertibleValues; +import io.micronaut.core.convert.value.MutableConvertibleValuesMap; +import io.micronaut.core.util.SupplierUtil; +import io.micronaut.http.ByteBodyHttpResponse; +import io.micronaut.http.body.ByteBody; +import io.micronaut.http.body.CloseableByteBody; +import io.micronaut.http.netty.NettyHttpHeaders; +import io.netty.handler.codec.http.HttpResponse; + +import java.util.Optional; +import java.util.function.Supplier; + +/** + * {@link ByteBodyHttpResponse} implementation for the client. + * + * @since 4.7.0 + * @author Jonas Konrad + */ +@Internal +final class NettyClientByteBodyResponse implements ByteBodyHttpResponse { + final HttpResponse nettyResponse; + + private final CloseableByteBody body; + private final NettyHttpHeaders headers; + private final Supplier> attributes = SupplierUtil.memoized(MutableConvertibleValuesMap::new); + + NettyClientByteBodyResponse(HttpResponse nettyResponse, CloseableByteBody body, ConversionService conversionService) { + this.nettyResponse = nettyResponse; + this.body = body; + this.headers = new NettyHttpHeaders(nettyResponse.headers(), conversionService); + } + + @Override + public @NonNull ByteBody byteBody() { + return body; + } + + @Override + public void close() { + body.close(); + } + + @Override + public int code() { + return nettyResponse.status().code(); + } + + @Override + public String reason() { + return nettyResponse.status().reasonPhrase(); + } + + @Override + public @NonNull NettyHttpHeaders getHeaders() { + return headers; + } + + @Override + public @NonNull MutableConvertibleValues getAttributes() { + return attributes.get(); + } + + @Override + public @NonNull Optional getBody() { + return Optional.empty(); + } +} diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/NettyFuturePublisher.java b/http-client/src/main/java/io/micronaut/http/client/netty/NettyFuturePublisher.java deleted file mode 100644 index 65e586b01d7..00000000000 --- a/http-client/src/main/java/io/micronaut/http/client/netty/NettyFuturePublisher.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2017-2022 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.client.netty; - -import io.netty.util.concurrent.Future; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -/** - * {@link Publisher} implementation that reads items from a netty {@link Future}. - * - * @since 3.5.0 - * @author yawkat - * @param The message type - */ -final class NettyFuturePublisher implements Publisher { - private final Future future; - private final boolean forwardCancel; - - /** - * @param future The netty future to use as the source of this publisher - * @param forwardCancel Whether to forward calls to {@link Subscription#cancel()} to the future. - */ - NettyFuturePublisher(Future future, boolean forwardCancel) { - this.future = future; - this.forwardCancel = forwardCancel; - } - - @Override - public void subscribe(Subscriber s) { - s.onSubscribe(new Subscription() { - boolean requested = false; - - @Override - public void request(long n) { - if (!requested) { - requested = true; - future.addListener((Future f) -> { - if (f.isSuccess()) { - s.onNext(f.getNow()); - s.onComplete(); - } else { - s.onError(f.cause()); - } - }); - } - } - - @Override - public void cancel() { - if (forwardCancel) { - future.cancel(true); - } - } - }); - } -} diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/NettyPromiseSubscriber.java b/http-client/src/main/java/io/micronaut/http/client/netty/NettyPromiseSubscriber.java deleted file mode 100644 index 78d8095c98e..00000000000 --- a/http-client/src/main/java/io/micronaut/http/client/netty/NettyPromiseSubscriber.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2017-2022 original authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micronaut.http.client.netty; - -import io.netty.util.concurrent.Promise; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -/** - * {@link Subscriber} implementation that writes into a netty {@link Promise} on completion. - * - * @since 3.5.0 - * @author yawkat - * @param The message type - */ -final class NettyPromiseSubscriber implements Subscriber { - private final Promise promise; - private T value; - - NettyPromiseSubscriber(Promise promise) { - this.promise = promise; - } - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(T t) { - this.value = t; - } - - @Override - public void onError(Throwable t) { - promise.tryFailure(t); - } - - @Override - public void onComplete() { - promise.trySuccess(value); - } -} diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java b/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java new file mode 100644 index 00000000000..3af614ca49c --- /dev/null +++ b/http-client/src/main/java/io/micronaut/http/client/netty/StreamWriter.java @@ -0,0 +1,160 @@ +/* + * Copyright 2017-2024 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.http.client.netty; + +import io.micronaut.core.annotation.Internal; +import io.micronaut.http.netty.EventLoopFlow; +import io.micronaut.http.netty.body.BufferConsumer; +import io.micronaut.http.netty.body.StreamingNettyByteBody; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.LastHttpContent; + +import java.util.function.Consumer; + +/** + * This class is used to write from a {@link StreamingNettyByteBody} to a channel with appropriate + * backpressure control. + * + * @author Jonas Konrad + * @since 4.7.0 + */ +@Internal +final class StreamWriter extends ChannelInboundHandlerAdapter implements BufferConsumer { + private final Consumer errorHandler; + private ChannelHandlerContext ctx; + private EventLoopFlow flow; + private final Upstream upstream; + private long unwritten = 0; + private boolean completed = false; + + /** + * @param body The body to read from. This {@link StreamWriter} will immediately take ownership of this body. + * @param errorHandler Handler to call when the streaming body emits an error + */ + StreamWriter(StreamingNettyByteBody body, Consumer errorHandler) { + this.errorHandler = errorHandler; + this.upstream = body.primary(this); + } + + /** + * Subscribe to the upstream and start writing bytes. + */ + void startWriting() { + if (ctx == null) { + throw new IllegalStateException("Not added to a channel yet"); + } + try { + upstream.start(); + } catch (Exception e) { + errorHandler.accept(e); + } + } + + /** + * Cancel writing the body (e.g. because a {@code CONTINUE} response was never received). + */ + void cancel() { + upstream.allowDiscard(); + upstream.disregardBackpressure(); + } + + boolean isCompleted() { + return completed; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + this.ctx = ctx; + this.flow = new EventLoopFlow(ctx.channel().eventLoop()); + } + + @Override + public void add(ByteBuf buf) { + if (flow.executeNow(() -> add0(buf))) { + add0(buf); + } + } + + private void add0(ByteBuf buf) { + if (ctx == null) { + // discarded + buf.release(); + return; + } + + int readable = buf.readableBytes(); + ctx.writeAndFlush(buf).addListener((ChannelFutureListener) future -> { + assert ctx.executor().inEventLoop(); + if (future.isSuccess()) { + if (ctx.channel().isWritable()) { + upstream.onBytesConsumed(readable); + } else { + unwritten += readable; + } + } + }); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + long unwritten = this.unwritten; + if (ctx.channel().isWritable() && unwritten != 0) { + this.unwritten = 0; + upstream.onBytesConsumed(unwritten); + } + super.channelWritabilityChanged(ctx); + } + + @Override + public void complete() { + if (flow.executeNow(this::complete0)) { + complete0(); + } + } + + private void complete0() { + if (ctx == null) { + // discarded + return; + } + + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise()); + completed = true; + } + + @Override + public void discard() { + // explicit cancel requested -> don't call errorHandler + } + + @Override + public void error(Throwable e) { + if (ctx == null) { + // discarded + return; + } + + errorHandler.accept(e); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + cancel(); + } +} diff --git a/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpRequestBuilder.java b/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpRequestBuilder.java index 9e27e11fbcb..e85b5931742 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpRequestBuilder.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/NettyHttpRequestBuilder.java @@ -17,6 +17,7 @@ import io.micronaut.core.annotation.Internal; import io.micronaut.core.annotation.NonNull; +import io.micronaut.core.annotation.Nullable; import io.micronaut.http.HttpRequestWrapper; import io.micronaut.http.body.ByteBody; import io.micronaut.http.netty.stream.StreamedHttpRequest; @@ -89,9 +90,9 @@ default Optional toHttpRequestDirect() { * * @return The body */ - @NonNull - default Optional byteBodyDirect() { - return Optional.empty(); + @Nullable + default ByteBody byteBodyDirect() { + return null; } /** diff --git a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java index e50d12bcc93..5aff7564781 100644 --- a/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java +++ b/http-netty/src/main/java/io/micronaut/http/netty/body/AvailableNettyByteBody.java @@ -56,7 +56,8 @@ public static CloseableAvailableByteBody empty() { return new AvailableNettyByteBody(Unpooled.EMPTY_BUFFER); } - public static ByteBuf toByteBuf(AvailableByteBody body) { + @NonNull + public static ByteBuf toByteBuf(@NonNull AvailableByteBody body) { if (body instanceof AvailableNettyByteBody net) { return net.claim(); } else { diff --git a/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java index b2cb9152c04..5bfdc73c761 100644 --- a/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java +++ b/http-server-netty/src/main/java/io/micronaut/http/server/netty/NettyHttpRequest.java @@ -627,8 +627,8 @@ public Optional toHttpRequestDirect() { } @Override - public @NonNull Optional byteBodyDirect() { - return Optional.of(byteBody()); + public ByteBody byteBodyDirect() { + return byteBody(); } @Override @@ -868,8 +868,9 @@ public Optional toHttpRequestDirect() { } @Override - public @NonNull Optional byteBodyDirect() { - return body != null ? Optional.empty() : NettyHttpRequest.this.byteBodyDirect(); + public ByteBody byteBodyDirect() { + // if the body has been changed we can't return the byteBody directly + return body != null ? null : NettyHttpRequest.this.byteBodyDirect(); } } diff --git a/http/src/main/java/io/micronaut/http/ByteBodyHttpResponse.java b/http/src/main/java/io/micronaut/http/ByteBodyHttpResponse.java index 9d97c2abe31..20692c6be26 100644 --- a/http/src/main/java/io/micronaut/http/ByteBodyHttpResponse.java +++ b/http/src/main/java/io/micronaut/http/ByteBodyHttpResponse.java @@ -30,7 +30,7 @@ * @author Jonas Konrad */ @Experimental -public sealed interface ByteBodyHttpResponse extends HttpResponse, Closeable permits ByteBodyHttpResponseWrapper { +public interface ByteBodyHttpResponse extends HttpResponse, Closeable { /** * The body bytes. *