Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplaySubject emits items received after onError #544

Closed
samuelgruetter opened this issue Nov 29, 2013 · 4 comments
Closed

ReplaySubject emits items received after onError #544

samuelgruetter opened this issue Nov 29, 2013 · 4 comments

Comments

@samuelgruetter
Copy link
Contributor

This snippet

    val sub = ReplaySubject[Int]()

    sub.onNext(1)
    sub.onNext(2)
    sub.onError(new Exception("Kabloooy!")) // Gets moved to after onNext(3)
    sub.onNext(3)

    sub.subscribe(
      n => println("sub1: " + n),
      e => println("sub1: " + e.getMessage),
      () => println("sub1: completed")
    )

    println("done")

outputs

sub1: 1
sub1: 2
sub1: 3
sub1: Kabloooy!
done

but I would expect

sub1: 1
sub1: 2
sub1: Kabloooy!
done

since items received after onError should not be emitted by ReplaySubject.

(Thank you Dragisa Krsmanovic for pointing this out)

@headinthebox
Copy link
Contributor

Ben and I hacked on subjects earlier this week, all subjects are/were in pretty bad shape, but we'll fix it.

@benjchristensen
Copy link
Member

ReplaySubject is the one I didn’t touch, I refactored Publish/Behavior/Async. I’ll put Replay on my TODO.

@akarnokd
Copy link
Member

I think this is a trivial fix

    @Override
    public void onNext(T args)
    {
        synchronized (subscriptions) {
            if (isDone) {
                return;
            }
            history.add(args);
            for (Observer<? super T> observer : new ArrayList<Observer<? super T>>(subscriptions.values())) {
                observer.onNext(args);
            }
        }
    }

@headinthebox
Copy link
Contributor

We should also make sure no additional,on errors,are propagated.

Sent from my iPad

On Nov 30, 2013, at 4:52 AM, akarnokd [email protected] wrote:

I think this is a trivial fix

@Override
public void onNext(T args)
{
    synchronized (subscriptions) {
        if (isDone) {
            return;
        }
        history.add(args);
        for (Observer<? super T> observer : new ArrayList<Observer<? super T>>(subscriptions.values())) {
            observer.onNext(args);
        }
    }
}


Reply to this email directly or view it on GitHub.

rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants