diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java b/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java index b1540e9a1ff..3ba3917ae58 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java @@ -236,8 +236,9 @@ private RequestHeaders autoFillHeaders() { @Override public void onNext(HttpObject o) { if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) { - throw newIllegalStateException( - "published an HttpObject that's neither Http2Headers nor Http2Data: " + o); + fail(new IllegalArgumentException( + "published an HttpObject that's neither Http2Headers nor Http2Data: " + o)); + return; } boolean endOfStream = o.isEndOfStream(); @@ -246,7 +247,8 @@ public void onNext(HttpObject o) { if (o instanceof HttpHeaders) { final HttpHeaders trailers = (HttpHeaders) o; if (trailers.contains(HttpHeaderNames.STATUS)) { - throw newIllegalStateException("published a trailers with status: " + o); + fail(new IllegalArgumentException("published a trailers with status: " + o)); + return; } // Trailers always end the stream even if not explicitly set. endOfStream = true; @@ -374,10 +376,4 @@ private boolean cancelTimeout() { this.timeoutFuture = null; return timeoutFuture.cancel(false); } - - private IllegalStateException newIllegalStateException(String msg) { - final IllegalStateException cause = new IllegalStateException(msg); - fail(cause); - return cause; - } } diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java index b4ed5dcff1c..393a31e6498 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HttpHealthChecker.java @@ -174,6 +174,10 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(HttpObject obj) { + // There's no chance that an exception is raised from the underlying logic, so we don't do + // try catch not to propagate the exception to the publisher. + // https://github.com/reactive-streams/reactive-streams-jvm#2.13 + if (closeable.isClosing()) { subscription.cancel(); return; diff --git a/core/src/main/java/com/linecorp/armeria/common/HttpMessageAggregator.java b/core/src/main/java/com/linecorp/armeria/common/HttpMessageAggregator.java index 405516ed6bf..2ebc58fa21b 100644 --- a/core/src/main/java/com/linecorp/armeria/common/HttpMessageAggregator.java +++ b/core/src/main/java/com/linecorp/armeria/common/HttpMessageAggregator.java @@ -103,6 +103,10 @@ public final void onComplete() { @Override public final void onNext(HttpObject o) { + // There's no chance that an exception is raised from the underlying logic, so we don't do try catch not + // to propagate the exception to the publisher. + // https://github.com/reactive-streams/reactive-streams-jvm#2.13 + if (o instanceof HttpHeaders) { onHeaders((HttpHeaders) o); } else { diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java index 1367e70ab3a..aca80f6c6e0 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java @@ -209,9 +209,6 @@ public void onNext(T obj) { @Override public void onError(Throwable cause) { - if (cause == null) { - cause = new IllegalStateException("onError() was invoked with null cause."); - } pushSignal(new CloseEvent(cause)); } @@ -240,7 +237,7 @@ private void doPushSignal(Object obj) { if (dataLength > allowedMaxSignalLength) { final ContentTooLargeException cause = ContentTooLargeException.get(); upstream.abort(cause); - throw cause; + return; } signalLength += dataLength; } @@ -251,7 +248,7 @@ private void doPushSignal(Object obj) { signalLength -= removedLength; } catch (IllegalStateException e) { upstream.abort(e); - throw e; + return; } upstreamOffset++; diff --git a/core/src/main/java/com/linecorp/armeria/common/stream/NeverInvokedSubscriber.java b/core/src/main/java/com/linecorp/armeria/common/stream/NeverInvokedSubscriber.java index 17e33aaf790..6325c555a6a 100644 --- a/core/src/main/java/com/linecorp/armeria/common/stream/NeverInvokedSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/common/stream/NeverInvokedSubscriber.java @@ -18,9 +18,13 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; final class NeverInvokedSubscriber implements Subscriber { + private static final Logger logger = LoggerFactory.getLogger(NeverInvokedSubscriber.class); + private static final NeverInvokedSubscriber INSTANCE = new NeverInvokedSubscriber<>(); @SuppressWarnings("unchecked") @@ -30,21 +34,22 @@ static NeverInvokedSubscriber get() { @Override public void onSubscribe(Subscription s) { - throw new IllegalStateException("onSubscribe(" + s + ')'); + s.cancel(); + logger.warn("onSubscribe({}) is invoked.", s); } @Override public void onNext(T t) { - throw new IllegalStateException("onNext(" + t + ')'); + logger.warn("onNext({}) is invoked.", t); } @Override public void onError(Throwable t) { - throw new IllegalStateException("onError(" + t + ')', t); + logger.warn("onError() is invoked with:", t); } @Override public void onComplete() { - throw new IllegalStateException("onComplete()"); + logger.warn("onComplete() is invoked."); } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/server/ResponseConversionUtil.java b/core/src/main/java/com/linecorp/armeria/internal/server/ResponseConversionUtil.java index 741a0c764e8..fab2e948805 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/server/ResponseConversionUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/server/ResponseConversionUtil.java @@ -29,6 +29,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.linecorp.armeria.common.HttpData; import com.linecorp.armeria.common.HttpHeaders; @@ -41,6 +43,8 @@ */ public final class ResponseConversionUtil { + private static final Logger logger = LoggerFactory.getLogger(ResponseConversionUtil.class); + /** * Returns a new {@link HttpResponseWriter} which has a content converted from the collected objects. * @@ -248,7 +252,9 @@ public void onComplete() { return; } if (!trailers.isEmpty()) { - writer.write(trailers); + if (!writer.tryWrite(trailers)) { + logger.warn("Failed to write a trailers: {}", trailers); + } } writer.close(); } diff --git a/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java b/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java index bd79608dad6..4575d367a24 100644 --- a/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java +++ b/core/src/main/java/com/linecorp/armeria/server/HttpResponseSubscriber.java @@ -62,9 +62,9 @@ final class HttpResponseSubscriber extends DefaultTimeoutController implements S private static final Logger logger = LoggerFactory.getLogger(HttpResponseSubscriber.class); - private static final AggregatedHttpResponse INTERNAL_SERVER_ERROR_MESSAGE = + private static final AggregatedHttpResponse internalServerErrorResponse = AggregatedHttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); - private static final AggregatedHttpResponse SERVICE_UNAVAILABLE_MESSAGE = + private static final AggregatedHttpResponse serviceUnavailableResponse = AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE); private static final Set ADDITIONAL_HEADER_BLACKLIST = ImmutableSet.of( @@ -133,9 +133,10 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(HttpObject o) { if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) { - throw newIllegalStateException( + failAndRespond(new IllegalArgumentException( "published an HttpObject that's neither HttpHeaders nor HttpData: " + o + - " (service: " + service() + ')'); + " (service: " + service() + ')')); + return; } boolean endOfStream = o.isEndOfStream(); @@ -143,9 +144,10 @@ public void onNext(HttpObject o) { case NEEDS_HEADERS: { logBuilder().startResponse(); if (!(o instanceof ResponseHeaders)) { - throw newIllegalStateException( + failAndRespond(new IllegalStateException( "published an HttpData without a preceding ResponseHeaders: " + o + - " (service: " + service() + ')'); + " (service: " + service() + ')')); + return; } ResponseHeaders headers = (ResponseHeaders) o; @@ -198,9 +200,10 @@ public void onNext(HttpObject o) { if (o instanceof HttpHeaders) { final HttpHeaders trailers = (HttpHeaders) o; if (trailers.contains(HttpHeaderNames.STATUS)) { - throw newIllegalStateException( + failAndRespond(new IllegalArgumentException( "published an HTTP trailers with status: " + o + - " (service: " + service() + ')'); + " (service: " + service() + ')')); + return; } final HttpHeaders additionalTrailers = reqCtx.additionalResponseTrailers(); final HttpHeaders addedTrailers = fillAdditionalTrailers(trailers, additionalTrailers); @@ -213,7 +216,6 @@ public void onNext(HttpObject o) { final HttpHeaders additionalTrailers = reqCtx.additionalResponseTrailers(); if (!additionalTrailers.isEmpty()) { write(o, false); - o = additionalTrailers; } } @@ -240,7 +242,7 @@ public void onError(Throwable cause) { .handleAsync((res, throwable) -> { if (throwable != null) { failAndRespond(throwable, - INTERNAL_SERVER_ERROR_MESSAGE, + internalServerErrorResponse, Http2Error.CANCEL); } else { failAndRespond(cause, res, Http2Error.CANCEL); @@ -257,7 +259,7 @@ public void onError(Throwable cause) { logger.warn("{} Unexpected exception from a service or a response publisher: {}", ctx.channel(), service(), cause); - failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR); + failAndRespond(cause, internalServerErrorResponse, Http2Error.INTERNAL_ERROR); } } @@ -378,6 +380,10 @@ private void fail(Throwable cause) { } } + private void failAndRespond(Throwable cause) { + failAndRespond(cause, internalServerErrorResponse, Http2Error.INTERNAL_ERROR); + } + private void failAndRespond(Throwable cause, AggregatedHttpResponse res, Http2Error error) { final ResponseHeaders headers = res.headers(); final HttpData content = res.content(); @@ -500,12 +506,6 @@ private static HttpHeadersBuilder fillAdditionalTrailers(HttpHeadersBuilder buil return builder; } - private IllegalStateException newIllegalStateException(String msg) { - final IllegalStateException cause = new IllegalStateException(msg); - failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR); - return cause; - } - private TimeoutTask newTimeoutTask() { return new TimeoutTask() { @Override @@ -521,8 +521,8 @@ public void run() { if (requestTimeoutHandler != null) { requestTimeoutHandler.run(); } else { - failAndRespond(RequestTimeoutException.get(), - SERVICE_UNAVAILABLE_MESSAGE, Http2Error.INTERNAL_ERROR); + failAndRespond(RequestTimeoutException.get(), serviceUnavailableResponse, + Http2Error.INTERNAL_ERROR); } } } diff --git a/grpc-protocol/src/main/java/com/linecorp/armeria/common/grpc/protocol/ArmeriaMessageDeframer.java b/grpc-protocol/src/main/java/com/linecorp/armeria/common/grpc/protocol/ArmeriaMessageDeframer.java index 7a3ac423d9c..1939afaa090 100644 --- a/grpc-protocol/src/main/java/com/linecorp/armeria/common/grpc/protocol/ArmeriaMessageDeframer.java +++ b/grpc-protocol/src/main/java/com/linecorp/armeria/common/grpc/protocol/ArmeriaMessageDeframer.java @@ -247,7 +247,7 @@ public void request(int numMessages) { } /** - * Indicates whether delivery is currently stalled, pending receipt of more data. This means + * Indicates whether delivery is currently stalled, pending receipt of more data. This means * that no additional data can be delivered to the application. */ public boolean isStalled() { @@ -289,7 +289,9 @@ public void deframe(HttpData data, boolean endOfStream) { deliver(); } - /** Requests closing this deframer when any messages currently queued have been requested and delivered. */ + /** + * Requests closing this deframer when any messages currently queued have been requested and delivered. + */ public void closeWhenComplete() { if (isClosed()) { return; diff --git a/grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/HttpStreamReader.java b/grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/HttpStreamReader.java index 4bc325cd896..33e78dc77d7 100644 --- a/grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/HttpStreamReader.java +++ b/grpc/src/main/java/com/linecorp/armeria/internal/common/grpc/HttpStreamReader.java @@ -24,8 +24,6 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -47,8 +45,6 @@ */ public final class HttpStreamReader implements Subscriber, BiFunction { - private static final Logger logger = LoggerFactory.getLogger(HttpStreamReader.class); - private final DecompressorRegistry decompressorRegistry; private final TransportStatusListener transportStatusListener; @@ -144,7 +140,12 @@ public void onNext(HttpObject obj) { "Can't find decompressor for " + grpcEncoding)); return; } - deframer.decompressor(ForwardingDecompressor.forGrpc(decompressor)); + try { + deframer.decompressor(ForwardingDecompressor.forGrpc(decompressor)); + } catch (Throwable t) { + transportStatusListener.transportReportStatus(GrpcStatus.fromThrowable(t)); + return; + } } requestHttpFrame(); return; diff --git a/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/AbstractSubscriber.java b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/AbstractSubscriber.java index 97eec456fd7..6cce08009d2 100644 --- a/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/AbstractSubscriber.java +++ b/retrofit2/src/main/java/com/linecorp/armeria/client/retrofit2/AbstractSubscriber.java @@ -85,6 +85,10 @@ public final void onSubscribe(Subscription subscription) { @Override public final void onNext(HttpObject httpObject) { + // There's no chance that an exception is raised from the underlying logic, so we don't do try catch not + // to propagate the exception to the publisher. + // https://github.com/reactive-streams/reactive-streams-jvm#2.13 + if (armeriaCall.isCanceled()) { onCancelled(); assert subscription != null; diff --git a/spring/boot-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ArmeriaHttpClientResponseSubscriber.java b/spring/boot-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ArmeriaHttpClientResponseSubscriber.java index e7ee55fb148..333b8a46b51 100644 --- a/spring/boot-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ArmeriaHttpClientResponseSubscriber.java +++ b/spring/boot-webflux-autoconfigure/src/main/java/com/linecorp/armeria/spring/web/reactive/ArmeriaHttpClientResponseSubscriber.java @@ -15,8 +15,6 @@ */ package com.linecorp.armeria.spring.web.reactive; -import static com.google.common.base.Preconditions.checkState; - import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -90,11 +88,22 @@ public void onNext(HttpObject httpObject) { } if (!headersFuture.isDone()) { - upstreamSubscription().request(1); + final Subscription subscription = this.subscription; + assert subscription != null; + subscription.request(1); return; } - bodyPublisher.relayOnNext(httpObject); + final Subscriber subscriber = bodyPublisher.subscriber; + if (subscriber == null) { + onError(new IllegalStateException( + "HttpObject was relayed downstream when there's no subscriber: " + httpObject)); + final Subscription subscription = this.subscription; + assert subscription != null; + subscription.cancel(); + return; + } + subscriber.onNext(httpObject); } @Override @@ -198,13 +207,6 @@ public void cancel() { parent.upstreamSubscription().cancel(); } - private void relayOnNext(HttpObject obj) { - final Subscriber subscriber = this.subscriber; - checkState(subscriber != null, - "HttpObject was relayed downstream when there's no subscriber: %s", obj); - subscriber.onNext(obj); - } - private void relayOnComplete() { final Subscriber subscriber = this.subscriber; final Throwable cause = parent.completedCause;