Skip to content

Commit

Permalink
bug: Writeable responses not being offloaded since 4.0.0 (#10723) (#1…
Browse files Browse the repository at this point in the history
…0729)

* Reproducer for Writeable responses not being offloaded

* Wrap all writeTo calls in io executor

* backported from 4.4.x

---------

Co-authored-by: yawkat <[email protected]>
  • Loading branch information
timyates and yawkat authored Apr 17, 2024
1 parent 932ad74 commit 717e3a6
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 original authors
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,6 +78,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import javax.net.ssl.SSLException;
import java.io.IOException;
Expand Down Expand Up @@ -337,13 +339,13 @@ private void encodeHttpResponse(
} else {
actualResponseType = responseBodyType;
}
NettyBodyWriter<Object> closure = wrap(messageBodyWriter);
NettyBodyWriter<Object> closure = specialize(wrap(messageBodyWriter), actualResponseType, responseMediaType, body);
closeConnectionIfError(response, nettyRequest, outboundAccess);
if (closure.isBlocking()) {
MediaType finalResponseMediaType = responseMediaType;
getIoExecutor().execute(() -> writeNettyMessageBody(nettyRequest, (MutableHttpResponse<Object>) response, actualResponseType, finalResponseMediaType, body, closure, outboundAccess));
getIoExecutor().execute(() -> writeNettyMessageBody(nettyRequest, (MutableHttpResponse<Object>) response, actualResponseType, finalResponseMediaType, body, closure, outboundAccess, true));
} else {
writeNettyMessageBody(nettyRequest, (MutableHttpResponse<Object>) response, actualResponseType, responseMediaType, body, closure, outboundAccess);
writeNettyMessageBody(nettyRequest, (MutableHttpResponse<Object>) response, actualResponseType, responseMediaType, body, closure, outboundAccess, false);
}
} else {
response.body(null);
Expand All @@ -362,7 +364,8 @@ private void writeNettyMessageBody(
MediaType mediaType,
Object body,
NettyBodyWriter<Object> nettyMessageBodyWriter,
PipeliningServerHandler.OutboundAccess outboundAccess) {
PipeliningServerHandler.OutboundAccess outboundAccess,
boolean onIoExecutor) {
try {
nettyMessageBodyWriter.writeTo(
nettyRequest,
Expand All @@ -373,9 +376,12 @@ private void writeNettyMessageBody(
} catch (CodecException e) {
final MutableHttpResponse<?> errorResponse = routeExecutor.createDefaultErrorResponse(nettyRequest, e);
MediaType t = errorResponse.getContentType().orElse(MediaType.APPLICATION_JSON_TYPE);
//noinspection unchecked
wrap(new DynamicMessageBodyWriter(messageBodyHandlerRegistry, List.of(t)))
.writeTo(nettyRequest, (MutableHttpResponse<Object>) errorResponse, Argument.OBJECT_ARGUMENT, t, errorResponse.body(), outboundAccess);
NettyBodyWriter<Object> dyn = wrap(new DynamicMessageBodyWriter(messageBodyHandlerRegistry, List.of(t)).find(Argument.OBJECT_ARGUMENT, t, errorResponse.body()));
if (onIoExecutor || !dyn.isBlocking()) {
dyn.writeTo(nettyRequest, (MutableHttpResponse<Object>) errorResponse, Argument.OBJECT_ARGUMENT, t, errorResponse.body(), outboundAccess);
} else {
ioExecutor.execute(() -> dyn.writeTo(nettyRequest, (MutableHttpResponse<Object>) errorResponse, Argument.OBJECT_ARGUMENT, t, errorResponse.body(), outboundAccess));
}
}
}

Expand All @@ -397,23 +403,25 @@ private Flux<HttpContent> mapToHttpContent(NettyHttpRequest<?> request,
mediaType.getExtension().equals(MediaType.EXTENSION_JSON) && routeInfo.isResponseBodyJsonFormattable();
MediaType finalMediaType = mediaType;
@SuppressWarnings("unchecked") Argument<Object> responseBodyType = (Argument<Object>) routeInfo.getResponseBodyType();
httpContentPublisher = bodyPublisher.map(message -> {
httpContentPublisher = bodyPublisher.concatMap(message -> {
MessageBodyWriter<Object> messageBodyWriter = routeInfo.getMessageBodyWriter();

if (messageBodyWriter == null || !responseBodyType.isInstance(message) || !messageBodyWriter.isWriteable(responseBodyType, finalMediaType)) {
messageBodyWriter = new DynamicMessageBodyWriter(messageBodyHandlerRegistry, List.of(finalMediaType));
}
ByteBuffer<?> byteBuffer = messageBodyWriter.writeTo(
return writeAsync(
messageBodyWriter,
responseBodyType.isInstance(message) ? responseBodyType : (Argument<Object>) Argument.of(message.getClass()),
finalMediaType,
message,
response.getHeaders(), byteBufferFactory);
return new DefaultHttpContent((ByteBuf) byteBuffer.asNativeBuffer());
});
}).map(byteBuffer -> new DefaultHttpContent((ByteBuf) byteBuffer.asNativeBuffer()));
} else {
MediaType finalMediaType = mediaType;
DynamicMessageBodyWriter dynamicWriter = new DynamicMessageBodyWriter(messageBodyHandlerRegistry, mediaType == null ? List.of() : List.of(mediaType));
httpContentPublisher = bodyPublisher.map(message -> new DefaultHttpContent((ByteBuf) dynamicWriter.writeTo(Argument.OBJECT_ARGUMENT, finalMediaType, message, response.getHeaders(), byteBufferFactory).asNativeBuffer()));
httpContentPublisher = bodyPublisher
.concatMap(message -> writeAsync(dynamicWriter, Argument.OBJECT_ARGUMENT, finalMediaType, message, response.getHeaders(), byteBufferFactory))
.map(byteBuffer -> new DefaultHttpContent((ByteBuf) byteBuffer.asNativeBuffer()));
}

if (isJson) {
Expand All @@ -429,6 +437,26 @@ private Flux<HttpContent> mapToHttpContent(NettyHttpRequest<?> request,
return httpContentPublisher;
}

private <T> Publisher<ByteBuffer<?>> writeAsync(
@NonNull MessageBodyWriter<T> messageBodyWriter,
@NonNull Argument<T> type,
@NonNull MediaType mediaType,
T object,
@NonNull MutableHeaders outgoingHeaders,
@NonNull ByteBufferFactory<?, ?> bufferFactory
) {
if (messageBodyWriter instanceof DynamicMessageBodyWriter dyn) {
messageBodyWriter = (MessageBodyWriter<T>) dyn.find((Argument<Object>) type, mediaType, object);
}
if (messageBodyWriter.isBlocking()) {
MessageBodyWriter<T> finalMessageBodyWriter = messageBodyWriter;
return Mono.<ByteBuffer<?>>defer(() -> Mono.just(finalMessageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory)))
.subscribeOn(Schedulers.fromExecutor(ioExecutor));
} else {
return Mono.just(messageBodyWriter.writeTo(type, mediaType, object, outgoingHeaders, bufferFactory));
}
}

private void writeFinalNettyResponse(MutableHttpResponse<?> message, NettyHttpRequest<?> request, PipeliningServerHandler.OutboundAccess outboundAccess) {
// default Connection header if not set explicitly
closeConnectionIfError(message, request, outboundAccess);
Expand Down Expand Up @@ -521,6 +549,18 @@ <T> NettyBodyWriter<T> wrap(MessageBodyWriter<T> closure) {
}
}

/**
* Specialize a {@link NettyBodyWriter} with the given body information to make
* {@link NettyBodyWriter#isBlocking()} work.
*/
<T> NettyBodyWriter<T> specialize(NettyBodyWriter<T> original, Argument<T> bodyType, MediaType mediaType, T body) {
if (original instanceof CompatNettyWriteClosure<T> cnwc && cnwc.delegate instanceof DynamicMessageBodyWriter dyn) {
return (NettyBodyWriter<T>) new CompatNettyWriteClosure<>(dyn.find((Argument<Object>) bodyType, mediaType, body));
} else {
return original;
}
}

private final class CompatNettyWriteClosure<T> implements NettyBodyWriter<T> {
private final MessageBodyWriter<T> delegate;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package io.micronaut.http.body

import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.io.Writable
import io.micronaut.http.HttpRequest
import io.micronaut.http.MediaType
import io.micronaut.http.MutableHttpResponse
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Filter
import io.micronaut.http.annotation.Get
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.filter.HttpServerFilter
import io.micronaut.http.filter.ServerFilterChain
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
@Property(name = "spec.name", value = "WriteableOffloadSpec")
class WriteableOffloadSpec extends Specification {

@Inject
@Client("/")
HttpClient client

void "writeable is offloaded"() {
when:
def result = client.toBlocking().retrieve("/writeable")

then:
result.startsWith(expectedThreadNamePrefix)
result.endsWith("Hello")
}

String getExpectedThreadNamePrefix() {
Runtime.version().feature() > 17 ? 'virtual-executor' : 'io-executor'
}

@SuppressWarnings('unused')
@Requires(property = "spec.name", value = "WriteableOffloadSpec")
@Filter(Filter.MATCH_ALL_PATTERN)
static class WriteableOffloadFilter implements HttpServerFilter {

@Override
Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request, ServerFilterChain chain) {
return Flux.from(chain.proceed(request))
.map { response ->
response.contentType(MediaType.TEXT_PLAIN)
response.body(new ThreadWriteable(response.body()))
}
}
}

@Requires(property = "spec.name", value = "WriteableOffloadSpec")
@Controller
static class WriteableOffloadController {

@Get("/writeable")
String get() {
"Hello"
}
}


static class ThreadWriteable<B> implements Writable {

private final B body;

ThreadWriteable(B body) {
this.body = body;
}

@Override
void writeTo(Writer out) throws IOException {
out.write(Thread.currentThread().getName())
out.write(" ")
out.write(body.toString())
}
}
}

0 comments on commit 717e3a6

Please sign in to comment.