Skip to content

Commit

Permalink
Handle exceptions thrown by Subscriber
Browse files Browse the repository at this point in the history
Motivation:
According to https://github.com/reactive-streams/reactive-streams-jvm#2.13,
a subscriber must not throw an exception but just call `Subscription.cancel()`.
Some of our subscribers violated this rule so we should fix them.

Modifications:
- Call `Subscription.cancel()` instead of throwing an exception in `Subscriber`s

Result:
- Close line#2475
- `Subscriber` does not throw an exception anymore.
  • Loading branch information
minwoox committed Feb 26, 2020
1 parent 2ed97c0 commit c1b4983
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ private RequestHeaders autoFillHeaders() {
@Override
public void onNext(HttpObject o) {
if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) {
throw newIllegalStateException(
"published an HttpObject that's neither Http2Headers nor Http2Data: " + o);
fail(new IllegalArgumentException(
"published an HttpObject that's neither Http2Headers nor Http2Data: " + o));
return;
}

boolean endOfStream = o.isEndOfStream();
Expand All @@ -246,7 +247,8 @@ public void onNext(HttpObject o) {
if (o instanceof HttpHeaders) {
final HttpHeaders trailers = (HttpHeaders) o;
if (trailers.contains(HttpHeaderNames.STATUS)) {
throw newIllegalStateException("published a trailers with status: " + o);
fail(new IllegalArgumentException("published a trailers with status: " + o));
return;
}
// Trailers always end the stream even if not explicitly set.
endOfStream = true;
Expand Down Expand Up @@ -374,10 +376,4 @@ private boolean cancelTimeout() {
this.timeoutFuture = null;
return timeoutFuture.cancel(false);
}

private IllegalStateException newIllegalStateException(String msg) {
final IllegalStateException cause = new IllegalStateException(msg);
fail(cause);
return cause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onNext(HttpObject obj) {
// There's no chance that an exception is raised from the underlying logic, so we don't do
// try catch not to propagate the exception to the publisher.
// https://github.com/reactive-streams/reactive-streams-jvm#2.13

if (closeable.isClosing()) {
subscription.cancel();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public final void onComplete() {

@Override
public final void onNext(HttpObject o) {
// There's no chance that an exception is raised from the underlying logic, so we don't do try catch not
// to propagate the exception to the publisher.
// https://github.com/reactive-streams/reactive-streams-jvm#2.13

if (o instanceof HttpHeaders) {
onHeaders((HttpHeaders) o);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,6 @@ public void onNext(T obj) {

@Override
public void onError(Throwable cause) {
if (cause == null) {
cause = new IllegalStateException("onError() was invoked with null cause.");
}
pushSignal(new CloseEvent(cause));
}

Expand Down Expand Up @@ -240,7 +237,7 @@ private void doPushSignal(Object obj) {
if (dataLength > allowedMaxSignalLength) {
final ContentTooLargeException cause = ContentTooLargeException.get();
upstream.abort(cause);
throw cause;
return;
}
signalLength += dataLength;
}
Expand All @@ -251,7 +248,7 @@ private void doPushSignal(Object obj) {
signalLength -= removedLength;
} catch (IllegalStateException e) {
upstream.abort(e);
throw e;
return;
}

upstreamOffset++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class NeverInvokedSubscriber<T> implements Subscriber<T> {

private static final Logger logger = LoggerFactory.getLogger(NeverInvokedSubscriber.class);

private static final NeverInvokedSubscriber<Object> INSTANCE = new NeverInvokedSubscriber<>();

@SuppressWarnings("unchecked")
Expand All @@ -30,21 +34,22 @@ static <T> NeverInvokedSubscriber<T> get() {

@Override
public void onSubscribe(Subscription s) {
throw new IllegalStateException("onSubscribe(" + s + ')');
s.cancel();
logger.warn("onSubscribe({}) is invoked.", s);
}

@Override
public void onNext(T t) {
throw new IllegalStateException("onNext(" + t + ')');
logger.warn("onNext({}) is invoked.", t);
}

@Override
public void onError(Throwable t) {
throw new IllegalStateException("onError(" + t + ')', t);
logger.warn("onError() is invoked with:", t);
}

@Override
public void onComplete() {
throw new IllegalStateException("onComplete()");
logger.warn("onComplete() is invoked.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
Expand All @@ -41,6 +43,8 @@
*/
public final class ResponseConversionUtil {

private static final Logger logger = LoggerFactory.getLogger(ResponseConversionUtil.class);

/**
* Returns a new {@link HttpResponseWriter} which has a content converted from the collected objects.
*
Expand Down Expand Up @@ -248,7 +252,9 @@ public void onComplete() {
return;
}
if (!trailers.isEmpty()) {
writer.write(trailers);
if (!writer.tryWrite(trailers)) {
logger.warn("Failed to write a trailers: {}", trailers);
}
}
writer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ final class HttpResponseSubscriber extends DefaultTimeoutController implements S

private static final Logger logger = LoggerFactory.getLogger(HttpResponseSubscriber.class);

private static final AggregatedHttpResponse INTERNAL_SERVER_ERROR_MESSAGE =
private static final AggregatedHttpResponse internalServerErrorResponse =
AggregatedHttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
private static final AggregatedHttpResponse SERVICE_UNAVAILABLE_MESSAGE =
private static final AggregatedHttpResponse serviceUnavailableResponse =
AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);

private static final Set<AsciiString> ADDITIONAL_HEADER_BLACKLIST = ImmutableSet.of(
Expand Down Expand Up @@ -133,19 +133,21 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(HttpObject o) {
if (!(o instanceof HttpData) && !(o instanceof HttpHeaders)) {
throw newIllegalStateException(
failAndRespond(new IllegalArgumentException(
"published an HttpObject that's neither HttpHeaders nor HttpData: " + o +
" (service: " + service() + ')');
" (service: " + service() + ')'));
return;
}

boolean endOfStream = o.isEndOfStream();
switch (state) {
case NEEDS_HEADERS: {
logBuilder().startResponse();
if (!(o instanceof ResponseHeaders)) {
throw newIllegalStateException(
failAndRespond(new IllegalStateException(
"published an HttpData without a preceding ResponseHeaders: " + o +
" (service: " + service() + ')');
" (service: " + service() + ')'));
return;
}

ResponseHeaders headers = (ResponseHeaders) o;
Expand Down Expand Up @@ -198,9 +200,10 @@ public void onNext(HttpObject o) {
if (o instanceof HttpHeaders) {
final HttpHeaders trailers = (HttpHeaders) o;
if (trailers.contains(HttpHeaderNames.STATUS)) {
throw newIllegalStateException(
failAndRespond(new IllegalArgumentException(
"published an HTTP trailers with status: " + o +
" (service: " + service() + ')');
" (service: " + service() + ')'));
return;
}
final HttpHeaders additionalTrailers = reqCtx.additionalResponseTrailers();
final HttpHeaders addedTrailers = fillAdditionalTrailers(trailers, additionalTrailers);
Expand All @@ -213,7 +216,6 @@ public void onNext(HttpObject o) {
final HttpHeaders additionalTrailers = reqCtx.additionalResponseTrailers();
if (!additionalTrailers.isEmpty()) {
write(o, false);

o = additionalTrailers;
}
}
Expand All @@ -240,7 +242,7 @@ public void onError(Throwable cause) {
.handleAsync((res, throwable) -> {
if (throwable != null) {
failAndRespond(throwable,
INTERNAL_SERVER_ERROR_MESSAGE,
internalServerErrorResponse,
Http2Error.CANCEL);
} else {
failAndRespond(cause, res, Http2Error.CANCEL);
Expand All @@ -257,7 +259,7 @@ public void onError(Throwable cause) {
logger.warn("{} Unexpected exception from a service or a response publisher: {}",
ctx.channel(), service(), cause);

failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR);
failAndRespond(cause, internalServerErrorResponse, Http2Error.INTERNAL_ERROR);
}
}

Expand Down Expand Up @@ -378,6 +380,10 @@ private void fail(Throwable cause) {
}
}

private void failAndRespond(Throwable cause) {
failAndRespond(cause, internalServerErrorResponse, Http2Error.INTERNAL_ERROR);
}

private void failAndRespond(Throwable cause, AggregatedHttpResponse res, Http2Error error) {
final ResponseHeaders headers = res.headers();
final HttpData content = res.content();
Expand Down Expand Up @@ -500,12 +506,6 @@ private static HttpHeadersBuilder fillAdditionalTrailers(HttpHeadersBuilder buil
return builder;
}

private IllegalStateException newIllegalStateException(String msg) {
final IllegalStateException cause = new IllegalStateException(msg);
failAndRespond(cause, INTERNAL_SERVER_ERROR_MESSAGE, Http2Error.INTERNAL_ERROR);
return cause;
}

private TimeoutTask newTimeoutTask() {
return new TimeoutTask() {
@Override
Expand All @@ -521,8 +521,8 @@ public void run() {
if (requestTimeoutHandler != null) {
requestTimeoutHandler.run();
} else {
failAndRespond(RequestTimeoutException.get(),
SERVICE_UNAVAILABLE_MESSAGE, Http2Error.INTERNAL_ERROR);
failAndRespond(RequestTimeoutException.get(), serviceUnavailableResponse,
Http2Error.INTERNAL_ERROR);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void request(int numMessages) {
}

/**
* Indicates whether delivery is currently stalled, pending receipt of more data. This means
* Indicates whether delivery is currently stalled, pending receipt of more data. This means
* that no additional data can be delivered to the application.
*/
public boolean isStalled() {
Expand Down Expand Up @@ -289,7 +289,9 @@ public void deframe(HttpData data, boolean endOfStream) {
deliver();
}

/** Requests closing this deframer when any messages currently queued have been requested and delivered. */
/**
* Requests closing this deframer when any messages currently queued have been requested and delivered.
*/
public void closeWhenComplete() {
if (isClosed()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

Expand All @@ -47,8 +45,6 @@
*/
public final class HttpStreamReader implements Subscriber<HttpObject>, BiFunction<Void, Throwable, Void> {

private static final Logger logger = LoggerFactory.getLogger(HttpStreamReader.class);

private final DecompressorRegistry decompressorRegistry;
private final TransportStatusListener transportStatusListener;

Expand Down Expand Up @@ -144,7 +140,12 @@ public void onNext(HttpObject obj) {
"Can't find decompressor for " + grpcEncoding));
return;
}
deframer.decompressor(ForwardingDecompressor.forGrpc(decompressor));
try {
deframer.decompressor(ForwardingDecompressor.forGrpc(decompressor));
} catch (Throwable t) {
transportStatusListener.transportReportStatus(GrpcStatus.fromThrowable(t));
return;
}
}
requestHttpFrame();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public final void onSubscribe(Subscription subscription) {

@Override
public final void onNext(HttpObject httpObject) {
// There's no chance that an exception is raised from the underlying logic, so we don't do try catch not
// to propagate the exception to the publisher.
// https://github.com/reactive-streams/reactive-streams-jvm#2.13

if (armeriaCall.isCanceled()) {
onCancelled();
assert subscription != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.linecorp.armeria.spring.web.reactive;

import static com.google.common.base.Preconditions.checkState;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -90,11 +88,22 @@ public void onNext(HttpObject httpObject) {
}

if (!headersFuture.isDone()) {
upstreamSubscription().request(1);
final Subscription subscription = this.subscription;
assert subscription != null;
subscription.request(1);
return;
}

bodyPublisher.relayOnNext(httpObject);
final Subscriber<? super HttpObject> subscriber = bodyPublisher.subscriber;
if (subscriber == null) {
onError(new IllegalStateException(
"HttpObject was relayed downstream when there's no subscriber: " + httpObject));
final Subscription subscription = this.subscription;
assert subscription != null;
subscription.cancel();
return;
}
subscriber.onNext(httpObject);
}

@Override
Expand Down Expand Up @@ -198,13 +207,6 @@ public void cancel() {
parent.upstreamSubscription().cancel();
}

private void relayOnNext(HttpObject obj) {
final Subscriber<? super HttpObject> subscriber = this.subscriber;
checkState(subscriber != null,
"HttpObject was relayed downstream when there's no subscriber: %s", obj);
subscriber.onNext(obj);
}

private void relayOnComplete() {
final Subscriber<? super HttpObject> subscriber = this.subscriber;
final Throwable cause = parent.completedCause;
Expand Down

0 comments on commit c1b4983

Please sign in to comment.