Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds doOnEach operator #479

Merged
merged 4 commits into from
Nov 12, 2013
Merged

Conversation

nullstyle
Copy link
Contributor

I thought this would be a good and simple way to start getting my feet wet with RxJava. This implements #40.

Any feedback would be great.

Thanks,
Scott

@samuelgruetter
Copy link
Contributor

Thank you @nullstyle for this PR, I always wanted to have doOnEach, but never had time to implement it :-)

One issue to discuss: What should happen if an exception is thrown during the execution of the onNext/onError/onComplete function passed to doOnEach?
I'm not sure, and I don't have a computer right now to test it, but I'd answer as follows:

  • exception in onNext: catch and call onError, the same way as map does
  • exception in onError or onComplete: this "must not happen"
    

Can someone please test it in C# and correct me if needed?

@nullstyle
Copy link
Contributor Author

Good point... I handn't considered behavior when the observer provided to doOnEach is the source of the exception. I'll write up a simple C# app to investigate how it behaves.

@nullstyle
Copy link
Contributor Author

Looking at the source for Rx (I don't have access to a windows machine at the moment) it seems that any errors in the actions provided to Do will call onError on the downstream observables as expressed by this snippet:

            public void OnNext(TSource value)
            {
                try
                {
                    _parent._onNext(value);
                }
                catch (Exception ex)
                {
                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                base._observer.OnNext(value);
            }

I'll update my implementation tonight to provide consistent behavior and add some test cases to cover it.

This commit leverages the SafeObserver facility to get the desired
behavior in the face of exceptions.  Specifically, if any of the
operations performed within the doOnEach handler raises an exception,
that exception will propagate through the observable chain.
@cloudbees-pull-request-builder

RxJava-pull-requests #408 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

That's correct, anytime a user function is invoked it must have error handling around it.

We sometimes use try/catch, other times wrap with SafeObserver.

Thank you for contributing!

@benjchristensen
Copy link
Member

Looks good. Only change I'd make is put the Observer/Action wrappers inside the Operation class so we reduce code in Observable (since that class is huge). Merging now.

Thank you for getting involved!

benjchristensen added a commit that referenced this pull request Nov 12, 2013
@benjchristensen benjchristensen merged commit 50ba9de into ReactiveX:master Nov 12, 2013
@nullstyle
Copy link
Contributor Author

Thanks for merging!

I'll work on a separate PR to refactor that code into the operation class this weekend when I get some time.

-Scott

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants