Operator: Latest #59

Closed
benjchristensen opened this issue Jan 18, 2013 · 11 comments

Comments

@benjchristensen
Member

http://msdn.microsoft.com/en-us/library/hh212115(v=vs.103).aspx

@abersnaze
Contributor

I believe latest can return a value from the past but falls back to the next value if one hasn't been received yet.

@zsxwing
Member

zsxwing commented Oct 25, 2013

I believe latest can return a value from the past but falls back to the next value if one hasn't been received yet.

@abersnaze, do you mean that if the next value hasn't arrived, latest will return the old cached value?

But the TryMoveNext method of Latest contains the following code:

            public override bool TryMoveNext(out TSource current)
            {
                var kind = default(NotificationKind);
                var value = default(TSource);
                var error = default(Exception);

#if !NO_CDS
                _semaphore.Wait();
#else
                _semaphore.WaitOne();
#endif

So when TryMoveNext is called, it blocks the thread and waits for the incoming value.

I tried the following code in Rx.NET:

            IObservable<int> ob =
                    Observable.Create<int>(o =>
                    {
                        Console.WriteLine("Subscribed: Before onNext");
                        o.OnNext(1);
                        Thread.Sleep(2000);
                        o.OnNext(2);
                        Console.WriteLine("Subscribed: After OnNext");
                        o.OnCompleted();
                        return Disposable.Empty;
                    }
                    );
            var iter = ob.SubscribeOn(Scheduler.NewThread).Latest().GetEnumerator();
            Console.WriteLine("Before MoveNext");
            while (iter.MoveNext())
            {
                Console.WriteLine("Find a value");
                Console.WriteLine("Got " + iter.Current);
            }
            Console.WriteLine("After MoveNext");

The output is:

Before MoveNext
Subscribed: Before onNext
Find a value
Got 1
Subscribed: After OnNext
Find a value
Got 2
After MoveNext

@zsxwing
Member

zsxwing commented Dec 11, 2013

I ran some tests on Rx.NET with Latest and Next.

I drew two marble diagrams to summarize the differences between Latest and Next.

  • Latest

[Image: marble diagram for Latest]

For Latest, the Iterator's next method checks whether there is a cached value. If so, it returns the cached value and deletes it. If not, it blocks until the Observable emits the next value, then returns that value.

  • Next

[Image: marble diagram for Next]

For Next, the Iterator's next method always blocks until the Observable emits the next value, then returns it.
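
The two behaviors described above can be sketched in plain Java, without RxJava. This is only an illustrative model, not the Rx.NET or RxJava implementation: the class names LatestSlot, NextSlot and LatestVsNextDemo are hypothetical, completion and errors are ignored, and each slot assumes a single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of Latest: keeps only the newest unconsumed value. */
final class LatestSlot<T> {
    private T cached; // newest value not yet handed to the consumer, or null

    synchronized void offer(T value) {
        cached = value; // overwrite any older unconsumed value
        notifyAll();
    }

    synchronized T take() {
        try {
            while (cached == null) {
                wait(); // nothing cached: block until a value is offered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        T value = cached;
        cached = null; // a cached value is returned at most once
        return value;
    }
}

/** Hypothetical model of Next: values offered while nobody waits are dropped. */
final class NextSlot<T> {
    private T arrived;
    private boolean waiting;

    synchronized boolean isWaiting() {
        return waiting;
    }

    synchronized void offer(T value) {
        if (waiting) { // only deliver to a consumer that is already blocked
            arrived = value;
            waiting = false;
            notifyAll();
        }
    }

    synchronized T take() {
        waiting = true;
        try {
            while (waiting) {
                wait(); // always block until a value arrives after this call
            }
        } catch (InterruptedException e) {
            waiting = false;
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        T value = arrived;
        arrived = null;
        return value;
    }
}

final class LatestVsNextDemo {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();

        LatestSlot<Integer> latest = new LatestSlot<>();
        latest.offer(1);
        latest.offer(2);         // replaces the unconsumed 1
        seen.add(latest.take()); // cached value, returned without blocking

        NextSlot<Integer> next = new NextSlot<>();
        next.offer(10);          // nobody is waiting, so 10 is dropped
        Thread producer = new Thread(() -> {
            while (!next.isWaiting()) {
                Thread.yield();  // wait until the consumer is blocked in take()
            }
            next.offer(20);
        });
        producer.start();
        seen.add(next.take());   // blocks until the producer offers 20
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [2, 20]
    }
}
```

In this model, Latest never loses the most recent value, while Next drops anything emitted when no take() call is in progress.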

Please let me know if you find any mistakes.

@akarnokd
Member

So next will return every value emitted by the source, whereas latest returns only the value that was observed just before the hasNext() call, and only once?

Looking at OperationNext, I find it strange to have the waiting atomic boolean in L132, checked in L147 and set in L162. It appears that if no one is using takeNext(), the observer skips the onNext notification. L152 mentions a race condition that I don't really see, since the observer ignores onError and onCompleted, and the materialized source should be race-free anyway.

@zsxwing
Member

zsxwing commented Dec 16, 2013

Sorry, the diagram for next was not clear. Here is a clearer one:
[Image: revised marble diagram for Next]

So for the next operator, items will be ignored if they are emitted between two next calls.

@zsxwing
Member

zsxwing commented Dec 16, 2013

However, since the Iterator has two methods, hasNext and next, it's more complicated. See the documentation here: https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#next and the discussion here: #433

@akarnokd
Member

@zsxwing Are you going to implement this?

In addition, I've looked at OperationMostRecent, and if I understand it correctly, it shares a single subscription to the source observable, i.e., if I try to iterate the same source twice, it won't work the second time:

BlockingObservable<Long> source = Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(10).toBlockingObservable();

Iterable<Long> it = source.mostRecent(-1L);

for (Long l : it) {
    System.out.println(l);
    Thread.sleep(50);
}

System.out.println("----------------------------------------");

for (Long l : it) {
    System.out.println(l);
    Thread.sleep(50);
}

The same goes for OperationNext.next.

I can fix both mostRecent and next, and take on latest as well.

@zsxwing
Member

zsxwing commented Dec 16, 2013

You're right. The subscription should be created in the Iterator rather than in the Iterable. As I'm busy with other work right now, I would appreciate it if you could fix this issue and implement the latest operator.
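
The difference between subscribing in the Iterable and subscribing in the Iterator can be sketched as follows. This is a simplified, synchronous model and not the RxJava code: CountingSource is a hypothetical stand-in for an Observable that replays its values to every subscriber, and the two Iterable classes buffer all values instead of implementing mostRecent or next.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for an Observable: each subscribe call emits 0..count-1. */
final class CountingSource {
    private final int count;

    CountingSource(int count) {
        this.count = count;
    }

    void subscribe(Consumer<Integer> observer) {
        for (int i = 0; i < count; i++) {
            observer.accept(i);
        }
    }
}

final class Iterators {
    /** Returns an iterator that removes elements from the given queue. */
    static Iterator<Integer> draining(Queue<Integer> buffer) {
        return new Iterator<Integer>() {
            @Override public boolean hasNext() { return !buffer.isEmpty(); }
            @Override public Integer next() { return buffer.remove(); }
        };
    }
}

/** Subscribes once, when the Iterable is created; all iterators share one buffer. */
final class SharedSubscriptionIterable implements Iterable<Integer> {
    private final Queue<Integer> buffer = new ArrayDeque<>();

    SharedSubscriptionIterable(CountingSource source) {
        source.subscribe(buffer::add);
    }

    @Override public Iterator<Integer> iterator() {
        return Iterators.draining(buffer);
    }
}

/** Subscribes inside iterator(), so every iteration gets its own subscription. */
final class PerIteratorSubscriptionIterable implements Iterable<Integer> {
    private final CountingSource source;

    PerIteratorSubscriptionIterable(CountingSource source) {
        this.source = source;
    }

    @Override public Iterator<Integer> iterator() {
        Queue<Integer> buffer = new ArrayDeque<>();
        source.subscribe(buffer::add);
        return Iterators.draining(buffer);
    }
}

final class SubscriptionDemo {
    static List<Integer> drain(Iterable<Integer> values) {
        List<Integer> out = new ArrayList<>();
        for (Integer v : values) {
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        Iterable<Integer> shared = new SharedSubscriptionIterable(new CountingSource(3));
        System.out.println(drain(shared)); // prints [0, 1, 2]
        System.out.println(drain(shared)); // prints [] because the shared buffer is used up

        Iterable<Integer> fresh = new PerIteratorSubscriptionIterable(new CountingSource(3));
        System.out.println(drain(fresh));  // prints [0, 1, 2]
        System.out.println(drain(fresh));  // prints [0, 1, 2] again
    }
}
```

With the per-iterator variant, the second for-each loop from the example above would see values again, under the assumption that the source can be subscribed to more than once.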

@akarnokd
Member

I'm on it. Thanks.

@benjchristensen
Member Author

Implemented.
