Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle streaming response errors #595

Merged
merged 9 commits into from
Nov 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class JettyErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.OK
ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "JettyErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class TomcatErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.OK
ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "TomcatErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class UndertowErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown(HttpClientResponseException)
wetted marked this conversation as resolved.
Show resolved Hide resolved
ex.status == HttpStatus.OK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 200 code expected for an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sdelamo Yes, because this is an odd case. As the error occurs during a chunked response, the status code has already been written and can't be changed. The only thing the server can do is just stop writing the data and end the response. The expected behavior being verified is that incomplete data is sent and thus the client exception happens when actually trying to deserialize the response.

ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "UndertowErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ private void encodeResponse(ServletExchange<REQ, RES> exchange,
} else {
// stream case
if (exchange.getRequest().isAsyncSupported()) {
Mono.from(servletResponse.stream(publisher)).subscribe(responsePublisherCallback);
Mono.from(servletResponse.stream(publisher)).subscribe(responsePublisherCallback, throwable -> {
responsePublisherCallback.accept(null);
});
return;
} else {
// fallback to blocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@
import io.micronaut.http.cookie.Cookie;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.servlet.http.ServletHttpResponse;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletResponse;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -67,6 +69,8 @@
@Internal
public class DefaultServletHttpResponse<B> implements ServletHttpResponse<HttpServletResponse, B> {

private static final Logger LOG = LoggerFactory.getLogger(DefaultServletHttpResponse.class);

private static final byte[] EMPTY_ARRAY = "[]".getBytes();

private final ConversionService conversionService;
Expand Down Expand Up @@ -199,7 +203,10 @@ public void onError(Throwable t) {
if (t instanceof HttpStatusException) {
maybeReportErrorDownstream(t);
} else {
emitter.error(t);
if (LOG.isWarnEnabled()) {
LOG.warn("Reactive response received an error after some data has already been written. This error cannot be forwarded to the client.", t);
}
maybeReportErrorDownstream(new HttpStatusException(HttpStatus.INTERNAL_SERVER_ERROR, HttpStatus.INTERNAL_SERVER_ERROR.getReason() + ": " + t.getMessage()));
}
subscription.cancel();
}
Expand Down
Loading