Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception in the thread who subscribed the flux or mono #1078

Closed
stavalfi opened this issue Feb 15, 2018 · 6 comments
Closed

Exception in the thread who subscribed the flux or mono #1078

stavalfi opened this issue Feb 15, 2018 · 6 comments
Labels
type/bug A general bug
Milestone

Comments

@stavalfi
Copy link

stavalfi commented Feb 15, 2018

The following scenario will prove theoretically that even if I check if the stream canceled before sending error signal, then flux will throw an exception to the subscribed thread: consider the following scenario inside Flux.create which the main thread subscribed to it (flux.create(..)....parallel()...take(1)...):

  1. main thread check if the stream is canceled before sending an error signal. the answer is no.
  2. main thread is on hold by the operating system.
  3. parallelThread1 pass the take(1) and terminate the stream.
  4. parallelThread1 is on hold by the operating system.
  5. main thread is alive now and sending an error signal. the error will be thrown in the main thread.

In general, what are my options to avoid exceptions in the thread who subscribed the flux?

@stavalfi stavalfi changed the title exceptions in the thread who subscribed the flux or mono Exception in the thread who subscribed the flux or mono Feb 15, 2018
@smaldini smaldini added this to the 3.2.0.RELEASE milestone Feb 16, 2018
@smaldini smaldini added status/need-investigation This needs more in-depth investigation type/bug A general bug and removed status/need-investigation This needs more in-depth investigation labels Feb 16, 2018
smaldini pushed a commit that referenced this issue Feb 19, 2018
add missing onNextDropped hook in Mono.create
smaldini pushed a commit that referenced this issue Feb 19, 2018
add missing onNextDropped hook in Mono.create
@dfeist
Copy link
Contributor

dfeist commented Feb 20, 2018

@smaldini I believe this also fixes the minor inconsistency we talked about the other day.

@simonbasle
Copy link
Member

@stavalfi the PR that @smaldini opened (#1085) replaces the onErrorDropped behaviour with an onOperatorError behaviour. The difference is that, unless the global hook has been customized, the error will now be swallowed instead of being thrown. For create, one could argue this is a saner default.

Additionally, even though it was also the case with the previous behaviour, you can customize the hook at the level of a given Flux/Mono rather than globally and then catch the dropped errors. This necessitates a bit of undocumented boilerplate:

Flux.create(...)
    .subscriberContext(Context.of("reactor.onOperatorError.local", 
        (droppedError, valueIfRelevant) -> {...process dropped error...});

Would you be able to quickly use a 3.2.0.BUILD-SNAPSHOT once that PR gets merged to see if that improves your situation, and provide feedback? We would then maybe consider porting it back to the 3.1.x line (as we'll probably release a 3.1.5).

@simonbasle
Copy link
Member

☝️merged into master and should be available in a snapshot soon, pending user feedback before we decide on a potential backport to 3.1

@stavalfi
Copy link
Author

stavalfi commented Feb 20, 2018

Hi team. thanks for the quick upgrade.

  1. I will try it out until the end of this week. If it is critical for you to make any decision, please feel free to ignore my feedback.

  2. I was asking if it is thread-safe to use FluxSink::isCancelled because as I mentioned above, there can be a situation when FluxSink::isCancelled returns false but only then the stream is canceled.

For solving that, I can see the following fix:
FluxSink::isCancelled will receive a lambda which will be activated after you will lock an object which will block any attempt to cancel the stream for any reason. The disadvantage is clear: if the lambda I provided will not end, the stream will never be canceled.

Thanks again!

@stavalfi
Copy link
Author

My feedback: working great. When will this feature be added to 3.1?

simonbasle pushed a commit that referenced this issue Feb 26, 2018
Flux.create and Mono.create don't use `onErrorDropped` anymore,
preventing throws when the default behaviour is still in place. Instead,
they call `onOperatorError` without signalling the resulting Throwable.

This has a saner default of "swallowing" the exception rather than
throwing it out of the reactive chain and causing problems. It can still
be locally customized using the `Context` undocumented key approach.

Also added a missing onNextDropped hook in Mono.create.

This is a backport of #1085 (commit db244e0), as tracked in #1083
@simonbasle
Copy link
Member

Great @stavalfi. This has now been backported in the 3.1.x branch. The 3.1.5 release should happen within the month.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
Development

No branches or pull requests

4 participants