Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: distinguish between sync and async dispose in ScheduledRunnable #5715

Merged
merged 3 commits into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ public final class ScheduledRunnable extends AtomicReferenceArray<Object>
private static final long serialVersionUID = -6120223772001106981L;
final Runnable actual;

static final Object DISPOSED = new Object();
/** Indicates that the parent tracking this task has been notified about its completion. */
static final Object PARENT_DISPOSED = new Object();
/** Indicates the dispose() was called from within the run/call method. */
static final Object SYNC_DISPOSED = new Object();
/** Indicates the dispose() was called from another thread. */
static final Object ASYNC_DISPOSED = new Object();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supernit: ASYNC doesn't mean another thread :)

But for name conciseness I guess it's fine


static final Object DONE = new Object();

Expand Down Expand Up @@ -66,13 +71,13 @@ public void run() {
} finally {
lazySet(THREAD_INDEX, null);
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change of compareAndSet position in if statement seems to be a behavior change…

Though it doesn't affect ScheduledRunnableTest on my machine

Do we need that change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. With the previous order, if o was null because of lack of a parent, the CAS didn't happen and the task would partially appear to be still active. With the sync-async marker changes, isDisposed was switched to check the parent reference for indication of being disposed instead of the future reference, which required now 3 comparisons instead of 2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, can you please add a test to cover that then?

When I changed order back to original one on your branch, whole ScheduledRunnableTest passed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit tests added.

((DisposableContainer)o).delete(this);
}

for (;;) {
o = get(FUTURE_INDEX);
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
Expand All @@ -85,8 +90,12 @@ public void setFuture(Future<?> f) {
if (o == DONE) {
return;
}
if (o == DISPOSED) {
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
if (o == SYNC_DISPOSED) {
f.cancel(false);
return;
}
if (o == ASYNC_DISPOSED) {
f.cancel(true);
return;
}
if (compareAndSet(FUTURE_INDEX, o, f)) {
Expand All @@ -99,23 +108,24 @@ public void setFuture(Future<?> f) {
public void dispose() {
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE || o == DISPOSED) {
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
break;
}
if (compareAndSet(FUTURE_INDEX, o, DISPOSED)) {
boolean async = get(THREAD_INDEX) != Thread.currentThread();
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, what worries me here is that we base boolean async on get(THREAD_INDEX) != Thread.currentThread(), but in finally of run() we set Thread to null which always gonna give async == false

Yes, it's lazySet(), but aren't we creating unwanted race condition here between run() and dispose()?

The worst scenario is that if dispose() indeed was called from another thread but lazySet() memory write becomes visible to it, we'll have async == false which is actually wrong in this scenario…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancelling from the current thread could only happen while the run callback is executing. Any other cancellation that comes after this can only be asynchronous. It doesn't matter what value is in THREAD_INDEX at this time because that will not be the same thread as the caller's.

Since we are running with single-threaded thread pools, any subsequent task on the same pool having the same Thread will find the references marked as DONE and not cancel the previous task. This is guaranteed by the sequential task processing of the thread pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter what value is in THREAD_INDEX at this time because that will not be the same thread as the caller's.

Aha, I see now, 👍

if (o != null) {
((Future<?>)o).cancel(get(THREAD_INDEX) != Thread.currentThread());
((Future<?>)o).cancel(async);
}
break;
}
}

for (;;) {
Object o = get(PARENT_INDEX);
if (o == DONE || o == DISPOSED || o == null) {
if (o == DONE || o == PARENT_DISPOSED || o == null) {
return;
}
if (compareAndSet(PARENT_INDEX, o, DISPOSED)) {
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
((DisposableContainer)o).delete(this);
return;
}
Expand All @@ -124,7 +134,7 @@ public void dispose() {

@Override
public boolean isDisposed() {
Object o = get(FUTURE_INDEX);
return o == DISPOSED || o == DONE;
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.*;

import org.junit.Test;

Expand Down Expand Up @@ -283,4 +284,114 @@ public void run() {
TestHelper.race(r1, r2);
}
}

@Test
public void syncWorkerCancelRace() {
for (int i = 0; i < 10000; i++) {
final CompositeDisposable set = new CompositeDisposable();
final AtomicBoolean interrupted = new AtomicBoolean();
final AtomicInteger sync = new AtomicInteger(2);
final AtomicInteger syncb = new AtomicInteger(2);

Runnable r0 = new Runnable() {
@Override
public void run() {
set.dispose();
if (sync.decrementAndGet() != 0) {
while (sync.get() != 0) { }
}
if (syncb.decrementAndGet() != 0) {
while (syncb.get() != 0) { }
}
for (int j = 0; j < 1000; j++) {
if (Thread.currentThread().isInterrupted()) {
interrupted.set(true);
break;
}
}
}
};

final ScheduledRunnable run = new ScheduledRunnable(r0, set);
set.add(run);

final FutureTask<Void> ft = new FutureTask<Void>(run, null);

Runnable r2 = new Runnable() {
@Override
public void run() {
if (sync.decrementAndGet() != 0) {
while (sync.get() != 0) { }
}
run.setFuture(ft);
if (syncb.decrementAndGet() != 0) {
while (syncb.get() != 0) { }
}
}
};

TestHelper.race(ft, r2);

assertFalse("The task was interrupted", interrupted.get());
}
}

@Test
public void disposeAfterRun() {
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);

run.run();
assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX));

run.dispose();
assertEquals(ScheduledRunnable.DONE, run.get(ScheduledRunnable.FUTURE_INDEX));
}

@Test
public void syncDisposeIdempotent() {
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
run.set(ScheduledRunnable.THREAD_INDEX, Thread.currentThread());

run.dispose();
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
run.dispose();
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
run.run();
assertEquals(ScheduledRunnable.SYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
}

@Test
public void asyncDisposeIdempotent() {
final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);

run.dispose();
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
run.dispose();
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
run.run();
assertEquals(ScheduledRunnable.ASYNC_DISPOSED, run.get(ScheduledRunnable.FUTURE_INDEX));
}


@Test
public void noParentIsDisposed() {
ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, null);
assertFalse(run.isDisposed());
run.run();
assertTrue(run.isDisposed());
}

@Test
public void withParentIsDisposed() {
CompositeDisposable set = new CompositeDisposable();
ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);
set.add(run);

assertFalse(run.isDisposed());

run.run();
assertTrue(run.isDisposed());

assertFalse(set.remove(run));
}
}