From 1f252c0acbac7bc8702ad5fe94693c7752ac93c9 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 5 Aug 2014 11:06:03 -0700 Subject: [PATCH] Fixing a bug and a potential for other concurrency issues. --- .../operators/BlockingOperatorMostRecent.java | 89 +++++++++---------- .../BlockingOperatorMostRecentTest.java | 4 + 2 files changed, 48 insertions(+), 45 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java b/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java index c481b7022d..3c00fd4ca6 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java +++ b/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorMostRecent.java @@ -16,6 +16,7 @@ package rx.internal.operators; import java.util.Iterator; +import java.util.NoSuchElementException; import rx.Observable; import rx.Subscriber; @@ -41,12 +42,10 @@ public final class BlockingOperatorMostRecent { * {@code initialValue} if {@code source} has not yet emitted any items */ public static Iterable mostRecent(final Observable source, final T initialValue) { - return new Iterable() { @Override public Iterator iterator() { MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); - final MostRecentIterator nextIterator = new MostRecentIterator(mostRecentObserver); /** * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain @@ -54,41 +53,13 @@ public Iterator iterator() { */ source.subscribe(mostRecentObserver); - return nextIterator; + return mostRecentObserver.getIterable(); } }; - - } - - private static class MostRecentIterator implements Iterator { - - private final MostRecentObserver observer; - - private MostRecentIterator(MostRecentObserver observer) { - this.observer = observer; - } - - @Override - public boolean hasNext() { - return !observer.isCompleted(); - } - - @Override - public T next() { - if (observer.getThrowable() != null) { - throw Exceptions.propagate(observer.getThrowable()); - } - return observer.getRecentValue(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Read only iterator"); - } } private static class MostRecentObserver extends Subscriber { - static final NotificationLite nl = NotificationLite.instance(); + final NotificationLite nl = NotificationLite.instance(); volatile Object value; private MostRecentObserver(T value) { @@ -110,19 +81,47 @@ public void onNext(T args) { value = nl.next(args); } - private boolean isCompleted() { - return nl.isCompleted(value); - } - - private Throwable getThrowable() { - Object v = value; - return nl.isError(v) ? nl.getError(v) : null; - } - - @SuppressWarnings("unchecked") - private T getRecentValue() { - return (T)value; + /** + * The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one + * thread expect {@link Iterator#next()} called from a different thread to work. + * @return + */ + public Iterator getIterable() { + return new Iterator() { + /** + * buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). + */ + private Object buf = null; + + @Override + public boolean hasNext() { + buf = value; + return !nl.isCompleted(buf); + } + + @Override + public T next() { + try { + // if hasNext wasn't called before calling next. + if (buf == null) + buf = value; + if (nl.isCompleted(buf)) + throw new NoSuchElementException(); + if (nl.isError(buf)) { + throw Exceptions.propagate(nl.getError(buf)); + } + return nl.getValue(buf); + } + finally { + buf = null; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read only iterator"); + } + }; } - } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java b/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java index 60415eba85..3f377cffb1 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorMostRecentTest.java @@ -34,6 +34,10 @@ import rx.subjects.Subject; public class BlockingOperatorMostRecentTest { + @Test + public void testMostRecentNull() { + assertEquals(null, Observable.never().toBlocking().mostRecent(null).iterator().next()); + } @Test public void testMostRecent() {