Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Scheduler Memory Leaks #712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public Subscription call(final Scheduler scheduler, final Func2 parentAction) {
@Override
public void call() {
if (!parentSubscription.isUnsubscribed()) {
childSubscription.setSubscription(scheduler.schedule(parentAction, parentAction));
childSubscription.set(scheduler.schedule(parentAction, parentAction));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import rx.Notification;
import rx.Observable;
Expand Down Expand Up @@ -71,7 +72,7 @@ private class Observation {
final CompositeSubscription compositeSubscription = new CompositeSubscription();
final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
final AtomicInteger counter = new AtomicInteger(0);
final AtomicLong counter = new AtomicLong(0);
private volatile Scheduler recursiveScheduler;

public Observation(Observer<? super T> observer) {
Expand Down Expand Up @@ -108,7 +109,7 @@ public Subscription call(Scheduler innerScheduler, T state) {
}

void processQueue() {
recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1<Action0>() {
recursiveSubscription.set(recursiveScheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
Notification<? extends T> not = queue.poll();
Expand Down
116 changes: 90 additions & 26 deletions rxjava-core/src/main/java/rx/schedulers/CurrentThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,135 @@

import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
*/
public class CurrentThreadScheduler extends Scheduler {
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
private static final AtomicLong counter = new AtomicLong(0);

public static CurrentThreadScheduler getInstance() {
return INSTANCE;
}

private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>() {
protected java.util.PriorityQueue<TimedAction> initialValue() {
return new PriorityQueue<TimedAction>();
};
};

private static final ThreadLocal<Boolean> PROCESSING = new ThreadLocal<Boolean>() {
protected Boolean initialValue() {
return Boolean.FALSE;
};
};

/* package accessible for unit tests */CurrentThreadScheduler() {
}

private final AtomicInteger counter = new AtomicInteger(0);

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
enqueue(discardableAction, now());
return discardableAction;
// immediately move to the InnerCurrentThreadScheduler
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
innerScheduler.schedule(state, action);
enqueueFromOuter(innerScheduler, now());
return innerScheduler;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
long execTime = now() + unit.toMillis(dueTime);

DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
enqueue(discardableAction, execTime);
return discardableAction;
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
long execTime = now() + unit.toMillis(delayTime);

// create an inner scheduler and queue it for execution
InnerCurrentThreadScheduler innerScheduler = new InnerCurrentThreadScheduler();
innerScheduler.schedule(state, action, delayTime, unit);
enqueueFromOuter(innerScheduler, execTime);
return innerScheduler;
}

private void enqueue(DiscardableAction<?> action, long execTime) {
/*
* This will accept InnerCurrentThreadScheduler instances and execute them in order they are received
* and on each of them will loop internally until each is complete.
*/
private void enqueueFromOuter(final InnerCurrentThreadScheduler innerScheduler, long execTime) {
// Note that everything here is single-threaded so we won't have race conditions
PriorityQueue<TimedAction> queue = QUEUE.get();
boolean exec = queue == null;
queue.add(new TimedAction(new Func1<Scheduler, Subscription>() {

if (exec) {
queue = new PriorityQueue<TimedAction>();
QUEUE.set(queue);
@Override
public Subscription call(Scheduler _) {
// when the InnerCurrentThreadScheduler gets scheduled we want to process its tasks
return innerScheduler.startProcessing();
}
}, execTime, counter.incrementAndGet()));

// first time through starts the loop
if (!PROCESSING.get()) {
PROCESSING.set(Boolean.TRUE);
while (!queue.isEmpty()) {
queue.poll().action.call(innerScheduler);
}
PROCESSING.set(Boolean.FALSE);
}
}

queue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
private static class InnerCurrentThreadScheduler extends Scheduler implements Subscription {
private final MultipleAssignmentSubscription childSubscription = new MultipleAssignmentSubscription();
private final PriorityQueue<TimedAction> innerQueue = new PriorityQueue<TimedAction>();

if (exec) {
while (!queue.isEmpty()) {
queue.poll().action.call(this);
@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
childSubscription.set(discardableAction);
enqueue(discardableAction, now());
return childSubscription;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
long execTime = now() + unit.toMillis(delayTime);

DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
childSubscription.set(discardableAction);
enqueue(discardableAction, execTime);
return childSubscription;
}

private void enqueue(Func1<Scheduler, Subscription> action, long execTime) {
innerQueue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
}

private Subscription startProcessing() {
while (!innerQueue.isEmpty()) {
innerQueue.poll().action.call(this);
}
return this;
}

QUEUE.set(null);
@Override
public void unsubscribe() {
childSubscription.unsubscribe();
}

}

/**
* Use time to sort items so delayed actions are sorted to their appropriate position in the queue.
*/
private static class TimedAction implements Comparable<TimedAction> {
final DiscardableAction<?> action;
final Func1<Scheduler, Subscription> action;
final Long execTime;
final Integer count; // In case if time between enqueueing took less than 1ms
final Long count; // In case if time between enqueueing took less than 1ms

private TimedAction(DiscardableAction<?> action, Long execTime, Integer count) {
private TimedAction(Func1<Scheduler, Subscription> action, Long execTime, Long count) {
this.action = action;
this.execTime = execTime;
this.count = count;
Expand Down
Loading