Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing a bug and a potential for other concurrency issues. #1552

Merged
merged 1 commit into from
Aug 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.internal.operators;

import java.util.Iterator;
import java.util.NoSuchElementException;

import rx.Observable;
import rx.Subscriber;
Expand All @@ -41,54 +42,24 @@ public final class BlockingOperatorMostRecent {
* {@code initialValue} if {@code source} has not yet emitted any items
*/
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {

return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return nextIterator;
return mostRecentObserver.getIterable();
}
};

}

private static class MostRecentIterator<T> implements Iterator<T> {

private final MostRecentObserver<T> observer;

private MostRecentIterator(MostRecentObserver<T> observer) {
this.observer = observer;
}

@Override
public boolean hasNext() {
return !observer.isCompleted();
}

@Override
public T next() {
if (observer.getThrowable() != null) {
throw Exceptions.propagate(observer.getThrowable());
}
return observer.getRecentValue();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
}
}

private static class MostRecentObserver<T> extends Subscriber<T> {
static final NotificationLite<Object> nl = NotificationLite.instance();
final NotificationLite<T> nl = NotificationLite.instance();
volatile Object value;

private MostRecentObserver(T value) {
Expand All @@ -110,19 +81,47 @@ public void onNext(T args) {
value = nl.next(args);
}

private boolean isCompleted() {
return nl.isCompleted(value);
}

private Throwable getThrowable() {
Object v = value;
return nl.isError(v) ? nl.getError(v) : null;
}

@SuppressWarnings("unchecked")
private T getRecentValue() {
return (T)value;
/**
* The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one
* thread expect {@link Iterator#next()} called from a different thread to work.
* @return
*/
public Iterator<T> getIterable() {
return new Iterator<T>() {
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
private Object buf = null;

@Override
public boolean hasNext() {
buf = value;
return !nl.isCompleted(buf);
}

@Override
public T next() {
try {
// if hasNext wasn't called before calling next.
if (buf == null)
buf = value;
if (nl.isCompleted(buf))
throw new NoSuchElementException();
if (nl.isError(buf)) {
throw Exceptions.propagate(nl.getError(buf));
}
return nl.getValue(buf);
}
finally {
buf = null;
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
}
};
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import rx.subjects.Subject;

public class BlockingOperatorMostRecentTest {
@Test
public void testMostRecentNull() {
assertEquals(null, Observable.<Void>never().toBlocking().mostRecent(null).iterator().next());
}

@Test
public void testMostRecent() {
Expand Down