From a92417d44da345becdbccc74fc998e7c1361404b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 10 Aug 2024 09:15:52 +0200 Subject: [PATCH] ScheduledRunnable to honor interrupt settings from Schedulers.from usage --- .../schedulers/ExecutorScheduler.java | 2 +- .../schedulers/ScheduledRunnable.java | 19 ++- .../ExecutorSchedulerInterruptibleTest.java | 145 ++++++++++++++++++ 3 files changed, 163 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java index 7c7b017988..fa0bcab7f8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java @@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); - ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks); + ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker); tasks.add(sr); if (executor instanceof ScheduledExecutorService) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java index 14c197bccc..2a1baa641a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java @@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray private static final long serialVersionUID = -6120223772001106981L; final Runnable actual; + final boolean interruptOnCancel; /** Indicates that the parent tracking this task has been notified about its completion. */ static final Object PARENT_DISPOSED = new Object(); @@ -41,12 +42,26 @@ public final class ScheduledRunnable extends AtomicReferenceArray /** * Creates a ScheduledRunnable by wrapping the given action and setting * up the optional parent. + * The underlying future will be interrupted if the task is disposed asynchronously. * @param actual the runnable to wrap, not-null (not verified) * @param parent the parent tracking container or null if none */ public ScheduledRunnable(Runnable actual, DisposableContainer parent) { + this(actual, parent, true); + } + + /** + * Creates a ScheduledRunnable by wrapping the given action and setting + * up the optional parent. + * @param actual the runnable to wrap, not-null (not verified) + * @param parent the parent tracking container or null if none + * @param interruptOnCancel if true, the underlying future will be interrupted when disposing + * this task from a different thread than it is running on. + */ + public ScheduledRunnable(Runnable actual, DisposableContainer parent, boolean interruptOnCancel) { super(3); this.actual = actual; + this.interruptOnCancel = interruptOnCancel; this.lazySet(0, parent); } @@ -95,7 +110,7 @@ public void setFuture(Future f) { return; } if (o == ASYNC_DISPOSED) { - f.cancel(true); + f.cancel(interruptOnCancel); return; } if (compareAndSet(FUTURE_INDEX, o, f)) { @@ -114,7 +129,7 @@ public void dispose() { boolean async = get(THREAD_INDEX) != Thread.currentThread(); if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) { if (o != null) { - ((Future)o).cancel(async); + ((Future)o).cancel(async && interruptOnCancel); } break; } diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java index f4b5f82ec4..074ac8039a 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java @@ -965,4 +965,149 @@ public void run() { exec.shutdown(); } } + + public static class TrackInterruptScheduledExecutor extends ScheduledThreadPoolExecutor { + + public final AtomicBoolean interruptReceived = new AtomicBoolean(); + + public TrackInterruptScheduledExecutor() { + super(10); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return new TrackingScheduledFuture(super.schedule(callable, delay, unit)); + } + + class TrackingScheduledFuture implements ScheduledFuture { + + ScheduledFuture original; + + TrackingScheduledFuture(ScheduledFuture original) { + this.original = original; + } + + @Override + public long getDelay(TimeUnit unit) { + return original.getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return original.compareTo(o); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + interruptReceived.set(true); + } + return original.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return original.isCancelled(); + } + + @Override + public boolean isDone() { + return original.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return original.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return get(timeout, unit); + } + } + } + + @Test + public void noInterruptBeforeRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, false); + + Worker worker = sch.createWorker(); + + Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS); + + d.dispose(); + + int i = 150; + + while (i-- > 0) { + assertFalse("Task interrupt detected", exec.interruptReceived.get()); + Thread.sleep(10); + } + + } finally { + exec.shutdownNow(); + } + } + + @Test + public void hasInterruptBeforeRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, true); + + Worker worker = sch.createWorker(); + + Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS); + + d.dispose(); + + Thread.sleep(100); + assertTrue("Task interrupt detected", exec.interruptReceived.get()); + + } finally { + exec.shutdownNow(); + } + } + + @Test + public void noInterruptAfterRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, false); + + Worker worker = sch.createWorker(); + AtomicBoolean taskRun = new AtomicBoolean(); + + Disposable d = worker.schedule(() -> { + taskRun.set(true); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + exec.interruptReceived.set(true); + } + }, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(150); + ; + d.dispose(); + + int i = 50; + + while (i-- > 0) { + assertFalse("Task interrupt detected", exec.interruptReceived.get()); + Thread.sleep(10); + } + + assertTrue("Task run at all", taskRun.get()); + + } finally { + exec.shutdownNow(); + } + } }