diff --git a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java index 9412ef8324..972abf58f6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java @@ -15,7 +15,8 @@ */ package rx.operators; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import rx.Observable.Operator; @@ -60,7 +61,7 @@ private static class ObserveOnSubscriber extends Subscriber { final Subscriber observer; private final Scheduler.Worker recursiveScheduler; - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + private FastList queue = new FastList(); final AtomicLong counter = new AtomicLong(0); public ObserveOnSubscriber(Scheduler scheduler, Subscriber subscriber) { @@ -72,19 +73,25 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber subscriber @Override public void onNext(final T t) { - queue.offer(on.next(t)); + synchronized (this) { + queue.add(on.next(t)); + } schedule(); } @Override public void onCompleted() { - queue.offer(on.completed()); + synchronized (this) { + queue.add(on.completed()); + } schedule(); } @Override public void onError(final Throwable e) { - queue.offer(on.error(e)); + synchronized (this) { + queue.add(on.error(e)); + } schedule(); } @@ -103,11 +110,43 @@ public void call() { private void pollQueue() { do { - Object v = queue.poll(); - on.accept(observer, v); - } while (counter.decrementAndGet() > 0); + FastList vs; + synchronized (this) { + vs = queue; + queue = new FastList(); + } + for (Object v : vs.array) { + if (v == null) { + break; + } + on.accept(observer, v); + } + if (counter.addAndGet(-vs.size) == 0) { + break; + } + } while (true); } } + static final class FastList { + Object[] array; + int size; + + public void add(Object o) { + int s = size; + Object[] a = array; + if (a == null) { + a = new Object[16]; + array = a; + } else if (s == a.length) { + Object[] array2 = new Object[s + (s >> 2)]; + System.arraycopy(a, 0, array2, 0, s); + a = array2; + array = a; + } + a[s] = o; + size = s + 1; + } + } } \ No newline at end of file