Skip to content

Commit

Permalink
Handle streaming response errors (#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyg484 authored Nov 20, 2023
1 parent 48ab7ba commit 3fe286c
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class JettyErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.OK
ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "JettyErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class TomcatErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.OK
ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "TomcatErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import io.micronaut.http.annotation.Produces
import io.micronaut.http.annotation.Status
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.exceptions.HttpClientResponseException
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import jakarta.inject.Inject
import reactor.core.publisher.Flux
import spock.lang.Specification

@MicronautTest
Expand All @@ -35,14 +37,49 @@ class UndertowErrorSpec extends Specification {
result.header(HttpHeaders.CONTENT_TYPE).startsWith(MediaType.TEXT_PLAIN)
}

void "error that occurs with streaming response results in client receiving an error"() {
when:
client.toBlocking().exchange("/errors/stream-immediate", String)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.INTERNAL_SERVER_ERROR
ex.response.body.orElseThrow() == "Internal Server Error: Immediate error"
}

void "error that occurs with streaming response after data sent results in client receiving incomplete data"() {
when:
client.toBlocking().exchange("/errors/stream-delayed", Integer[].class)

then:
HttpClientResponseException ex = thrown()
ex.status == HttpStatus.OK
ex.message.contains("Unexpected end-of-input")
}

@Requires(property = "spec.name", value = "UndertowErrorSpec")
@Controller("/errors")
static class ErrorController {

@Get("/local")
@Produces(MediaType.APPLICATION_PDF)
String localHandler() {
throw new AnotherException("bad things");
throw new AnotherException("bad things")
}

@Get("/stream-immediate")
Flux<String> streamingImmediateError() {
return Flux.error(new IllegalStateException("Immediate error"))
}

@Get("/stream-delayed")
Flux<Integer> streamingDelayedError() {
return Flux.range(1, 5).map(data -> {
if (data == 3) {
throw new IllegalStateException("Delayed error")
}
return data
})
}

@Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ private void encodeResponse(ServletExchange<REQ, RES> exchange,
} else {
// stream case
if (exchange.getRequest().isAsyncSupported()) {
Mono.from(servletResponse.stream(publisher)).subscribe(responsePublisherCallback);
Mono.from(servletResponse.stream(publisher)).subscribe(responsePublisherCallback, throwable -> {
responsePublisherCallback.accept(null);
});
return;
} else {
// fallback to blocking
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@
import io.micronaut.http.cookie.Cookie;
import io.micronaut.http.exceptions.HttpStatusException;
import io.micronaut.servlet.http.ServletHttpResponse;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
import jakarta.servlet.http.HttpServletResponse;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -67,6 +69,8 @@
@Internal
public class DefaultServletHttpResponse<B> implements ServletHttpResponse<HttpServletResponse, B> {

private static final Logger LOG = LoggerFactory.getLogger(DefaultServletHttpResponse.class);

private static final byte[] EMPTY_ARRAY = "[]".getBytes();

private final ConversionService conversionService;
Expand Down Expand Up @@ -199,7 +203,10 @@ public void onError(Throwable t) {
if (t instanceof HttpStatusException) {
maybeReportErrorDownstream(t);
} else {
emitter.error(t);
if (LOG.isWarnEnabled()) {
LOG.warn("Reactive response received an error after some data has already been written. This error cannot be forwarded to the client.", t);
}
maybeReportErrorDownstream(new HttpStatusException(HttpStatus.INTERNAL_SERVER_ERROR, HttpStatus.INTERNAL_SERVER_ERROR.getReason() + ": " + t.getMessage()));
}
subscription.cancel();
}
Expand Down

0 comments on commit 3fe286c

Please sign in to comment.