Skip to content

Commit

Permalink
Operator ParallelMerge
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 28, 2014
1 parent 95e0636 commit cd0a844
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationSample;
import rx.operators.OperationSequenceEqual;
Expand Down Expand Up @@ -109,6 +108,7 @@
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorParallel;
import rx.operators.OperatorParallelMerge;
import rx.operators.OperatorPivot;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorRetry;
Expand Down Expand Up @@ -2323,7 +2323,7 @@ public final static <T> Observable<T> never() {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-parallelmerge">RxJava Wiki: parallelMerge()</a>
*/
public final static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<T>> source, int parallelObservables) {
return OperationParallelMerge.parallelMerge(source, parallelObservables);
return OperatorParallelMerge.parallelMerge(source, parallelObservables);
}

/**
Expand All @@ -2346,7 +2346,7 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-parallelmerge">RxJava Wiki: parallelMerge()</a>
*/
public final static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<T>> source, int parallelObservables, Scheduler scheduler) {
return OperationParallelMerge.parallelMerge(source, parallelObservables, scheduler);
return OperatorParallelMerge.parallelMerge(source, parallelObservables, scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

public class OperationParallelMerge {
public final class OperatorParallelMerge {
private OperatorParallelMerge() { throw new IllegalStateException("No instances!"); }

public static <T> Observable<Observable<T>> parallelMerge(final Observable<Observable<T>> source, final int parallelObservables) {
return parallelMerge(source, parallelObservables, Schedulers.immediate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class OperationParallelMergeTest {
public class OperatorParallelMergeTest {

@Test
public void testParallelMerge() {
Expand All @@ -41,8 +41,8 @@ public void testParallelMerge() {

Observable<Observable<String>> fourStreams = Observable.<Observable<String>> from(p1, p2, p3, p4);

Observable<Observable<String>> twoStreams = OperationParallelMerge.parallelMerge(fourStreams, 2);
Observable<Observable<String>> threeStreams = OperationParallelMerge.parallelMerge(fourStreams, 3);
Observable<Observable<String>> twoStreams = Observable.parallelMerge(fourStreams, 2);
Observable<Observable<String>> threeStreams = Observable.parallelMerge(fourStreams, 3);

List<? super Observable<String>> fourList = fourStreams.toList().toBlockingObservable().last();
List<? super Observable<String>> threeList = threeStreams.toList().toBlockingObservable().last();
Expand All @@ -62,7 +62,7 @@ public void testNumberOfThreads() {
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();
// parallelMerge into 3 streams and observeOn for each
// we expect 3 threads in the output
OperationParallelMerge.parallelMerge(getStreams(), 3)
Observable.parallelMerge(getStreams(), 3)
.flatMap(new Func1<Observable<String>, Observable<String>>() {

@Override
Expand All @@ -89,7 +89,7 @@ public void testNumberOfThreadsOnScheduledMerge() {

// now we parallelMerge into 3 streams and observeOn for each
// we expect 3 threads in the output
Observable.merge(OperationParallelMerge.parallelMerge(getStreams(), 3, Schedulers.newThread()))
Observable.merge(Observable.parallelMerge(getStreams(), 3, Schedulers.newThread()))
.toBlockingObservable().forEach(new Action1<String>() {

@Override
Expand Down

0 comments on commit cd0a844

Please sign in to comment.