Skip to content

Commit

Permalink
Merge pull request #1172 from akarnokd/ObserveOnBatchDequeue
Browse files Browse the repository at this point in the history
ObserveOn: Change to batch dequeue.
  • Loading branch information
benjchristensen committed May 16, 2014
2 parents 570a8f9 + e6ae50b commit 9b205ee
Showing 1 changed file with 47 additions and 8 deletions.
55 changes: 47 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package rx.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
Expand Down Expand Up @@ -60,7 +61,7 @@ private static class ObserveOnSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> observer;
private final Scheduler.Worker recursiveScheduler;

private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
private FastList queue = new FastList();
final AtomicLong counter = new AtomicLong(0);

public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber) {
Expand All @@ -72,19 +73,25 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> subscriber

@Override
public void onNext(final T t) {
queue.offer(on.next(t));
synchronized (this) {
queue.add(on.next(t));
}
schedule();
}

@Override
public void onCompleted() {
queue.offer(on.completed());
synchronized (this) {
queue.add(on.completed());
}
schedule();
}

@Override
public void onError(final Throwable e) {
queue.offer(on.error(e));
synchronized (this) {
queue.add(on.error(e));
}
schedule();
}

Expand All @@ -103,11 +110,43 @@ public void call() {

private void pollQueue() {
do {
Object v = queue.poll();
on.accept(observer, v);
} while (counter.decrementAndGet() > 0);
FastList vs;
synchronized (this) {
vs = queue;
queue = new FastList();
}
for (Object v : vs.array) {
if (v == null) {
break;
}
on.accept(observer, v);
}
if (counter.addAndGet(-vs.size) == 0) {
break;
}
} while (true);
}

}

static final class FastList {
Object[] array;
int size;

public void add(Object o) {
int s = size;
Object[] a = array;
if (a == null) {
a = new Object[16];
array = a;
} else if (s == a.length) {
Object[] array2 = new Object[s + (s >> 2)];
System.arraycopy(a, 0, array2, 0, s);
a = array2;
array = a;
}
a[s] = o;
size = s + 1;
}
}
}

0 comments on commit 9b205ee

Please sign in to comment.