-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Operators Skip, SkipLast, Take with time #667
Conversation
RxJava-pull-requests #601 FAILURE |
private static final class SourceObserver<T> implements Observer<T>, Action0 { | ||
final Observer<? super T> observer; | ||
final Subscription cancel; | ||
final AtomicInteger state = new AtomicInteger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need an atomic state machine in here when there is not going to be any concurrency when the Observer
is invoked?
The concurrency will happen in a very controlled place when the timer fires and you emit whatever is queued and onCompleted, but the on*
events will not be invoked concurrently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Take doesn't queue anything. It relays events until the timer fires, which might run at the same time as a regular onNext event. The atomic is there to prevent this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a simple compareAndSet
then on being finished is it not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I use the classic get/compareAndSet, the following case might happen:
T1: if (state.get()) succeeds
T2: if (compareAndSet(true, false)) succeeds
T1: observer.onNext() executing
T2: observer.onCompleted executing
One would need to mutually exclude observer.onXXX calls, which could be done via synchronization and some overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right ...
Manual Merge of Pull #667
Rebased version, without the drain scheduler variant.