Skip to content

Commit

Permalink
Fixed schedule race and task retention with ExecutorScheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Apr 30, 2015
1 parent 51e03cc commit 491e615
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 75 deletions.
126 changes: 55 additions & 71 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,14 @@
*/
package rx.schedulers;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import rx.Scheduler;
import rx.Subscription;

import rx.*;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.*;

/**
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
Expand Down Expand Up @@ -58,12 +51,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
// TODO: use a better performing structure for task tracking
final CompositeSubscription tasks;
// TODO: use MpscLinkedQueue once available
final ConcurrentLinkedQueue<ExecutorAction> queue;
final ConcurrentLinkedQueue<ScheduledAction> queue;
final AtomicInteger wip;

public ExecutorSchedulerWorker(Executor executor) {
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
this.wip = new AtomicInteger();
this.tasks = new CompositeSubscription();
}
Expand All @@ -73,11 +66,15 @@ public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ExecutorAction ea = new ExecutorAction(action, tasks);
ScheduledAction ea = new ScheduledAction(action, tasks);
tasks.add(ea);
queue.offer(ea);
if (wip.getAndIncrement() == 0) {
try {
// note that since we schedule the emission of potentially multiple tasks
// there is no clear way to cancel this schedule from individual tasks
// so even if executor is an ExecutorService, we can't associate the future
// returned by submit() with any particular ScheduledAction
executor.execute(this);
} catch (RejectedExecutionException t) {
// cleanup if rejected
Expand All @@ -96,7 +93,10 @@ public Subscription schedule(Action0 action) {
@Override
public void run() {
do {
queue.poll().run();
ScheduledAction sa = queue.poll();
if (!sa.isUnsubscribed()) {
sa.run();
}
} while (wip.decrementAndGet() > 0);
}

Expand All @@ -115,28 +115,54 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
service = GenericScheduledExecutorService.getInstance();
}

final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
// tasks.add(mas); // Needs a removal without unsubscription
mas.set(first);
tasks.add(mas);
final Subscription removeMas = Subscriptions.create(new Action0() {
@Override
public void call() {
tasks.remove(mas);
}
});

try {
Future<?> f = service.schedule(new Runnable() {
@Override
public void run() {
if (mas.isUnsubscribed()) {
return;
}
mas.set(schedule(action));
// tasks.delete(mas); // Needs a removal without unsubscription
ScheduledAction ea = new ScheduledAction(new Action0() {
@Override
public void call() {
if (mas.isUnsubscribed()) {
return;
}
}, delayTime, unit);
mas.set(Subscriptions.from(f));
// schedule the real action untimed
Subscription s2 = schedule(action);
mas.set(s2);
// unless the worker is unsubscribed, we should get a new ScheduledAction
if (s2.getClass() == ScheduledAction.class) {
// when this ScheduledAction completes, we need to remove the
// MAS referencing the whole setup to avoid leaks
((ScheduledAction)s2).add(removeMas);
}
}
});
// This will make sure if ea.call() gets executed before this line
// we don't override the current task in mas.
first.set(ea);
// we don't need to add ea to tasks because it will be tracked through mas/first


try {
Future<?> f = service.schedule(ea, delayTime, unit);
ea.add(f);
} catch (RejectedExecutionException t) {
// report the rejection to plugins
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
throw t;
}

return mas;
/*
* This allows cancelling either the delayed schedule or the actual schedule referenced
* by mas and makes sure mas is removed from the tasks composite to avoid leaks.
*/
return removeMas;
}

@Override
Expand All @@ -150,46 +176,4 @@ public void unsubscribe() {
}

}

/** Runs the actual action and maintains an unsubscription state. */
static final class ExecutorAction implements Runnable, Subscription {
final Action0 actual;
final CompositeSubscription parent;
volatile int unsubscribed;
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");

public ExecutorAction(Action0 actual, CompositeSubscription parent) {
this.actual = actual;
this.parent = parent;
}

@Override
public void run() {
if (isUnsubscribed()) {
return;
}
try {
actual.call();
} catch (Throwable t) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
} finally {
unsubscribe();
}
}
@Override
public boolean isUnsubscribed() {
return unsubscribed != 0;
}

@Override
public void unsubscribe() {
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
parent.remove(this);
}
}

}
}
172 changes: 168 additions & 4 deletions src/test/java/rx/schedulers/ExecutorSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
*/
package rx.schedulers;

import static org.junit.Assert.*;

import java.lang.management.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import rx.Scheduler;
import rx.internal.util.RxThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import rx.*;
import rx.Scheduler.Worker;
import rx.functions.*;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.util.RxThreadFactory;
import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker;

public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {

Expand All @@ -40,4 +48,160 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
System.out.println("Wait before GC");
Thread.sleep(1000);

System.out.println("GC");
System.gc();

Thread.sleep(1000);


MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 500000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
}

memHeap = memoryMXBean.getHeapMemoryUsage();
long after = memHeap.getUsed();
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);

w.unsubscribe();

System.out.println("Wait before second GC");
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);

System.out.println("Second GC");
System.gc();

Thread.sleep(1000);

memHeap = memoryMXBean.getHeapMemoryUsage();
long finish = memHeap.getUsed();
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);

if (finish > initial * 5) {
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
}
}

/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
static final class TestExecutor implements Executor {
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
@Override
public void execute(Runnable command) {
queue.offer(command);
}
public void executeOne() {
Runnable r = queue.poll();
if (r != null) {
r.run();
}
}
public void executeAll() {
Runnable r;
while ((r = queue.poll()) != null) {
r.run();
}
}
}

@Test
public void testCancelledTasksDontRun() {
final AtomicInteger calls = new AtomicInteger();
Action0 task = new Action0() {
@Override
public void call() {
calls.getAndIncrement();
}
};
TestExecutor exec = new TestExecutor();
Scheduler custom = Schedulers.from(exec);
Worker w = custom.createWorker();
try {
Subscription s1 = w.schedule(task);
Subscription s2 = w.schedule(task);
Subscription s3 = w.schedule(task);

s1.unsubscribe();
s2.unsubscribe();
s3.unsubscribe();

exec.executeAll();

assertEquals(0, calls.get());
} finally {
w.unsubscribe();
}
}
@Test
public void testCancelledWorkerDoesntRunTasks() {
final AtomicInteger calls = new AtomicInteger();
Action0 task = new Action0() {
@Override
public void call() {
calls.getAndIncrement();
}
};
TestExecutor exec = new TestExecutor();
Scheduler custom = Schedulers.from(exec);
Worker w = custom.createWorker();
try {
w.schedule(task);
w.schedule(task);
w.schedule(task);
} finally {
w.unsubscribe();
}
exec.executeAll();
assertEquals(0, calls.get());
}
@Test
public void testNoTimedTaskAfterScheduleRetention() throws InterruptedException {
Executor e = new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();

w.schedule(Actions.empty(), 1, TimeUnit.MILLISECONDS);

assertTrue(w.tasks.hasSubscriptions());

Thread.sleep(100);

assertFalse(w.tasks.hasSubscriptions());
}

@Test
public void testNoTimedTaskPartRetention() {
Executor e = new Executor() {
@Override
public void execute(Runnable command) {

}
};
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();

Subscription s = w.schedule(Actions.empty(), 1, TimeUnit.DAYS);

assertTrue(w.tasks.hasSubscriptions());

s.unsubscribe();

assertFalse(w.tasks.hasSubscriptions());
}
}

0 comments on commit 491e615

Please sign in to comment.