Skip to content

Commit

Permalink
Merge pull request #1552 from abersnaze/most-recent-null
Browse files Browse the repository at this point in the history
Fixing a bug and a potential for other concurrency issues.
  • Loading branch information
benjchristensen committed Aug 8, 2014
2 parents 0fe6e01 + 1f252c0 commit 03fe25c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.internal.operators;

import java.util.Iterator;
import java.util.NoSuchElementException;

import rx.Observable;
import rx.Subscriber;
Expand All @@ -41,54 +42,24 @@ public final class BlockingOperatorMostRecent {
* {@code initialValue} if {@code source} has not yet emitted any items
*/
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {

return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return nextIterator;
return mostRecentObserver.getIterable();
}
};

}

private static class MostRecentIterator<T> implements Iterator<T> {

private final MostRecentObserver<T> observer;

private MostRecentIterator(MostRecentObserver<T> observer) {
this.observer = observer;
}

@Override
public boolean hasNext() {
return !observer.isCompleted();
}

@Override
public T next() {
if (observer.getThrowable() != null) {
throw Exceptions.propagate(observer.getThrowable());
}
return observer.getRecentValue();
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
}
}

private static class MostRecentObserver<T> extends Subscriber<T> {
static final NotificationLite<Object> nl = NotificationLite.instance();
final NotificationLite<T> nl = NotificationLite.instance();
volatile Object value;

private MostRecentObserver(T value) {
Expand All @@ -110,19 +81,47 @@ public void onNext(T args) {
value = nl.next(args);
}

private boolean isCompleted() {
return nl.isCompleted(value);
}

private Throwable getThrowable() {
Object v = value;
return nl.isError(v) ? nl.getError(v) : null;
}

@SuppressWarnings("unchecked")
private T getRecentValue() {
return (T)value;
/**
* The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one
* thread expect {@link Iterator#next()} called from a different thread to work.
* @return
*/
public Iterator<T> getIterable() {
return new Iterator<T>() {
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
private Object buf = null;

@Override
public boolean hasNext() {
buf = value;
return !nl.isCompleted(buf);
}

@Override
public T next() {
try {
// if hasNext wasn't called before calling next.
if (buf == null)
buf = value;
if (nl.isCompleted(buf))
throw new NoSuchElementException();
if (nl.isError(buf)) {
throw Exceptions.propagate(nl.getError(buf));
}
return nl.getValue(buf);
}
finally {
buf = null;
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read only iterator");
}
};
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import rx.subjects.Subject;

public class BlockingOperatorMostRecentTest {
@Test
public void testMostRecentNull() {
assertEquals(null, Observable.<Void>never().toBlocking().mostRecent(null).iterator().next());
}

@Test
public void testMostRecent() {
Expand Down

0 comments on commit 03fe25c

Please sign in to comment.