Skip to content

Commit

Permalink
ScheduledRunnable to honor interrupt settings from Schedulers.from usage
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Aug 10, 2024
1 parent dc9764e commit a92417d
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks);
ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker);
tasks.add(sr);

if (executor instanceof ScheduledExecutorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>

private static final long serialVersionUID = -6120223772001106981L;
final Runnable actual;
final boolean interruptOnCancel;

/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
Expand All @@ -41,12 +42,26 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* The underlying future will be interrupted if the task is disposed asynchronously.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
this(actual, parent, true);
}

/**
* Creates a ScheduledRunnable by wrapping the given action and setting
* up the optional parent.
* @param actual the runnable to wrap, not-null (not verified)
* @param parent the parent tracking container or null if none
* @param interruptOnCancel if true, the underlying future will be interrupted when disposing
* this task from a different thread than it is running on.
*/
public ScheduledRunnable(Runnable actual, DisposableContainer parent, boolean interruptOnCancel) {
super(3);
this.actual = actual;
this.interruptOnCancel = interruptOnCancel;
this.lazySet(0, parent);
}

Expand Down Expand Up @@ -95,7 +110,7 @@ public void setFuture(Future<?> f) {
return;
}
if (o == ASYNC_DISPOSED) {
f.cancel(true);
f.cancel(interruptOnCancel);
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
Expand All @@ -114,7 +129,7 @@ public void dispose() {
boolean async = get(THREAD_INDEX) != Thread.currentThread();
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
((Future<?>)o).cancel(async);
((Future<?>)o).cancel(async && interruptOnCancel);
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,4 +965,149 @@ public void run() {
exec.shutdown();
}
}

public static class TrackInterruptScheduledExecutor extends ScheduledThreadPoolExecutor {

public final AtomicBoolean interruptReceived = new AtomicBoolean();

public TrackInterruptScheduledExecutor() {
super(10);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return new TrackingScheduledFuture<V>(super.schedule(callable, delay, unit));
}

class TrackingScheduledFuture<V> implements ScheduledFuture<V> {

ScheduledFuture<V> original;

TrackingScheduledFuture(ScheduledFuture<V> original) {
this.original = original;
}

@Override
public long getDelay(TimeUnit unit) {
return original.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return original.compareTo(o);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning) {
interruptReceived.set(true);
}
return original.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return original.isCancelled();
}

@Override
public boolean isDone() {
return original.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return original.get();
}

@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return get(timeout, unit);
}
}
}

@Test
public void noInterruptBeforeRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, false);

Worker worker = sch.createWorker();

Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);

d.dispose();

int i = 150;

while (i-- > 0) {
assertFalse("Task interrupt detected", exec.interruptReceived.get());
Thread.sleep(10);
}

} finally {
exec.shutdownNow();
}
}

@Test
public void hasInterruptBeforeRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, true);

Worker worker = sch.createWorker();

Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS);

d.dispose();

Thread.sleep(100);
assertTrue("Task interrupt detected", exec.interruptReceived.get());

} finally {
exec.shutdownNow();
}
}

@Test
public void noInterruptAfterRunningDelayedWorker() throws Throwable {
TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor();

try {
Scheduler sch = Schedulers.from(exec, false);

Worker worker = sch.createWorker();
AtomicBoolean taskRun = new AtomicBoolean();

Disposable d = worker.schedule(() -> {
taskRun.set(true);
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
exec.interruptReceived.set(true);
}
}, 100, TimeUnit.MILLISECONDS);

Thread.sleep(150);
;
d.dispose();

int i = 50;

while (i-- > 0) {
assertFalse("Task interrupt detected", exec.interruptReceived.get());
Thread.sleep(10);
}

assertTrue("Task run at all", taskRun.get());

} finally {
exec.shutdownNow();
}
}
}

0 comments on commit a92417d

Please sign in to comment.