From 2734a33d1fd03a46b0ac9d11793c9433ac148ee8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 9 Nov 2017 20:12:52 +0100 Subject: [PATCH 1/3] 2.x: distinguish between sync and async dispose in ScheduledRunnable --- .../schedulers/ScheduledRunnable.java | 34 +++++++----- .../schedulers/ScheduledRunnableTest.java | 52 +++++++++++++++++++ 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java index c942deacfa..e4414db800 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java @@ -26,7 +26,12 @@ public final class ScheduledRunnable extends AtomicReferenceArray private static final long serialVersionUID = -6120223772001106981L; final Runnable actual; - static final Object DISPOSED = new Object(); + /** Indicates that the parent tracking this task has been notified about its completion. */ + static final Object PARENT_DISPOSED = new Object(); + /** Indicates the dispose() was called from within the run/call method. */ + static final Object SYNC_DISPOSED = new Object(); + /** Indicates the dispose() was called from another thread. */ + static final Object ASYNC_DISPOSED = new Object(); static final Object DONE = new Object(); @@ -66,13 +71,13 @@ public void run() { } finally { lazySet(THREAD_INDEX, null); Object o = get(PARENT_INDEX); - if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { + if (o != PARENT_DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { ((DisposableContainer)o).delete(this); } for (;;) { o = get(FUTURE_INDEX); - if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) { + if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) { break; } } @@ -85,8 +90,12 @@ public void setFuture(Future f) { if (o == DONE) { return; } - if (o == DISPOSED) { - f.cancel(get(THREAD_INDEX) != Thread.currentThread()); + if (o == SYNC_DISPOSED) { + f.cancel(false); + return; + } + if (o == ASYNC_DISPOSED) { + f.cancel(true); return; } if (compareAndSet(FUTURE_INDEX, o, f)) { @@ -99,12 +108,13 @@ public void setFuture(Future f) { public void dispose() { for (;;) { Object o = get(FUTURE_INDEX); - if (o == DONE || o == DISPOSED) { + if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) { break; } - if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) { + boolean async = get(THREAD_INDEX) != Thread.currentThread(); + if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) { if (o != null) { - ((Future)o).cancel(get(THREAD_INDEX) != Thread.currentThread()); + ((Future)o).cancel(async); } break; } @@ -112,10 +122,10 @@ public void dispose() { for (;;) { Object o = get(PARENT_INDEX); - if (o == DONE || o == DISPOSED || o == null) { + if (o == DONE || o == PARENT_DISPOSED || o == null) { return; } - if (compareAndSet(PARENT_INDEX, o, DISPOSED)) { + if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) { ((DisposableContainer)o).delete(this); return; } @@ -124,7 +134,7 @@ public void dispose() { @Override public boolean isDisposed() { - Object o = get(FUTURE_INDEX); - return o == DISPOSED || o == DONE; + Object o = get(PARENT_INDEX); + return o == PARENT_DISPOSED || o == DONE; } } diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java index 169767d4c6..da86973016 100644 --- a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java @@ -18,6 +18,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.*; import org.junit.Test; @@ -283,4 +284,55 @@ public void run() { TestHelper.race(r1, r2); } } + + @Test + public void syncWorkerCancelRace() { + for (int i = 0; i < 10000; i++) { + final CompositeDisposable set = new CompositeDisposable(); + final AtomicBoolean interrupted = new AtomicBoolean(); + final AtomicInteger sync = new AtomicInteger(2); + final AtomicInteger syncb = new AtomicInteger(2); + + Runnable r0 = new Runnable() { + @Override + public void run() { + set.dispose(); + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + if (syncb.decrementAndGet() != 0) { + while (syncb.get() != 0) { } + } + for (int j = 0; j < 1000; j++) { + if (Thread.currentThread().isInterrupted()) { + interrupted.set(true); + break; + } + } + } + }; + + final ScheduledRunnable run = new ScheduledRunnable(r0, set); + set.add(run); + + final FutureTask ft = new FutureTask(run, null); + + Runnable r2 = new Runnable() { + @Override + public void run() { + if (sync.decrementAndGet() != 0) { + while (sync.get() != 0) { } + } + run.setFuture(ft); + if (syncb.decrementAndGet() != 0) { + while (syncb.get() != 0) { } + } + } + }; + + TestHelper.race(ft, r2); + + assertFalse("The task was interrupted", interrupted.get()); + } + } } From 3e911b4513ddaec3159e40a9670edf2e6887fd44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 9 Nov 2017 21:21:59 +0100 Subject: [PATCH 2/3] Coverage improvement and fix a missing state change --- .../schedulers/ScheduledRunnable.java | 2 +- .../schedulers/ScheduledRunnableTest.java | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java index e4414db800..61218fbe40 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java @@ -71,7 +71,7 @@ public void run() { } finally { lazySet(THREAD_INDEX, null); Object o = get(PARENT_INDEX); - if (o != PARENT_DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { + if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) { ((DisposableContainer)o).delete(this); } diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java index da86973016..ae853348aa 100644 --- a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java @@ -335,4 +335,40 @@ public void run() { assertFalse("The task was interrupted", interrupted.get()); } } + + @Test + public void disposeAfterRun() { + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + + run.run(); + assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX)); + + run.dispose(); + assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX)); + } + + @Test + public void syncDisposeIdempotent() { + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + run.set(ScheduledRunnable.THREAD_INDEX, Thread.currentThread()); + + run.dispose(); + assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + run.dispose(); + assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + run.run(); + assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + } + + @Test + public void asyncDisposeIdempotent() { + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + + run.dispose(); + assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + run.dispose(); + assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + run.run(); + assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); + } } From 049f6f985905e51a064fb302d1ad71f6ca36e8ad Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 15 Nov 2017 10:43:21 +0100 Subject: [PATCH 3/3] Add test case for the parent-done reordered check --- .../schedulers/ScheduledRunnableTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java index ae853348aa..49d20edf80 100644 --- a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java @@ -371,4 +371,27 @@ public void asyncDisposeIdempotent() { run.run(); assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX)); } + + + @Test + public void noParentIsDisposed() { + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null); + assertFalse(run.isDisposed()); + run.run(); + assertTrue(run.isDisposed()); + } + + @Test + public void withParentIsDisposed() { + CompositeDisposable set = new CompositeDisposable(); + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + assertFalse(run.isDisposed()); + + run.run(); + assertTrue(run.isDisposed()); + + assertFalse(set.remove(run)); + } }