"I want the try/catch equivalent of… catch, log and ignore" #572

Closed
oleg-alexeyev opened this issue Apr 28, 2017 · 8 comments

@oleg-alexeyev

Having a Flux which must never end, I cannot find an operator which would allow me to catch any exception, log it, and continue - similar to a top-level catch(Throwable) in a thread which must never end (e.g. the AWT Event Dispatch Thread).

@simonbasle simonbasle added the for/stackoverflow Questions are best asked on SO or Gitter label Apr 28, 2017
@simonbasle
Member

Hi @oleg-alexeyev,
You must keep in mind that in reactive streams, an error is a terminal event. Even though there are operators that deal with errors by starting new sequences, the source Flux that failed is still terminated by that error, so there's no way of "continuing" in this case.
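
To make the "terminal event" point concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name and the failing element are made up for illustration. The fallback operator starts a new sequence, but the original source never delivers the element that follows the failure.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class TerminalErrorDemo {

    // Returns what a subscriber observes when the second element fails.
    static List<Integer> run() {
        return Flux.just(1, 2, 3)
                .map(x -> {
                    if (x == 2) {
                        // Hypothetical failure for illustration only
                        throw new IllegalStateException("boom on " + x);
                    }
                    return x * 10;
                })
                // Switches to a new fallback sequence; the failed source is
                // already terminated, so element 3 is never processed.
                .onErrorResume(e -> Flux.just(-1))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the subscriber sees 10 and then the fallback value -1; the value derived from 3 never arrives, because the error ended the original sequence.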

@oleg-alexeyev
Author

Yes, I know. What I need is a recipe for building such never-ending flows. For now I have to wrap all callbacks in catch(Throwable).
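
The "wrap every callback" workaround might look roughly like the sketch below. It is an illustration only: `render` is a hypothetical callback, and the sketch catches RuntimeException rather than Throwable to avoid swallowing fatal JVM errors. It relies on Flux.handle, which lets a callback emit at most one value per input or skip the input entirely.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class GuardedCallbackDemo {

    // Hypothetical callback that fails for one particular input.
    static String render(int x) {
        if (x == 2) {
            throw new IllegalArgumentException("cannot render " + x);
        }
        return "item-" + x;
    }

    // Runs the guarded pipeline; failures are appended to the given log list.
    static List<String> run(List<String> log) {
        return Flux.range(1, 4)
                .<String>handle((x, sink) -> {
                    try {
                        sink.next(render(x));
                    } catch (RuntimeException e) {
                        // Log and ignore: nothing is emitted for this element,
                        // and no error signal reaches the rest of the sequence.
                        log.add("logged: " + e.getMessage());
                    }
                })
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log));
        System.out.println(log);
    }
}
```

The cost of this approach is the one described above: every callback needs its own guard, because any exception that escapes still terminates the sequence.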

@artembilan

I'm curious about this subject as well.
My apologies if you already have something on this in the Reference Manual. Just point me there!

So, my concern is this: what is the point of terminating the source publisher if it isn't its problem that one item in the sequence is wrong? It is really the subscriber's responsibility (maybe via onError()) to handle such a situation. But why do we terminate the whole sequence?
Let's come back to the classical Reactive Streams sample - clicks on a page. It really breaks my mind that such a hot source should be terminated just because one of the clicks hasn't made it to the server.

We have many other similar use cases: JMS, AMQP, Kafka, etc. There I start a Flux to accept polled messages from the queue/partition. Each message doesn't care that others might be erroneous; it just should be able to reach the consumer.

So, @simonbasle, please describe, or share a link explaining, why we have to terminate those hot sources.
And on the other hand: what should we do if we don't want to terminate the source when one of the subscribers fails on one of the events?

Thank you!

@simonbasle
Member

I need more information on the source, and the kind of error. For hot sources, it is true that you have a way of ignoring errors that don't happen in the source itself but rather in transformations downstream: typically, by resubscribing using retry()!

Let's work with the mouseclick analogy: You have an infinite sequence of mouseclicks, that you react to by drawing a ping animation on your app UI.
An error happening in the source, and thus not recoverable, would be a Blue Screen Of Death by the mouse driver.
An error happening downstream would be a NullPointerException because you attempted to map a mouseclick to a UI element but the pointer was outside the app's window at that time.

For the latter, if you simply want to log and continue, then you can do .doOnError(e -> log(e)).retry().

This will resubscribe to the mouseclick Flux, but since it is hot (and that's the key), it will only feed you new clicks that happened after the one causing the error and the subsequent resubscription.
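
A minimal sketch of this log-and-retry recipe on a hot source follows. It assumes reactor-core 3.4 or later, because it uses the Sinks API as a stand-in for the mouseclick source; that API did not exist when this thread was written, and the class name and the failing click are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;

public class LogAndRetryDemo {

    // Records what the subscriber and the error logger observe, in order.
    static List<String> run() {
        // Hot source standing in for the infinite stream of mouse clicks
        // (assumption: a multicast sink is an acceptable stand-in).
        Sinks.Many<Integer> clicks = Sinks.many().multicast().directBestEffort();
        List<String> seen = new ArrayList<>();

        clicks.asFlux()
                .map(x -> {
                    if (x == 2) {
                        // Downstream failure, not a failure of the source itself
                        throw new IllegalStateException("bad click " + x);
                    }
                    return "click-" + x;
                })
                .doOnError(e -> seen.add("logged: " + e.getMessage()))
                // Resubscribes to the hot source after every error
                .retry()
                .subscribe(seen::add);

        clicks.tryEmitNext(1);
        clicks.tryEmitNext(2); // fails in map, is logged, triggers resubscription
        clicks.tryEmitNext(3); // delivered to the new subscription
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the source is hot, the failing click is not replayed after resubscription. With a cold source the same chain would restart from the first element and hit the same error again, so retry() without a limit could loop forever.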

@artembilan

Thank you, @simonbasle !

I think .retry() does the trick for me.
I'll give it a shot! 😄

@oleg-alexeyev,
Looks like that answers your concern as well, doesn't it?

@oleg-alexeyev
Author

Seems so, thanks!

@oleg-alexeyev
Author

Confirmed, our case of using WorkQueueProcessor works OK using retry().

@smaldini smaldini modified the milestone: 3.1.0.M2 May 2, 2017
@dfeist
Contributor

dfeist commented May 2, 2017

@oleg-alexeyev @artembilan Be aware that using WorkQueueProcessor with retry() is subject to signal loss and is not (at least currently) a decent solution for handling errors in a continuous stream (Flux). This happens because an error in the stream not only produces an error signal downstream, which makes retry() re-subscribe, but also produces a cancel() signal upstream. If you are using any operators that maintain an internal buffer, this cancel() will cause the buffered signals to be dropped.

A few solutions:
i) Don't use a continuous stream; use a Mono for each event you have to process. (Works, but you can't do back-pressure or buffering, and it is not as performant.)
ii) Add the ability in Reactor to allow more localised error handling that does not produce error/cancel on the main Flux. (Does not currently exist.)
iii) Add an option to configure your Flux/operators to be memory-consistent and drain gracefully on cancel rather than drop. (Does not currently exist.)
iv) Use the flatMap/just pattern to handle errors in a child context that does not impact the main stream, does not require retry(), and does not cancel upstream causing signal loss. (Easy to do today, but has a certain amount of performance overhead, though no worse than Mono per request.)
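
Option iv) could be sketched as follows. This is one reading of the "flatMap/just pattern", not necessarily the exact code used by the commenter; `process` and the class name are hypothetical. Each element is wrapped in its own small Mono, and any error is logged and swallowed inside that inner Mono, so the outer Flux never sees an error or a cancel.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapIsolationDemo {

    // Hypothetical per-event processing step that fails for one input.
    static String process(int x) {
        if (x == 2) {
            throw new IllegalStateException("cannot process " + x);
        }
        return "done-" + x;
    }

    // Processes events 1..5; failures are appended to the given log list.
    static List<String> run(List<String> log) {
        return Flux.range(1, 5)
                .flatMap(x -> Mono.just(x)
                        .map(FlatMapIsolationDemo::process)
                        // The error stays inside this inner Mono...
                        .doOnError(e -> log.add("logged: " + e.getMessage()))
                        // ...and is replaced by an empty result, so the outer
                        // Flux neither errors nor cancels its upstream.
                        .onErrorResume(e -> Mono.empty()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log));
        System.out.println(log);
    }
}
```

Unlike the retry() approach, this also works with a cold source such as Flux.range, since the main sequence is never resubscribed. The per-element Mono allocation is the overhead mentioned in iv).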

I started with WorkQueueProcessor/retry() a few months ago, discovered these issues, and had to revert to i) (Mono per request). I've had multiple discussions with @smaldini about ii) and iii), and there is something in the works AFAIU. In the meantime I've moved to iv) because I want to be able to leverage back-pressure/buffering.

See: #435
