Skip to content

Commit

Permalink
Merge pull request #3137 from akarnokd/FromIterablePerf
Browse files Browse the repository at this point in the history
FromIterable overhead reduction.
  • Loading branch information
akarnokd committed Aug 12, 2015
2 parents 9a84006 + f6ea890 commit 6362dfe
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 48 deletions.
105 changes: 57 additions & 48 deletions src/main/java/rx/internal/operators/OnSubscribeFromIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
package rx.internal.operators;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.Producer;
import rx.Subscriber;

/**
* Converts an {@code Iterable} sequence into an {@code Observable}.
Expand Down Expand Up @@ -50,33 +49,54 @@ public void call(final Subscriber<? super T> o) {
o.setProducer(new IterableProducer<T>(o, it));
}

private static final class IterableProducer<T> implements Producer {
private static final class IterableProducer<T> extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = -8730475647105475802L;
private final Subscriber<? super T> o;
private final Iterator<? extends T> it;

private volatile long requested = 0;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(IterableProducer.class, "requested");

private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
this.o = o;
this.it = it;
}

@Override
public void request(long n) {
if (requested == Long.MAX_VALUE) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
// fast-path without backpressure
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
fastpath();
} else
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
slowpath(n);
}

}

void slowpath(long n) {
// backpressure is requested
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;

long r = n;
while (true) {
/*
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long numToEmit = r;
while (true) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
o.onNext(it.next());
if (--numToEmit >= 0) {
o.onNext(it.next());
} else
break;
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
Expand All @@ -85,45 +105,34 @@ public void request(long n) {
return;
}
}
} else if (n > 0) {
// backpressure is requested
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
if (_c == 0) {
while (true) {
/*
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long r = requested;
long numToEmit = r;
while (true) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
if (--numToEmit >= 0) {
o.onNext(it.next());
} else
break;
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
} else {
// is unsubscribed
return;
}
}
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
// we're done emitting the number requested so
// return
return;
}

}
r = addAndGet(-r);
if (r == 0L) {
// we're done emitting the number requested so
// return
return;
}

}
}

void fastpath() {
// fast-path without backpressure
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;

while (true) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
o.onNext(it.next());
} else if (!o.isUnsubscribed()) {
o.onCompleted();
return;
} else {
// is unsubscribed
return;
}
}
}
}

Expand Down
85 changes: 85 additions & 0 deletions src/perf/java/rx/operators/FromIterablePerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.operators;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import rx.*;
import rx.internal.operators.OnSubscribeFromIterable;
import rx.jmh.LatchedObserver;

/**
* Benchmark from(Iterable).
* <p>
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*FromIterablePerf.*"
* <p>
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*FromIterablePerf.*"
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class FromIterablePerf {
Observable<Integer> from;
OnSubscribeFromIterable<Integer> direct;
@Param({"1", "1000", "1000000"})
public int size;

@Setup
public void setup() {
Integer[] array = new Integer[size];
for (int i = 0; i < size; i++) {
array[i] = i;
}
from = Observable.from(Arrays.asList(array));
direct = new OnSubscribeFromIterable<Integer>(Arrays.asList(array));
}

@Benchmark
public void from(Blackhole bh) {
from.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void fromUnsafe(final Blackhole bh) {
from.unsafeSubscribe(createSubscriber(bh));
}

@Benchmark
public void direct(final Blackhole bh) {
direct.call(createSubscriber(bh));
}

Subscriber<Integer> createSubscriber(final Blackhole bh) {
return new Subscriber<Integer>() {
@Override
public void onNext(Integer t) {
bh.consume(t);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onCompleted() {

}
};
}
}

0 comments on commit 6362dfe

Please sign in to comment.