Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle exceptions thrown by Subscriber #2475

Closed
trustin opened this issue Feb 10, 2020 · 0 comments · Fixed by #2553
Closed

Handle exceptions thrown by Subscriber #2475

trustin opened this issue Feb 10, 2020 · 0 comments · Fixed by #2553
Labels
Milestone

Comments

@trustin
Copy link
Member

trustin commented Feb 10, 2020

An exception raised by a Subscriber's onNext() or other handler methods can cause an unexpected connection drop. The following stack trace shows the case where an exception raised by onNext() is propagated to Http2ResponseDecoder which triggers an HTTP/2 connection error:

HTTP/2 connection error:
io.netty.handler.codec.http2.Http2Exception: failed to consume a HEADERS frame
	at io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:117)
	at com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:200)
	at com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:213)
	at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onHeadersRead(Http2FrameListenerDecorator.java:48)
	at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onHeadersRead(Http2EmptyDataFrameListener.java:63)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
	at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
	at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
	at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:227)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: java.lang.StringIndexOutOfBoundsException: begin 0, end -1, length 2
	at java.base/java.lang.String.checkBoundsBeginEnd(String.java:3720)
	at java.base/java.lang.String.substring(String.java:1909)
	at com.linecorp.armeria.client.endpoint.healthcheck.HttpHealthChecker$HealthCheckResponseSubscriber.updateLongPollingSettings(HttpHealthChecker.java:250)
	at com.linecorp.armeria.client.endpoint.healthcheck.HttpHealthChecker$HealthCheckResponseSubscriber.onNext(HttpHealthChecker.java:189)
	at com.linecorp.armeria.client.endpoint.healthcheck.HttpHealthChecker$HealthCheckResponseSubscriber.onNext(HttpHealthChecker.java:149)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriberWithElements(DefaultStreamMessage.java:344)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriber0(DefaultStreamMessage.java:322)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.notifySubscriber(DefaultStreamMessage.java:255)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.addObjectOrEvent(DefaultStreamMessage.java:241)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.addObject(DefaultStreamMessage.java:161)
	at com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter.tryWrite(AbstractStreamMessageAndWriter.java:74)
	at com.linecorp.armeria.common.stream.DefaultStreamMessage.tryWrite(DefaultStreamMessage.java:65)
	at com.linecorp.armeria.client.DecodedHttpResponse.tryWrite(DecodedHttpResponse.java:55)
	at com.linecorp.armeria.client.HttpResponseDecoder$HttpResponseWrapper.tryWrite(HttpResponseDecoder.java:227)
	at com.linecorp.armeria.client.Http2ResponseDecoder.onHeadersRead(Http2ResponseDecoder.java:197)
	... 41 common frames omitted

We must catch any exceptions raised by Subscriber and abort the StreamMessage with the caught exception.

@trustin trustin added the defect label Feb 10, 2020
@trustin trustin added this to the 1.0.0 milestone Feb 10, 2020
minwoox added a commit to minwoox/armeria that referenced this issue Feb 26, 2020
Motivation:
According to https://github.com/reactive-streams/reactive-streams-jvm#2.13,
a subscriber must not throw an exception but just call `Subscription.cancel()`.
Some of our subscribers violated this rule so we should fix them.

Modifications:
- Call `Subscription.cancel()` instead of throwing an exception in `Subscriber`s

Result:
- Close line#2475
- `Subscriber` does not throw an exception anymore.
minwoox added a commit to minwoox/armeria that referenced this issue Feb 26, 2020
Motivation:
According to https://github.com/reactive-streams/reactive-streams-jvm#2.13,
a subscriber must not throw an exception but just call `Subscription.cancel()`.
Some of our subscribers violated this rule so we should fix them.

Modifications:
- Call `Subscription.cancel()` instead of throwing an exception in `Subscriber`s

Result:
- Close line#2475
- `Subscriber` does not throw an exception anymore.
minwoox added a commit to minwoox/armeria that referenced this issue Feb 26, 2020
Motivation:
According to https://github.com/reactive-streams/reactive-streams-jvm#2.13,
a subscriber must not throw an exception but just call `Subscription.cancel()`.
Some of our subscribers violated this rule so we should fix them.

Modifications:
- Call `Subscription.cancel()` instead of throwing an exception in `Subscriber`s

Result:
- Close line#2475
- `Subscriber` does not throw an exception anymore.
minwoox added a commit to minwoox/armeria that referenced this issue Mar 5, 2020
Motivation:
An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop.
We should handle it.

Modifications:
- Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s.
  - Aabort the stream or call `onError` with the cause.
- Catch the exception thrown by `onComplete()` and `onError()`, and log it.

Result:
- Close line#2475
- The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
trustin pushed a commit that referenced this issue Mar 10, 2020
Motivation:
An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop.
We should handle it.

Modifications:
- Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s.
  - Aabort the stream or call `onError` with the cause.
- Catch the exception thrown by `onComplete()` and `onError()`, and log it.
- Fork `CompositeException` from RxJava.

Result:
- Close #2475
- The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
@trustin trustin modified the milestones: 1.0.0, 0.99.0 Mar 10, 2020
fmguerreiro pushed a commit to fmguerreiro/armeria that referenced this issue Sep 19, 2020
Motivation:
An exception raised by a `Subscriber`'s `onNext()` or other handler methods can cause an unexpected connection drop.
We should handle it.

Modifications:
- Catch the exception thrown by `onSubscribe()` and `onNext()` from `Subscriber`s.
  - Aabort the stream or call `onError` with the cause.
- Catch the exception thrown by `onComplete()` and `onError()`, and log it.
- Fork `CompositeException` from RxJava.

Result:
- Close line#2475
- The connection is not closed by unhandled exceptions from `Subscriber`s anymore.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant