Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Helper method to submit an (interruptible) action to an ExecutorService #2761

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,48 @@
package rx.internal.schedulers;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import rx.Subscription;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

/**
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
* {@code Subscriber} in respect of an {@code Observer}.
* <p><b>System-wide properties:</b>
* <ul>
* <li>{@code rx.scheduler.interrupt-on-unsubscribe}
* <dd>Use {@code Future.cancel(true)} to interrupt a running action? {@code "true"} (default) or {@code "false"}.</br>
* </li>
* </ul>
*/
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final CompositeSubscription cancel;
static final boolean DEFAULT_INTERRUPT_ON_UNSUBSCRIBE;
static final String KEY_INTERRUPT_ON_UNSUBSCRIBE = "rx.scheduler.interrupt-on-unsubscribe";
static {
String interruptOnUnsubscribeValue = System.getProperty(KEY_INTERRUPT_ON_UNSUBSCRIBE);
DEFAULT_INTERRUPT_ON_UNSUBSCRIBE = interruptOnUnsubscribeValue == null || "true".equals(interruptOnUnsubscribeValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it is ever good for this to be a global default. Haven't we learned that it causes nasty issues in places like event loops? It seems only appropriate for separate threads, like the IO or NewThread schedulers.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I add such parameters to the scheduler implementations directly or have them fixed?

}
final SubscriptionList cancel;
final Action0 action;
volatile int interruptOnUnsubscribe;
static final AtomicIntegerFieldUpdater<ScheduledAction> INTERRUPT_ON_UNSUBSCRIBE
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptOnUnsubscribe");

public ScheduledAction(Action0 action) {
this(action, DEFAULT_INTERRUPT_ON_UNSUBSCRIBE);
}

public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) {
this.action = action;
this.cancel = new CompositeSubscription();
this.cancel = new SubscriptionList();
this.interruptOnUnsubscribe = interruptOnUnsubscribe ? 1 : 0;
}

@Override
Expand All @@ -61,16 +81,29 @@ public void run() {
}
}

/**
* Sets the flag to indicate the underlying Future task should be interrupted on unsubscription or not.
* @param interrupt the new interruptible status
*/
public void setInterruptOnUnsubscribe(boolean interrupt) {
INTERRUPT_ON_UNSUBSCRIBE.lazySet(this, interrupt ? 1 : 0);
}
/**
* Returns {@code true} if the underlying Future task will be interrupted on unsubscription.
* @return the current interruptible status
*/
public boolean isInterruptOnUnsubscribe() {
return interruptOnUnsubscribe != 0;
}

@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}

@Override
public void unsubscribe() {
if (!cancel.isUnsubscribed()) {
cancel.unsubscribe();
}
cancel.unsubscribe();
}

/**
Expand All @@ -89,7 +122,7 @@ public void add(Subscription s) {
* @param f the future to add
*/
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
add(new FutureCompleter(f));
}

/**
Expand All @@ -100,7 +133,7 @@ public void add(final Future<?> f) {
* the parent {@code CompositeSubscription} to add
*/
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
add(new Remover(this, parent));
}

/**
Expand All @@ -119,7 +152,7 @@ private FutureCompleter(Future<?> f) {
@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
f.cancel(interruptOnUnsubscribe != 0);
} else {
f.cancel(false);
}
Expand Down
57 changes: 54 additions & 3 deletions src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package rx.schedulers;

import rx.Scheduler;
import rx.plugins.RxJavaPlugins;
import java.util.concurrent.*;

import java.util.concurrent.Executor;
import rx.*;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

/**
* Static factory methods for creating Schedulers.
Expand Down Expand Up @@ -136,4 +139,52 @@ public static TestScheduler test() {
public static Scheduler from(Executor executor) {
return new ExecutorScheduler(executor);
}
/**
* Submit an Action0 to the specified executor service with the option to interrupt the task
* on unsubscription and add it to a parent composite subscription.
* @param executor the target executor service
* @param action the action to execute
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
* once the action completes or is unsubscribed.
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
*/
public static Subscription submitTo(ExecutorService executor, Action0 action, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good I think. Should we just us 'CompositeSubscription' or does this warrant an interface for subscriptions that support removal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly, an interface would be more general but I'm afraid it turns the usage places of CompositeSubscription into bimorphic or even megamorphic call sites. Let's leave it as CompositeSubscription for now.

ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);

if (parent != null) {
parent.add(sa);
sa.addParent(parent);
}

Future<?> f = executor.submit(sa);
sa.add(f);

return sa;
}
/**
* Submit an Action0 to the specified executor service with the given delay and the option to interrupt the task
* on unsubscription and add it to a parent composite subscription.
* @param executor the target executor service
* @param action the action to execute
* @param delay the delay value
* @param unit the time unit of the delay value
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
* once the action completes or is unsubscribed.
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
*/
public static Subscription submitTo(ScheduledExecutorService executor, Action0 action, long delay, TimeUnit unit, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);

if (parent != null) {
parent.add(sa);
sa.addParent(parent);
}

Future<?> f = executor.schedule(sa, delay, unit);
sa.add(f);

return sa;
}
}
Loading