From f731ac78240455243a69e13584d3e07c53fbd1e0 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 20 Feb 2015 10:03:25 +0100 Subject: [PATCH] Helper method to submit an (interruptible) action to an executorservice --- .../internal/schedulers/ScheduledAction.java | 53 ++- src/main/java/rx/schedulers/Schedulers.java | 57 +++- .../java/rx/schedulers/SchedulersTest.java | 323 ++++++++++++++++++ 3 files changed, 420 insertions(+), 13 deletions(-) create mode 100644 src/test/java/rx/schedulers/SchedulersTest.java diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 24240096c9..0537dac25c 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -16,28 +16,48 @@ package rx.internal.schedulers; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import rx.Subscription; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; +import rx.internal.util.SubscriptionList; import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; /** * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the * {@code Subscriber} in respect of an {@code Observer}. + *

System-wide properties: + *

*/ public final class ScheduledAction extends AtomicReference implements Runnable, Subscription { /** */ private static final long serialVersionUID = -3962399486978279857L; - final CompositeSubscription cancel; + static final boolean DEFAULT_INTERRUPT_ON_UNSUBSCRIBE; + static final String KEY_INTERRUPT_ON_UNSUBSCRIBE = "rx.scheduler.interrupt-on-unsubscribe"; + static { + String interruptOnUnsubscribeValue = System.getProperty(KEY_INTERRUPT_ON_UNSUBSCRIBE); + DEFAULT_INTERRUPT_ON_UNSUBSCRIBE = interruptOnUnsubscribeValue == null || "true".equals(interruptOnUnsubscribeValue); + } + final SubscriptionList cancel; final Action0 action; + volatile int interruptOnUnsubscribe; + static final AtomicIntegerFieldUpdater INTERRUPT_ON_UNSUBSCRIBE + = AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptOnUnsubscribe"); public ScheduledAction(Action0 action) { + this(action, DEFAULT_INTERRUPT_ON_UNSUBSCRIBE); + } + + public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) { this.action = action; - this.cancel = new CompositeSubscription(); + this.cancel = new SubscriptionList(); + this.interruptOnUnsubscribe = interruptOnUnsubscribe ? 1 : 0; } @Override @@ -61,6 +81,21 @@ public void run() { } } + /** + * Sets the flag to indicate the underlying Future task should be interrupted on unsubscription or not. + * @param interrupt the new interruptible status + */ + public void setInterruptOnUnsubscribe(boolean interrupt) { + INTERRUPT_ON_UNSUBSCRIBE.lazySet(this, interrupt ? 1 : 0); + } + /** + * Returns {@code true} if the underlying Future task will be interrupted on unsubscription. + * @return the current interruptible status + */ + public boolean isInterruptOnUnsubscribe() { + return interruptOnUnsubscribe != 0; + } + @Override public boolean isUnsubscribed() { return cancel.isUnsubscribed(); @@ -68,9 +103,7 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - if (!cancel.isUnsubscribed()) { - cancel.unsubscribe(); - } + cancel.unsubscribe(); } /** @@ -89,7 +122,7 @@ public void add(Subscription s) { * @param f the future to add */ public void add(final Future f) { - cancel.add(new FutureCompleter(f)); + add(new FutureCompleter(f)); } /** @@ -100,7 +133,7 @@ public void add(final Future f) { * the parent {@code CompositeSubscription} to add */ public void addParent(CompositeSubscription parent) { - cancel.add(new Remover(this, parent)); + add(new Remover(this, parent)); } /** @@ -119,7 +152,7 @@ private FutureCompleter(Future f) { @Override public void unsubscribe() { if (ScheduledAction.this.get() != Thread.currentThread()) { - f.cancel(true); + f.cancel(interruptOnUnsubscribe != 0); } else { f.cancel(false); } diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index 374448d695..dd6bc5f845 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -15,10 +15,13 @@ */ package rx.schedulers; -import rx.Scheduler; -import rx.plugins.RxJavaPlugins; +import java.util.concurrent.*; -import java.util.concurrent.Executor; +import rx.*; +import rx.functions.Action0; +import rx.internal.schedulers.ScheduledAction; +import rx.plugins.RxJavaPlugins; +import rx.subscriptions.CompositeSubscription; /** * Static factory methods for creating Schedulers. @@ -136,4 +139,52 @@ public static TestScheduler test() { public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor); } + /** + * Submit an Action0 to the specified executor service with the option to interrupt the task + * on unsubscription and add it to a parent composite subscription. + * @param executor the target executor service + * @param action the action to execute + * @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it + * once the action completes or is unsubscribed. + * @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running + * @return the Subscription representing the scheduled action which is also added to the {@code parent} composite + */ + public static Subscription submitTo(ExecutorService executor, Action0 action, CompositeSubscription parent, boolean interruptOnUnsubscribe) { + ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe); + + if (parent != null) { + parent.add(sa); + sa.addParent(parent); + } + + Future f = executor.submit(sa); + sa.add(f); + + return sa; + } + /** + * Submit an Action0 to the specified executor service with the given delay and the option to interrupt the task + * on unsubscription and add it to a parent composite subscription. + * @param executor the target executor service + * @param action the action to execute + * @param delay the delay value + * @param unit the time unit of the delay value + * @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it + * once the action completes or is unsubscribed. + * @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running + * @return the Subscription representing the scheduled action which is also added to the {@code parent} composite + */ + public static Subscription submitTo(ScheduledExecutorService executor, Action0 action, long delay, TimeUnit unit, CompositeSubscription parent, boolean interruptOnUnsubscribe) { + ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe); + + if (parent != null) { + parent.add(sa); + sa.addParent(parent); + } + + Future f = executor.schedule(sa, delay, unit); + sa.add(f); + + return sa; + } } diff --git a/src/test/java/rx/schedulers/SchedulersTest.java b/src/test/java/rx/schedulers/SchedulersTest.java new file mode 100644 index 0000000000..5540907530 --- /dev/null +++ b/src/test/java/rx/schedulers/SchedulersTest.java @@ -0,0 +1,323 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.Subscription; +import rx.functions.Action0; +import rx.subscriptions.CompositeSubscription; + +public class SchedulersTest { + static final class RunAction extends AtomicBoolean implements Action0 { + /** */ + private static final long serialVersionUID = -3148738938700490457L; + private CountDownLatch startLatch = new CountDownLatch(1); + private CountDownLatch runLatch = new CountDownLatch(1); + private CountDownLatch completeLatch = new CountDownLatch(1); + private volatile boolean waitInterrupted; + @Override + public void call() { + startLatch.countDown(); + try { + runLatch.await(); + } catch (InterruptedException ex) { + waitInterrupted = true; + completeLatch.countDown(); + return; + } + lazySet(true); + completeLatch.countDown(); + } + private void await(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + public void awaitStart() { + await(startLatch); + } + public void awaitComplete() { + await(completeLatch); + } + public boolean isWaitInterrupted() { + return waitInterrupted; + } + public void run() { + runLatch.countDown(); + } + } + @Test + public void submitToSimpleInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, csub, true); + + ra.awaitStart(); + + csub.remove(s); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToSimpleNoInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, csub, false); + + ra.awaitStart(); + + csub.remove(s); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToSimpleInterruptNoParent() { + RunAction ra = new RunAction(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, null, true); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToSimpleNoInterruptNoParent() { + RunAction ra = new RunAction(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, null, false); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToDelayedSimpleInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, csub, true); + + ra.awaitStart(); + + csub.remove(s); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test(timeout = 3000) + public void submitToDelayedSimpleInterruptBeforeRun() throws InterruptedException { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 1000, TimeUnit.MILLISECONDS, csub, true); + + Thread.sleep(500); + + csub.remove(s); + + Thread.sleep(1000); + + assertFalse(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToDelayedSimpleNoInterrupt() { + RunAction ra = new RunAction(); + + CompositeSubscription csub = new CompositeSubscription(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, csub, false); + + ra.awaitStart(); + + csub.remove(s); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test + public void submitToDelayedSimpleInterruptNoParent() { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, null, true); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.awaitComplete(); + + assertTrue(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + @Test(timeout = 3000) + public void submitToDelayedSimpleInterruptBeforeRunNoParent() throws InterruptedException { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + try { + Subscription s = Schedulers.submitTo(exec, ra, 1000, TimeUnit.MILLISECONDS, null, true); + + Thread.sleep(500); + + s.unsubscribe(); + + Thread.sleep(1000); + + assertFalse(ra.isWaitInterrupted()); + assertFalse(ra.get()); + assertTrue(s.isUnsubscribed()); + } finally { + exec.shutdownNow(); + } + + } + + @Test + public void submitToDelayedSimpleNoInterruptNoParent() { + RunAction ra = new RunAction(); + + ScheduledExecutorService exec = Executors.newScheduledThreadPool(1); + + try { + Subscription s = Schedulers.submitTo(exec, ra, 500, TimeUnit.MILLISECONDS, null, false); + + ra.awaitStart(); + + s.unsubscribe(); + + ra.run(); + + ra.awaitComplete(); + + assertFalse(ra.isWaitInterrupted()); + assertTrue(ra.get()); + assertTrue(s.isUnsubscribed()); + + } finally { + exec.shutdownNow(); + } + + } + +}