From 1ea15baf5b76d407c1aa97331c3857c95d229bd6 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 17 Sep 2013 22:39:51 -0700 Subject: [PATCH 1/4] Fix ObserveOn, NewThreadScheduler and ScheduledObserver bugs @headinthebox and I were working on some code and found differences in behavior between Rx.Net and RxJava with observeOn. This commit should fix that. --- .../rx/concurrency/NewThreadScheduler.java | 78 +++++++++--- .../java/rx/operators/OperationObserveOn.java | 7 +- .../java/rx/operators/OperationRetry.java | 25 ++-- .../java/rx/operators/ScheduledObserver.java | 112 ++++++++++++----- .../subscriptions/CompositeSubscription.java | 27 ++-- .../MultipleAssignmentSubscription.java | 45 +++++++ .../java/rx/subscriptions/Subscriptions.java | 52 ++++++-- .../src/test/java/rx/ObserveOnTests.java | 118 ++++++++++++++++++ 8 files changed, 380 insertions(+), 84 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java create mode 100644 rxjava-core/src/test/java/rx/ObserveOnTests.java diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index d8e178bc0d..c33918353b 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -15,12 +15,15 @@ */ package rx.concurrency; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; import rx.Subscription; -import rx.operators.SafeObservableSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -29,27 +32,74 @@ * Schedules work on a new thread. */ public class NewThreadScheduler extends Scheduler { - private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + private final static NewThreadScheduler INSTANCE = new NewThreadScheduler(); + private final static AtomicLong count = new AtomicLong(); public static NewThreadScheduler getInstance() { return INSTANCE; } - @Override - public Subscription schedule(final T state, final Func2 action) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - final Scheduler _scheduler = this; + private NewThreadScheduler() { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - subscription.wrap(action.call(_scheduler, state)); - } - }, "RxNewThreadScheduler"); + } - t.start(); + private static class EventLoopScheduler extends Scheduler { + private final ExecutorService executor; - return subscription; + private EventLoopScheduler() { + executor = Executors.newFixedThreadPool(1, new ThreadFactory() { + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + } + }); + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + final Scheduler _scheduler = this; + return Subscriptions.from(executor.submit(new Runnable() { + + @Override + public void run() { + action.call(_scheduler, state); + } + })); + } + + @Override + public Subscription schedule(final T state, final Func2 action, final long delayTime, final TimeUnit unit) { + // we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep + // we will instead schedule the event then launch the thread after the delay has passed + final Scheduler _scheduler = this; + final CompositeSubscription subscription = new CompositeSubscription(); + ScheduledFuture f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() { + + @Override + public void run() { + if (!subscription.isUnsubscribed()) { + // when the delay has passed we now do the work on the actual scheduler + Subscription s = _scheduler.schedule(state, action); + // add the subscription to the CompositeSubscription so it is unsubscribed + subscription.add(s); + } + } + }, delayTime, unit); + + // add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens + subscription.add(Subscriptions.create(f)); + + return subscription; + } + + } + + @Override + public Subscription schedule(final T state, final Func2 action) { + EventLoopScheduler s = new EventLoopScheduler(); + return s.schedule(state, action); } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index f947333456..aef1cb9548 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; @@ -33,6 +34,8 @@ import rx.Subscription; import rx.concurrency.ImmediateScheduler; import rx.concurrency.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Func2; /** * Asynchronously notify Observers on the specified Scheduler. @@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer observer) { // do nothing if we request ImmediateScheduler so we don't invoke overhead return source.subscribe(observer); } else { - return source.subscribe(new ScheduledObserver(observer, scheduler)); + CompositeSubscription s = new CompositeSubscription(); + s.add(source.subscribe(new ScheduledObserver(s, observer, scheduler))); + return s; } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationRetry.java b/rxjava-core/src/main/java/rx/operators/OperationRetry.java index 977373cb01..0f664ee3ec 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationRetry.java +++ b/rxjava-core/src/main/java/rx/operators/OperationRetry.java @@ -26,11 +26,13 @@ import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; +import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; +import rx.util.functions.Func2; public class OperationRetry { @@ -58,17 +60,19 @@ public Retry(Observable source, int retryCount) { @Override public Subscription onSubscribe(Observer observer) { - subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + MultipleAssignmentSubscription rescursiveSubscription = new MultipleAssignmentSubscription(); + subscription.add(Schedulers.currentThread().schedule(rescursiveSubscription, attemptSubscription(observer))); + subscription.add(rescursiveSubscription); return subscription; } - private Action0 attemptSubscription(final Observer observer) { - return new Action0() { + private Func2 attemptSubscription(final Observer observer) { + return new Func2() { @Override - public void call() { + public Subscription call(final Scheduler scheduler, final MultipleAssignmentSubscription rescursiveSubscription) { attempts.incrementAndGet(); - source.subscribe(new Observer() { + return source.subscribe(new Observer() { @Override public void onCompleted() { @@ -79,10 +83,8 @@ public void onCompleted() { public void onError(Throwable e) { if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) { // retry again - // remove the last subscription since we have completed (so as we retry we don't build up a huge list) - subscription.removeLast(); - // add the new subscription and schedule a retry - subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + // add the new subscription and schedule a retry recursively + rescursiveSubscription.setSubscription(scheduler.schedule(rescursiveSubscription, attemptSubscription(observer))); } else { // give up and pass the failure observer.onError(e); @@ -96,6 +98,7 @@ public void onNext(T v) { }); } + }; } @@ -157,7 +160,7 @@ public void testRetrySuccess() { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } - + @Test public void testInfiniteRetry() { int NUM_FAILURES = 20; diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index a5eff1b852..ed1e83e299 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -16,21 +16,28 @@ package rx.operators; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import rx.Notification; import rx.Observer; import rx.Scheduler; -import rx.util.functions.Action0; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.util.functions.Func2; /* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; + private final CompositeSubscription parentSubscription; + private final EventLoop eventLoop = new EventLoop(); private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - private final AtomicInteger counter = new AtomicInteger(0); + private final AtomicBoolean started = new AtomicBoolean(); - public ScheduledObserver(Observer underlying, Scheduler scheduler) { + public ScheduledObserver(CompositeSubscription s, Observer underlying, Scheduler scheduler) { + this.parentSubscription = s; this.underlying = underlying; this.scheduler = scheduler; } @@ -50,46 +57,83 @@ public void onNext(final T args) { enqueue(new Notification(args)); } + final AtomicInteger counter = new AtomicInteger(); + private void enqueue(Notification notification) { - // this must happen before 'counter' is used to provide synchronization between threads + // this must happen before synchronization between threads queue.offer(notification); - // we now use counter to atomically determine if we need to start processing or not - // it will be 0 if it's the first notification or the scheduler has finished processing work - // and we need to start doing it again - if (counter.getAndIncrement() == 0) { - processQueue(); + /** + * If the counter is currently at 0 (before incrementing with this addition) + * we will schedule the work. + */ + if (counter.getAndIncrement() <= 0) { + if (!started.get() && started.compareAndSet(false, true)) { + // first time we use the parent scheduler to start the event loop + MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription(); + parentSubscription.add(scheduler.schedule(recursiveSubscription, eventLoop)); + parentSubscription.add(recursiveSubscription); + } else { + // subsequent times we reschedule existing one + eventLoop.reschedule(); + } } } - private void processQueue() { - scheduler.schedule(new Action0() { - @Override - public void call() { - Notification not = queue.poll(); - - switch (not.getKind()) { - case OnNext: - underlying.onNext(not.getValue()); - break; - case OnError: - underlying.onError(not.getThrowable()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + not); + private class EventLoop implements Func2 { - } + volatile Scheduler _recursiveScheduler; + volatile MultipleAssignmentSubscription _recursiveSubscription; - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - scheduler.schedule(this); - } + public void reschedule() { + _recursiveSubscription.setSubscription(_recursiveScheduler.schedule(_recursiveSubscription, this)); + } + @Override + public Subscription call(Scheduler s, MultipleAssignmentSubscription recursiveSubscription) { + /* + * -------------------------------------------------------------------------------------- + * Set these the first time through so we can externally trigger recursive execution again + */ + if (_recursiveScheduler == null) { + _recursiveScheduler = s; + } + if (_recursiveSubscription == null) { + _recursiveSubscription = recursiveSubscription; } - }); + /* + * Back to regular flow + * -------------------------------------------------------------------------------------- + */ + + do { + Notification notification = queue.poll(); + // if we got a notification, send it + if (notification != null) { + + // if unsubscribed stop working + if (parentSubscription.isUnsubscribed()) { + return parentSubscription; + } + // process notification + + switch (notification.getKind()) { + case OnNext: + underlying.onNext(notification.getValue()); + break; + case OnError: + underlying.onError(notification.getThrowable()); + break; + case OnCompleted: + underlying.onCompleted(); + break; + default: + throw new IllegalStateException("Unknown kind of notification " + notification); + } + } + } while (counter.decrementAndGet() > 0); + + return parentSubscription; + } } } diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 330657cd5d..6ca5c8a699 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -42,18 +42,24 @@ public class CompositeSubscription implements Subscription { * TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach */ private AtomicBoolean unsubscribed = new AtomicBoolean(false); - private final LinkedBlockingDeque subscriptions = new LinkedBlockingDeque(); + private final ConcurrentHashMap subscriptions = new ConcurrentHashMap(); public CompositeSubscription(List subscriptions) { - this.subscriptions.addAll(subscriptions); + for (Subscription s : subscriptions) { + this.subscriptions.put(s, Boolean.TRUE); + } } public CompositeSubscription(Subscription... subscriptions) { for (Subscription s : subscriptions) { - this.subscriptions.add(s); + this.subscriptions.put(s, Boolean.TRUE); } } + public void remove(Subscription s) { + this.subscriptions.remove(s); + } + public boolean isUnsubscribed() { return unsubscribed.get(); } @@ -62,24 +68,15 @@ public synchronized void add(Subscription s) { if (unsubscribed.get()) { s.unsubscribe(); } else { - subscriptions.add(s); + subscriptions.put(s, Boolean.TRUE); } } - /** - * Remove the last Subscription that was added. - * - * @return Subscription or null if none exists - */ - public synchronized Subscription removeLast() { - return subscriptions.pollLast(); - } - @Override public synchronized void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) { Collection es = null; - for (Subscription s : subscriptions) { + for (Subscription s : subscriptions.keySet()) { try { s.unsubscribe(); } catch (Throwable e) { diff --git a/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java new file mode 100644 index 0000000000..2af6501425 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -0,0 +1,45 @@ +package rx.subscriptions; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import rx.Observable; +import rx.Subscription; + +/** + * Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop if unsubscribed. + * + * @see Rx.Net equivalent MultipleAssignmentDisposable + */ +public class MultipleAssignmentSubscription implements Subscription { + + private final AtomicBoolean unsubscribed = new AtomicBoolean(false); + private AtomicReference subscription = new AtomicReference(); + + public boolean isUnsubscribed() { + return unsubscribed.get(); + } + + @Override + public synchronized void unsubscribe() { + unsubscribed.set(true); + Subscription s = getSubscription(); + if (s != null) { + s.unsubscribe(); + } + + } + + public synchronized void setSubscription(Subscription s) { + if (unsubscribed.get()) { + s.unsubscribe(); + } else { + subscription.set(s); + } + } + + public Subscription getSubscription() { + return subscription.get(); + } + +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java index 5d52a69cb2..032a0eaece 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java +++ b/rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,7 +26,7 @@ public class Subscriptions { /** * A {@link Subscription} that does nothing. - * + * * @return {@link Subscription} */ public static Subscription empty() { @@ -35,8 +35,9 @@ public static Subscription empty() { /** * A {@link Subscription} which invokes the given {@link Action0} when unsubscribed. - * - * @param unsubscribe Action to invoke on unsubscribe. + * + * @param unsubscribe + * Action to invoke on unsubscribe. * @return {@link Subscription} */ public static Subscription create(final Action0 unsubscribe) { @@ -52,12 +53,32 @@ public void unsubscribe() { /** * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. - * - * + * + * * @param f * {@link Future} * @return {@link Subscription} */ + public static Subscription from(final Future f) { + return new Subscription() { + + @Override + public void unsubscribe() { + f.cancel(true); + } + + }; + } + + /** + * A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed. + * + * + * @param f + * {@link Future} + * @return {@link Subscription} + * @deprecated Use {@link #from(Future)} instead + */ public static Subscription create(final Future f) { return new Subscription() { @@ -71,10 +92,23 @@ public void unsubscribe() { /** * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together. - * + * + * @param subscriptions + * Subscriptions to group together + * @return {@link Subscription} + */ + + public static CompositeSubscription from(Subscription... subscriptions) { + return new CompositeSubscription(subscriptions); + } + + /** + * A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together. + * * @param subscriptions * Subscriptions to group together * @return {@link Subscription} + * @deprecated Use {@link #from(Subscription...)} instead */ public static CompositeSubscription create(Subscription... subscriptions) { diff --git a/rxjava-core/src/test/java/rx/ObserveOnTests.java b/rxjava-core/src/test/java/rx/ObserveOnTests.java new file mode 100644 index 0000000000..e6e4e46b2d --- /dev/null +++ b/rxjava-core/src/test/java/rx/ObserveOnTests.java @@ -0,0 +1,118 @@ +package rx; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.concurrency.Schedulers; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class ObserveOnTests { + + /** + * Confirm that running on a NewThreadScheduler uses the same thread for the entire stream + */ + @Test + public void testObserveOnWithNewThreadScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.newThread()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxNewThreadScheduler")); + } + + }); + } + + /** + * Confirm that running on a ThreadPoolScheduler allows multiple threads but is still ordered. + */ + @Test + public void testObserveOnWithThreadPoolScheduler() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 100000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + return t1 * _multiple; + } + + }).observeOn(Schedulers.threadPoolForComputation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + /** + * Attempts to confirm that when pauses exist between events, the ScheduledObserver + * does not lose or reorder any events since the scheduler will not block, but will + * be re-scheduled when it receives new events after each pause. + * + * + * This is non-deterministic in proving success, but if it ever fails (non-deterministically) + * it is a sign of potential issues as thread-races and scheduling should not affect output. + */ + @Test + public void testObserveOnOrderingConcurrency() { + final AtomicInteger count = new AtomicInteger(); + final int _multiple = 99; + + Observable.range(1, 10000).map(new Func1() { + + @Override + public Integer call(Integer t1) { + if (randomIntFrom0to100() > 98) { + try { + Thread.sleep(2); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return t1 * _multiple; + } + + }).observeOn(Schedulers.threadPoolForComputation()) + .toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer t1) { + assertEquals(count.incrementAndGet() * _multiple, t1.intValue()); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + } + + }); + } + + private static int randomIntFrom0to100() { + // XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml + long x = System.nanoTime(); + x ^= (x << 21); + x ^= (x >>> 35); + x ^= (x << 4); + return Math.abs((int) x % 100); + } + +} From 53c30d3d113c42c049947e1010f03fac91dfa626 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 18 Sep 2013 21:34:03 -0700 Subject: [PATCH 2/4] Parallel Operator This operator came out of discussions and work with @headinthebox to allow explicit and composable declaration of blocks of work that can be scheduled for parallel execution. An Observable event stream will be sharded using groupBy using a value from Scheduler. degreeOfParallelism() (defaulting to number of CPU cores) and perform the defined work in parallel. Instead of having various parallel operators like parallelMap, parallelFilter parallelScan etc this can work generically for any operators or sequence of operators. --- .../groovy/rx/lang/groovy/TestParallel.groovy | 21 ++++ rxjava-core/src/main/java/rx/Observable.java | 36 +++++-- rxjava-core/src/main/java/rx/Scheduler.java | 13 ++- .../java/rx/operators/OperationParallel.java | 99 +++++++++++++++++++ 4 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy create mode 100644 rxjava-core/src/main/java/rx/operators/OperationParallel.java diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy new file mode 100644 index 0000000000..c2e2eb52bd --- /dev/null +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/TestParallel.groovy @@ -0,0 +1,21 @@ +package rx.lang.groovy + +import org.junit.Test + +import rx.Observable +import rx.Scheduler +import rx.concurrency.Schedulers +import rx.util.functions.Func1 + +class TestParallel { + + @Test + public void testParallelOperator() { + Observable.range(0, 100) + .parallel({ + it.map({ return it; }) + }) + .toBlockingObservable() + .forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); }); + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a14e78329c..6f515b2be3 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -34,10 +34,11 @@ import rx.operators.OperationCache; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; +import rx.operators.OperationDebounce; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; -import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDistinct; +import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; @@ -53,6 +54,7 @@ import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; +import rx.operators.OperationParallel; import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; @@ -67,7 +69,6 @@ import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; -import rx.operators.OperationDebounce; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1773,15 +1774,13 @@ public static Observable switchOnNext(Observable * the type of item emitted by the source Observable * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its {@link Observer}s */ - public static Observable synchronize(Observable observable) { - return create(OperationSynchronize.synchronize(observable)); + public Observable synchronize() { + return create(OperationSynchronize.synchronize(this)); } @@ -3484,6 +3483,31 @@ public Observable cache() { return create(OperationCache.cache(this)); } + /** + * Perform work in parallel by sharding an {@code Observable} on a {@link Schedulers#threadPoolForComputation()} {@link Scheduler} and return an {@code Observable} with the output. + * + * @param f + * a {@link Func1} that applies Observable operators to {@code Observable} in parallel and returns an {@code Observable} + * @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler} + */ + public Observable parallel(Func1, Observable> f) { + return OperationParallel.parallel(this, f); + } + + /** + * Perform work in parallel by sharding an {@code Observable} on a {@link Scheduler} and return an {@code Observable} with the output. + * + * @param f + * a {@link Func1} that applies Observable operators to {@code Observable} in parallel and returns an {@code Observable} + * @param s + * a {@link Scheduler} to perform the work on. + * @return an Observable with the output of the {@link Func1} executed on a {@link Scheduler} + */ + + public Observable parallel(final Func1, Observable> f, final Scheduler s) { + return OperationParallel.parallel(this, f, s); + } + /** * Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting * items to those {@link Observer}s that have subscribed to it. diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 06a708d4d4..6fdfc286c0 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) { } /** - * Returns the scheduler's notion of current absolute time in milliseconds. + * @return the scheduler's notion of current absolute time in milliseconds. */ public long now() { return System.currentTimeMillis(); } + /** + * Parallelism available to a Scheduler. + *

+ * This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster. + * + * @return the scheduler's available degree of parallelism. + */ + public int degreeOfParallelism() { + return Runtime.getRuntime().availableProcessors(); + } + public static class UnitTest { @SuppressWarnings("unchecked") // mocking is unchecked, unfortunately @Test diff --git a/rxjava-core/src/main/java/rx/operators/OperationParallel.java b/rxjava-core/src/main/java/rx/operators/OperationParallel.java new file mode 100644 index 0000000000..125fd2d291 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationParallel.java @@ -0,0 +1,99 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Scheduler; +import rx.concurrency.Schedulers; +import rx.observables.GroupedObservable; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Identifies unit of work that can be executed in parallel on a given Scheduler. + */ +public final class OperationParallel { + + public static Observable parallel(Observable source, Func1, Observable> f) { + return parallel(source, f, Schedulers.threadPoolForComputation()); + } + + public static Observable parallel(final Observable source, final Func1, Observable> f, final Scheduler s) { + return Observable.defer(new Func0>() { + + @Override + public Observable call() { + final AtomicInteger i = new AtomicInteger(0); + return source.groupBy(new Func1() { + + @Override + public Integer call(T t) { + return i.incrementAndGet() % s.degreeOfParallelism(); + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(GroupedObservable group) { + return f.call(group.observeOn(s)); + } + }).synchronize(); + } + }); + } + + public static class UnitTest { + + @Test + public void testParallel() { + int NUM = 1000; + final AtomicInteger count = new AtomicInteger(); + Observable.range(1, NUM).parallel( + new Func1, Observable>() { + + @Override + public Observable call(Observable o) { + return o.map(new Func1() { + + @Override + public Integer[] call(Integer t) { + return new Integer[] { t, t * 99 }; + } + + }); + } + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Integer[] v) { + count.incrementAndGet(); + System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread()); + } + + }); + + // just making sure we finish and get the number we expect + assertEquals(NUM, count.get()); + } + } +} From c2fd36e7eb6063aa94c94039c497c142303691a0 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 18 Sep 2013 21:47:54 -0700 Subject: [PATCH 3/4] Making Observable.synchronize an instance method rather than static --- .../src/main/scala/rx/lang/scala/Observable.scala | 2 +- rxjava-core/src/main/java/rx/Observable.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d26602bada..ee9ed92d5e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -176,7 +176,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) * Observable, and that synchronously notifies its {@link Observer}s */ def synchronize: Observable[T] = { - Observable[T](JObservable.synchronize(asJava)) + Observable[T](asJava.synchronize) } /** diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6f515b2be3..f790323ee8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1783,6 +1783,13 @@ public Observable synchronize() { return create(OperationSynchronize.synchronize(this)); } + /** + * @deprecated Replaced with instance method. + */ + @Deprecated + public static Observable synchronize(Observable source) { + return create(OperationSynchronize.synchronize(source)); + } /** * Emits an item each time interval (containing a sequential number). From e21805091b1f479ef81101dec4fb1b7e114753af Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Wed, 18 Sep 2013 21:53:46 -0700 Subject: [PATCH 4/4] Reorg fields --- .../src/main/java/rx/operators/ScheduledObserver.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index ed1e83e299..20416f2235 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -32,10 +32,12 @@ private final Scheduler scheduler; private final CompositeSubscription parentSubscription; private final EventLoop eventLoop = new EventLoop(); - - private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); + final AtomicInteger counter = new AtomicInteger(); private final AtomicBoolean started = new AtomicBoolean(); + private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); + + public ScheduledObserver(CompositeSubscription s, Observer underlying, Scheduler scheduler) { this.parentSubscription = s; this.underlying = underlying; @@ -57,8 +59,6 @@ public void onNext(final T args) { enqueue(new Notification(args)); } - final AtomicInteger counter = new AtomicInteger(); - private void enqueue(Notification notification) { // this must happen before synchronization between threads queue.offer(notification);