From 491e615d4d1a11e583b8c7bb22ea86b49794dc8b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 30 Apr 2015 09:45:27 +0200 Subject: [PATCH] Fixed schedule race and task retention with ExecutorScheduler. --- .../java/rx/schedulers/ExecutorScheduler.java | 126 ++++++------- .../rx/schedulers/ExecutorSchedulerTest.java | 172 +++++++++++++++++- 2 files changed, 223 insertions(+), 75 deletions(-) diff --git a/src/main/java/rx/schedulers/ExecutorScheduler.java b/src/main/java/rx/schedulers/ExecutorScheduler.java index 3f8c8899a6..ce9643cf2b 100644 --- a/src/main/java/rx/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/schedulers/ExecutorScheduler.java @@ -15,21 +15,14 @@ */ package rx.schedulers; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import rx.Scheduler; -import rx.Subscription; + +import rx.*; import rx.functions.Action0; +import rx.internal.schedulers.ScheduledAction; import rx.plugins.RxJavaPlugins; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.MultipleAssignmentSubscription; -import rx.subscriptions.Subscriptions; +import rx.subscriptions.*; /** * Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it. @@ -58,12 +51,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R // TODO: use a better performing structure for task tracking final CompositeSubscription tasks; // TODO: use MpscLinkedQueue once available - final ConcurrentLinkedQueue queue; + final ConcurrentLinkedQueue queue; final AtomicInteger wip; public ExecutorSchedulerWorker(Executor executor) { this.executor = executor; - this.queue = new ConcurrentLinkedQueue(); + this.queue = new ConcurrentLinkedQueue(); this.wip = new AtomicInteger(); this.tasks = new CompositeSubscription(); } @@ -73,11 +66,15 @@ public Subscription schedule(Action0 action) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - ExecutorAction ea = new ExecutorAction(action, tasks); + ScheduledAction ea = new ScheduledAction(action, tasks); tasks.add(ea); queue.offer(ea); if (wip.getAndIncrement() == 0) { try { + // note that since we schedule the emission of potentially multiple tasks + // there is no clear way to cancel this schedule from individual tasks + // so even if executor is an ExecutorService, we can't associate the future + // returned by submit() with any particular ScheduledAction executor.execute(this); } catch (RejectedExecutionException t) { // cleanup if rejected @@ -96,7 +93,10 @@ public Subscription schedule(Action0 action) { @Override public void run() { do { - queue.poll().run(); + ScheduledAction sa = queue.poll(); + if (!sa.isUnsubscribed()) { + sa.run(); + } } while (wip.decrementAndGet() > 0); } @@ -115,28 +115,54 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit service = GenericScheduledExecutorService.getInstance(); } + final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription(); final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); - // tasks.add(mas); // Needs a removal without unsubscription + mas.set(first); + tasks.add(mas); + final Subscription removeMas = Subscriptions.create(new Action0() { + @Override + public void call() { + tasks.remove(mas); + } + }); - try { - Future f = service.schedule(new Runnable() { - @Override - public void run() { - if (mas.isUnsubscribed()) { - return; - } - mas.set(schedule(action)); - // tasks.delete(mas); // Needs a removal without unsubscription + ScheduledAction ea = new ScheduledAction(new Action0() { + @Override + public void call() { + if (mas.isUnsubscribed()) { + return; } - }, delayTime, unit); - mas.set(Subscriptions.from(f)); + // schedule the real action untimed + Subscription s2 = schedule(action); + mas.set(s2); + // unless the worker is unsubscribed, we should get a new ScheduledAction + if (s2.getClass() == ScheduledAction.class) { + // when this ScheduledAction completes, we need to remove the + // MAS referencing the whole setup to avoid leaks + ((ScheduledAction)s2).add(removeMas); + } + } + }); + // This will make sure if ea.call() gets executed before this line + // we don't override the current task in mas. + first.set(ea); + // we don't need to add ea to tasks because it will be tracked through mas/first + + + try { + Future f = service.schedule(ea, delayTime, unit); + ea.add(f); } catch (RejectedExecutionException t) { // report the rejection to plugins RxJavaPlugins.getInstance().getErrorHandler().handleError(t); throw t; } - return mas; + /* + * This allows cancelling either the delayed schedule or the actual schedule referenced + * by mas and makes sure mas is removed from the tasks composite to avoid leaks. + */ + return removeMas; } @Override @@ -150,46 +176,4 @@ public void unsubscribe() { } } - - /** Runs the actual action and maintains an unsubscription state. */ - static final class ExecutorAction implements Runnable, Subscription { - final Action0 actual; - final CompositeSubscription parent; - volatile int unsubscribed; - static final AtomicIntegerFieldUpdater UNSUBSCRIBED_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed"); - - public ExecutorAction(Action0 actual, CompositeSubscription parent) { - this.actual = actual; - this.parent = parent; - } - - @Override - public void run() { - if (isUnsubscribed()) { - return; - } - try { - actual.call(); - } catch (Throwable t) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); - Thread thread = Thread.currentThread(); - thread.getUncaughtExceptionHandler().uncaughtException(thread, t); - } finally { - unsubscribe(); - } - } - @Override - public boolean isUnsubscribed() { - return unsubscribed != 0; - } - - @Override - public void unsubscribe() { - if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { - parent.remove(this); - } - } - - } } diff --git a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java index 9fcd303807..b11f7879e1 100644 --- a/src/test/java/rx/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/rx/schedulers/ExecutorSchedulerTest.java @@ -15,12 +15,20 @@ */ package rx.schedulers; +import static org.junit.Assert.*; + +import java.lang.management.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; -import rx.Scheduler; -import rx.internal.util.RxThreadFactory; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; +import rx.*; +import rx.Scheduler.Worker; +import rx.functions.*; +import rx.internal.schedulers.NewThreadWorker; +import rx.internal.util.RxThreadFactory; +import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker; public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -40,4 +48,160 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException { SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler()); } + @Test(timeout = 30000) + public void testCancelledTaskRetention() throws InterruptedException { + System.out.println("Wait before GC"); + Thread.sleep(1000); + + System.out.println("GC"); + System.gc(); + + Thread.sleep(1000); + + + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + Scheduler.Worker w = Schedulers.io().createWorker(); + for (int i = 0; i < 500000; i++) { + if (i % 50000 == 0) { + System.out.println(" -> still scheduling: " + i); + } + w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + } + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long after = memHeap.getUsed(); + System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0); + + w.unsubscribe(); + + System.out.println("Wait before second GC"); + Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000); + + System.out.println("Second GC"); + System.gc(); + + Thread.sleep(1000); + + memHeap = memoryMXBean.getHeapMemoryUsage(); + long finish = memHeap.getUsed(); + System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0); + + if (finish > initial * 5) { + fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d)); + } + } + + /** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */ + static final class TestExecutor implements Executor { + final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + @Override + public void execute(Runnable command) { + queue.offer(command); + } + public void executeOne() { + Runnable r = queue.poll(); + if (r != null) { + r.run(); + } + } + public void executeAll() { + Runnable r; + while ((r = queue.poll()) != null) { + r.run(); + } + } + } + + @Test + public void testCancelledTasksDontRun() { + final AtomicInteger calls = new AtomicInteger(); + Action0 task = new Action0() { + @Override + public void call() { + calls.getAndIncrement(); + } + }; + TestExecutor exec = new TestExecutor(); + Scheduler custom = Schedulers.from(exec); + Worker w = custom.createWorker(); + try { + Subscription s1 = w.schedule(task); + Subscription s2 = w.schedule(task); + Subscription s3 = w.schedule(task); + + s1.unsubscribe(); + s2.unsubscribe(); + s3.unsubscribe(); + + exec.executeAll(); + + assertEquals(0, calls.get()); + } finally { + w.unsubscribe(); + } + } + @Test + public void testCancelledWorkerDoesntRunTasks() { + final AtomicInteger calls = new AtomicInteger(); + Action0 task = new Action0() { + @Override + public void call() { + calls.getAndIncrement(); + } + }; + TestExecutor exec = new TestExecutor(); + Scheduler custom = Schedulers.from(exec); + Worker w = custom.createWorker(); + try { + w.schedule(task); + w.schedule(task); + w.schedule(task); + } finally { + w.unsubscribe(); + } + exec.executeAll(); + assertEquals(0, calls.get()); + } + @Test + public void testNoTimedTaskAfterScheduleRetention() throws InterruptedException { + Executor e = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker(); + + w.schedule(Actions.empty(), 1, TimeUnit.MILLISECONDS); + + assertTrue(w.tasks.hasSubscriptions()); + + Thread.sleep(100); + + assertFalse(w.tasks.hasSubscriptions()); + } + + @Test + public void testNoTimedTaskPartRetention() { + Executor e = new Executor() { + @Override + public void execute(Runnable command) { + + } + }; + ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker(); + + Subscription s = w.schedule(Actions.empty(), 1, TimeUnit.DAYS); + + assertTrue(w.tasks.hasSubscriptions()); + + s.unsubscribe(); + + assertFalse(w.tasks.hasSubscriptions()); + } }