Skip to content

Commit

Permalink
Merge pull request ReactiveX#156 from mairbek/toIterable
Browse files Browse the repository at this point in the history
Implemented ToIterable Operation
  • Loading branch information
benjchristensen committed Feb 27, 2013
2 parents 5d8a13e + f805834 commit 04b57c2
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 8 deletions.
140 changes: 132 additions & 8 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
*/
package rx;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
Expand Down Expand Up @@ -58,6 +53,7 @@
import rx.plugins.RxJavaPlugins;
import rx.util.AtomicObservableSubscription;
import rx.util.AtomicObserver;
import rx.util.Exceptions;
import rx.util.Range;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand Down Expand Up @@ -1524,6 +1520,79 @@ public static <T> Observable<List<T>> toList(final Observable<T> that) {
return _create(OperationToObservableList.toObservableList(that));
}

/**
* Converts an observable sequence to an Iterable.
*
* @param that the source Observable
* @return Observable converted to Iterable.
*/
public static <T> Iterable<T> toIterable(final Observable<T> that) {
final BlockingQueue<Notification<T>> notifications = new LinkedBlockingQueue<Notification<T>>();

materialize(that).subscribe(new Observer<Notification<T>>() {
@Override
public void onCompleted() {
// ignore
}

@Override
public void onError(Exception e) {
// ignore
}

@Override
public void onNext(Notification<T> args) {
notifications.offer(args);
}
});

final Iterator<T> it = new Iterator<T>() {
private Notification<T> buf;

@Override
public boolean hasNext() {
if (buf == null) {
buf = take();
}
return !buf.isOnCompleted();
}

@Override
public T next() {
if (buf == null) {
buf = take();
}
if (buf.isOnError()) {
throw Exceptions.propagate(buf.getException());
}

T result = buf.getValue();
buf = null;
return result;
}

private Notification<T> take() {
try {
return notifications.take();
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
}

@Override
public void remove() {
throw new UnsupportedOperationException("Read-only iterator");
}
};

return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
return it;
}
};
}

/**
* Converts an Iterable sequence to an Observable sequence.
*
Expand Down Expand Up @@ -2544,6 +2613,15 @@ public Observable<List<T>> toSortedList(final Object sortFunction) {
return toSortedList(this, sortFunction);
}

/**
* Converts an observable sequence to an Iterable.
*
* @return Observable converted to Iterable.
*/
public Iterable<T> toIterable() {
return toIterable(this);
}

public static class UnitTest {

@Mock
Expand Down Expand Up @@ -2623,6 +2701,52 @@ public void testSequenceEqual() {
verify(result, times(1)).onNext(false);
}


@Test
public void testToIterable() {
Observable<String> obs = toObservable("one", "two", "three");

Iterator<String> it = obs.toIterable().iterator();

assertEquals(true, it.hasNext());
assertEquals("one", it.next());

assertEquals(true, it.hasNext());
assertEquals("two", it.next());

assertEquals(true, it.hasNext());
assertEquals("three", it.next());

assertEquals(false, it.hasNext());

}

@Test(expected = TestException.class)
public void testToIterableWithException() {
Observable<String> obs = create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Observable.noOpSubscription();
}
});

Iterator<String> it = obs.toIterable().iterator();

assertEquals(true, it.hasNext());
assertEquals("one", it.next());

assertEquals(true, it.hasNext());
it.next();

}

private static class TestException extends RuntimeException {

}

}

}
31 changes: 31 additions & 0 deletions rxjava-core/src/main/java/rx/util/Exceptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.util;

public class Exceptions {
private Exceptions() {

}

public static RuntimeException propagate(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RuntimeException(t);
}
}

}

0 comments on commit 04b57c2

Please sign in to comment.