diff --git a/src/main/java/rx/internal/operators/OperatorToObservableList.java b/src/main/java/rx/internal/operators/OperatorToObservableList.java index 8d7dff8f96..e77826acc6 100644 --- a/src/main/java/rx/internal/operators/OperatorToObservableList.java +++ b/src/main/java/rx/internal/operators/OperatorToObservableList.java @@ -15,12 +15,11 @@ */ package rx.internal.operators; -import rx.Observable.Operator; -import rx.Subscriber; +import java.util.*; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; +import rx.Observable.Operator; +import rx.*; +import rx.internal.producers.SingleDelayedProducer; /** * Returns an {@code Observable} that emits a single item, a list composed of all the items emitted by the @@ -90,7 +89,7 @@ public void onCompleted() { return; } list = null; - producer.set(result); + producer.setValue(result); } } diff --git a/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java b/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java index f2d5cb9948..a3e9c54839 100644 --- a/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java +++ b/src/main/java/rx/internal/operators/OperatorToObservableSortedList.java @@ -20,6 +20,7 @@ import rx.Observable.Operator; import rx.*; import rx.functions.Func2; +import rx.internal.producers.SingleDelayedProducer; /** * Return an {@code Observable} that emits the items emitted by the source {@code Observable}, in a sorted order @@ -77,7 +78,7 @@ public void onCompleted() { onError(e); return; } - producer.set(a); + producer.setValue(a); } } diff --git a/src/main/java/rx/internal/operators/SingleDelayedProducer.java b/src/main/java/rx/internal/operators/SingleDelayedProducer.java deleted file mode 100644 index 9405250ac5..0000000000 --- a/src/main/java/rx/internal/operators/SingleDelayedProducer.java +++ /dev/null @@ -1,87 +0,0 @@ -package rx.internal.operators; - -import java.util.concurrent.atomic.AtomicInteger; - -import rx.*; - -/** - * A producer that holds a single value until it is requested and emits it followed by an onCompleted. - */ -public final class SingleDelayedProducer extends AtomicInteger implements Producer { - /** */ - private static final long serialVersionUID = 4721551710164477552L; - /** The actual child. */ - final Subscriber child; - /** The value to emit, acquired and released by compareAndSet. */ - T value; - /** State flag: request() called with positive value. */ - static final int REQUESTED = 1; - /** State flag: set() called. */ - static final int SET = 2; - /** - * Constructs a SingleDelayedProducer with the given child as output. - * @param child the subscriber to emit the value and completion events - */ - public SingleDelayedProducer(Subscriber child) { - this.child = child; - } - @Override - public void request(long n) { - if (n > 0) { - for (;;) { - int s = get(); - // if already requested - if ((s & REQUESTED) != 0) { - break; - } - int u = s | REQUESTED; - if (compareAndSet(s, u)) { - if ((s & SET) != 0) { - emit(); - } - break; - } - } - } - } - /** - * Sets the value to be emitted and emits it if there was a request. - * Should be called only once and from a single thread - * @param value the value to set and possibly emit - */ - public void set(T value) { - for (;;) { - int s = get(); - // if already set - if ((s & SET) != 0) { - break; - } - int u = s | SET; - this.value = value; - if (compareAndSet(s, u)) { - if ((s & REQUESTED) != 0) { - emit(); - } - break; - } - } - } - /** - * Emits the set value if the child is not unsubscribed and bounces back - * exceptions caught from child.onNext. - */ - void emit() { - try { - T v = value; - value = null; // do not hold onto the value - if (child.isUnsubscribed()) { - return; - } - child.onNext(v); - } catch (Throwable t) { - child.onError(t); - return; - } - child.onCompleted(); - } -} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/ProducerArbiter.java b/src/main/java/rx/internal/producers/ProducerArbiter.java new file mode 100644 index 0000000000..d90a575447 --- /dev/null +++ b/src/main/java/rx/internal/producers/ProducerArbiter.java @@ -0,0 +1,191 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import rx.*; + +/** + * Producer that allows changing an underlying producer atomically and correctly resume with the accumulated + * requests. + */ +public final class ProducerArbiter implements Producer { + long requested; + Producer currentProducer; + + boolean emitting; + long missedRequested; + long missedProduced; + Producer missedProducer; + + static final Producer NULL_PRODUCER = new Producer() { + @Override + public void request(long n) { + + } + }; + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n == 0) { + return; + } + synchronized (this) { + if (emitting) { + missedRequested += n; + return; + } + emitting = true; + } + boolean skipFinal = false; + try { + long r = requested; + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + requested = u; + + Producer p = currentProducer; + if (p != null) { + p.request(n); + } + + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + public void produced(long n) { + if (n <= 0) { + throw new IllegalArgumentException("n > 0 required"); + } + synchronized (this) { + if (emitting) { + missedProduced += n; + return; + } + emitting = true; + } + + boolean skipFinal = false; + try { + long r = requested; + if (r != Long.MAX_VALUE) { + long u = r - n; + if (u < 0) { + throw new IllegalStateException(); + } + requested = u; + } + + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + public void setProducer(Producer newProducer) { + synchronized (this) { + if (emitting) { + missedProducer = newProducer == null ? NULL_PRODUCER : newProducer; + return; + } + emitting = true; + } + boolean skipFinal = false; + try { + currentProducer = newProducer; + if (newProducer != null) { + newProducer.request(requested); + } + + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + public void emitLoop() { + for (;;) { + long localRequested; + long localProduced; + Producer localProducer; + synchronized (this) { + localRequested = missedRequested; + localProduced = missedProduced; + localProducer = missedProducer; + if (localRequested == 0L + && localProduced == 0L + && localProducer == null) { + emitting = false; + return; + } + missedRequested = 0L; + missedProduced = 0L; + missedProducer = null; + } + + long r = requested; + + if (r != Long.MAX_VALUE) { + long u = r + localRequested; + if (u < 0 || u == Long.MAX_VALUE) { + r = Long.MAX_VALUE; + requested = r; + } else { + long v = u - localProduced; + if (v < 0) { + throw new IllegalStateException("more produced than requested"); + } + r = v; + requested = v; + } + } + if (localProducer != null) { + if (localProducer == NULL_PRODUCER) { + currentProducer = null; + } else { + currentProducer = localProducer; + localProducer.request(r); + } + } else { + Producer p = currentProducer; + if (p != null && localRequested != 0L) { + p.request(localRequested); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/ProducerObserverArbiter.java b/src/main/java/rx/internal/producers/ProducerObserverArbiter.java new file mode 100644 index 0000000000..ff059590b5 --- /dev/null +++ b/src/main/java/rx/internal/producers/ProducerObserverArbiter.java @@ -0,0 +1,282 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import java.util.*; + +import rx.*; +import rx.Observer; +import rx.exceptions.*; + +/** + * Producer that serializes any event emission with requesting and producer changes. + *

+ * The implementation shortcuts on error and overwrites producers that got delayed, similar + * to ProducerArbiter. + * + * @param the value type + */ +public final class ProducerObserverArbiter implements Producer, Observer { + final Subscriber child; + + boolean emitting; + + List queue; + + Producer currentProducer; + long requested; + + long missedRequested; + Producer missedProducer; + Object missedTerminal; + + volatile boolean hasError; + + static final Producer NULL_PRODUCER = new Producer() { + @Override + public void request(long n) { + + } + }; + + public ProducerObserverArbiter(Subscriber child) { + this.child = child; + } + + @Override + public void onNext(T t) { + synchronized (this) { + if (emitting) { + List q = queue; + if (q == null) { + q = new ArrayList(4); + queue = q; + } + q.add(t); + return; + } + } + boolean skipFinal = false; + try { + child.onNext(t); + + long r = requested; + if (r != Long.MAX_VALUE) { + requested = r - 1; + } + + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + @Override + public void onError(Throwable e) { + boolean emit; + synchronized (this) { + if (emitting) { + missedTerminal = e; + emit = false; + } else { + emitting = true; + emit = true; + } + } + if (emit) { + child.onError(e); + } else { + hasError = true; + } + } + + @Override + public void onCompleted() { + synchronized (this) { + if (emitting) { + missedTerminal = true; + return; + } + emitting = true; + } + child.onCompleted(); + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n == 0) { + return; + } + synchronized (this) { + if (emitting) { + missedRequested += n; + return; + } + emitting = true; + } + boolean skipFinal = false; + try { + long r = requested; + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + requested = u; + + Producer p = currentProducer; + if (p != null) { + p.request(n); + } + + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + public void setProducer(Producer p) { + synchronized (this) { + if (emitting) { + missedProducer = p != null ? p : NULL_PRODUCER; + return; + } + emitting = true; + } + boolean skipFinal = false; + try { + currentProducer = p; + long r = requested; + if (p != null && r != 0) { + p.request(r); + } + emitLoop(); + skipFinal = true; + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + + void emitLoop() { + final Subscriber c = child; + + outer: + for (;;) { + long localRequested; + Producer localProducer; + Object localTerminal; + List q; + synchronized (this) { + localRequested = missedRequested; + localProducer = missedProducer; + localTerminal = missedTerminal; + q = queue; + if (localRequested == 0L && localProducer == null && q == null + && localTerminal == null) { + emitting = false; + return; + } + missedRequested = 0L; + missedProducer = null; + queue = null; + missedTerminal = null; + } + boolean empty = q == null || q.isEmpty(); + if (localTerminal != null) { + if (localTerminal != Boolean.TRUE) { + c.onError((Throwable)localTerminal); + return; + } else + if (empty) { + c.onCompleted(); + return; + } + } + long e = 0; + if (q != null) { + for (T v : q) { + if (c.isUnsubscribed()) { + return; + } else + if (hasError) { + continue outer; // if an error has been set, shortcut the loop and act on it + } + try { + c.onNext(v); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Throwable ex1 = OnErrorThrowable.addValueAsLastCause(ex, v); + c.onError(ex1); + return; + } + } + e += q.size(); + } + long r = requested; + // if requested is max, we don't do any accounting + if (r != Long.MAX_VALUE) { + // if there were missing requested, add it up + if (localRequested != 0L) { + long u = r + localRequested; + if (u < 0) { + u = Long.MAX_VALUE; + } + r = u; + } + // if there were emissions and we don't run on max since the last check, subtract + if (e != 0L && r != Long.MAX_VALUE) { + long u = r - e; + if (u < 0) { + throw new IllegalStateException("More produced than requested"); + } + r = u; + } + requested = r; + } + if (localProducer != null) { + if (localProducer == NULL_PRODUCER) { + currentProducer = null; + } else { + currentProducer = localProducer; + if (r != 0L) { + localProducer.request(r); + } + } + } else { + Producer p = currentProducer; + if (p != null && localRequested != 0L) { + p.request(localRequested); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/QueuedProducer.java b/src/main/java/rx/internal/producers/QueuedProducer.java new file mode 100644 index 0000000000..8dbf4f361e --- /dev/null +++ b/src/main/java/rx/internal/producers/QueuedProducer.java @@ -0,0 +1,187 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.exceptions.*; +import rx.internal.operators.BackpressureUtils; +import rx.internal.util.atomic.SpscLinkedAtomicQueue; +import rx.internal.util.unsafe.*; + +/** + * Producer that holds an unbounded (or custom) queue, handles terminal events, + * enqueues values and relays them to a child subscriber on request. + * + * @param the value type + */ +public final class QueuedProducer extends AtomicLong implements Producer, Observer { + + /** */ + private static final long serialVersionUID = 7277121710709137047L; + + final Subscriber child; + final Queue queue; + final AtomicInteger wip; + + Throwable error; + volatile boolean done; + + static final Object NULL_SENTINEL = new Object(); + + /** + * Constructs an instance with the target child subscriber and an Spsc Linked (Atomic) Queue + * as the queue implementation. + * @param child the target child subscriber + */ + public QueuedProducer(Subscriber child) { + this(child, UnsafeAccess.isUnsafeAvailable() + ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue()); + } + /** + * Constructs an instance with the target child subscriber and a custom queue implementation + * @param child the target child subscriber + * @param queue the queue to use + */ + public QueuedProducer(Subscriber child, Queue queue) { + this.child = child; + this.queue = queue; + this.wip = new AtomicInteger(); + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n > 0) { + BackpressureUtils.getAndAddRequest(this, n); + drain(); + } + } + + /** + * Offers a value to this producer and tries to emit any queud values + * if the child requests allow it. + * @param value the value to enqueue and attempt to drain + * @return true if the queue accepted the offer, false otherwise + */ + public boolean offer(T value) { + if (value == null) { + if (!queue.offer(NULL_SENTINEL)) { + return false; + } + } else { + if (!queue.offer(value)) { + return false; + } + } + drain(); + return true; + } + + @Override + public void onNext(T value) { + if (!offer(value)) { + onError(new MissingBackpressureException()); + } + } + + @Override + public void onError(Throwable e) { + error = e; + done = true; + drain(); + } + + @Override + public void onCompleted() { + done = true; + drain(); + } + + private boolean checkTerminated(boolean isDone, + boolean isEmpty) { + if (child.isUnsubscribed()) { + return true; + } + if (isDone) { + Throwable e = error; + if (e != null) { + queue.clear(); + child.onError(e); + return true; + } else + if (isEmpty) { + child.onCompleted(); + return true; + } + } + return false; + } + + private void drain() { + if (wip.getAndIncrement() == 0) { + final Subscriber c = child; + final Queue q = queue; + + do { + if (checkTerminated(done, q.isEmpty())) { // (1) + return; + } + + wip.lazySet(1); + + long r = get(); + long e = 0; + + while (r != 0) { + boolean d = done; + Object v = q.poll(); + if (checkTerminated(d, v == null)) { + return; + } else + if (v == null) { + break; + } + + try { + if (v == NULL_SENTINEL) { + c.onNext(null); + } else { + @SuppressWarnings("unchecked") + T t = (T)v; + c.onNext(t); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Throwable ex1 = OnErrorThrowable.addValueAsLastCause(ex, v != NULL_SENTINEL ? v : null); + c.onError(ex1); + return; + } + r--; + e++; + } + + if (e != 0 && get() != Long.MAX_VALUE) { + addAndGet(-e); + } + } while (wip.decrementAndGet() != 0); + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/QueuedValueProducer.java b/src/main/java/rx/internal/producers/QueuedValueProducer.java new file mode 100644 index 0000000000..df61a05041 --- /dev/null +++ b/src/main/java/rx/internal/producers/QueuedValueProducer.java @@ -0,0 +1,138 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.exceptions.*; +import rx.internal.operators.BackpressureUtils; +import rx.internal.util.atomic.SpscLinkedAtomicQueue; +import rx.internal.util.unsafe.*; + +/** + * Producer that holds an unbounded (or custom) queue to enqueue values and relays them + * to a child subscriber on request. + * + * @param the value type + */ +public final class QueuedValueProducer extends AtomicLong implements Producer { + + /** */ + private static final long serialVersionUID = 7277121710709137047L; + + final Subscriber child; + final Queue queue; + final AtomicInteger wip; + + static final Object NULL_SENTINEL = new Object(); + + /** + * Constructs an instance with the target child subscriber and an Spsc Linked (Atomic) Queue + * as the queue implementation. + * @param child the target child subscriber + */ + public QueuedValueProducer(Subscriber child) { + this(child, UnsafeAccess.isUnsafeAvailable() + ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue()); + } + /** + * Constructs an instance with the target child subscriber and a custom queue implementation + * @param child the target child subscriber + * @param queue the queue to use + */ + public QueuedValueProducer(Subscriber child, Queue queue) { + this.child = child; + this.queue = queue; + this.wip = new AtomicInteger(); + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n > 0) { + BackpressureUtils.getAndAddRequest(this, n); + drain(); + } + } + + /** + * Offers a value to this producer and tries to emit any queud values + * if the child requests allow it. + * @param value the value to enqueue and attempt to drain + * @return true if the queue accepted the offer, false otherwise + */ + public boolean offer(T value) { + if (value == null) { + if (!queue.offer(NULL_SENTINEL)) { + return false; + } + } else { + if (!queue.offer(value)) { + return false; + } + } + drain(); + return true; + } + + private void drain() { + if (wip.getAndIncrement() == 0) { + final Subscriber c = child; + final Queue q = queue; + do { + if (c.isUnsubscribed()) { + return; + } + + wip.lazySet(1); + + long r = get(); + long e = 0; + Object v; + + while (r != 0 && (v = q.poll()) != null) { + try { + if (v == NULL_SENTINEL) { + c.onNext(null); + } else { + @SuppressWarnings("unchecked") + T t = (T)v; + c.onNext(t); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Throwable ex1 = OnErrorThrowable.addValueAsLastCause(ex, v != NULL_SENTINEL ? v : null); + c.onError(ex1); + return; + } + if (c.isUnsubscribed()) { + return; + } + r--; + e++; + } + + if (e != 0 && get() != Long.MAX_VALUE) { + addAndGet(-e); + } + } while (wip.decrementAndGet() != 0); + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/SingleDelayedProducer.java b/src/main/java/rx/internal/producers/SingleDelayedProducer.java new file mode 100644 index 0000000000..5da11dd80f --- /dev/null +++ b/src/main/java/rx/internal/producers/SingleDelayedProducer.java @@ -0,0 +1,115 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import java.util.concurrent.atomic.AtomicInteger; + +import rx.*; +import rx.exceptions.*; + +/** + * Producer that emits a single value and completes the child subscriber once that + * single value is set on it and the child requested items (maybe both asynchronously). + * + * @param the value type + */ +public final class SingleDelayedProducer extends AtomicInteger implements Producer { + /** */ + private static final long serialVersionUID = -2873467947112093874L; + /** The child to emit the value and completion once possible. */ + final Subscriber child; + /** The value to emit.*/ + T value; + + static final int NO_REQUEST_NO_VALUE = 0; + static final int NO_REQUEST_HAS_VALUE = 1; + static final int HAS_REQUEST_NO_VALUE = 2; + static final int HAS_REQUEST_HAS_VALUE = 3; + + /** + * Constructor, wraps the target child subscriber. + * @param child the child subscriber, not null + */ + public SingleDelayedProducer(Subscriber child) { + this.child = child; + } + + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n == 0) { + return; + } + for (;;) { + int s = get(); + if (s == NO_REQUEST_NO_VALUE) { + if (!compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) { + continue; + } + } else + if (s == NO_REQUEST_HAS_VALUE) { + if (compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) { + emit(child, value); + } + } + return; + } + } + + public void setValue(T value) { + for (;;) { + int s = get(); + if (s == NO_REQUEST_NO_VALUE) { + this.value = value; + if (!compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) { + continue; + } + } else + if (s == HAS_REQUEST_NO_VALUE) { + if (compareAndSet(HAS_REQUEST_NO_VALUE, HAS_REQUEST_HAS_VALUE)) { + emit(child, value); + } + } + return; + } + } + /** + * Emits the given value to the child subscriber and completes it + * and checks for unsubscriptions eagerly. + * @param c + * @param v + */ + private static void emit(Subscriber c, T v) { + if (c.isUnsubscribed()) { + return; + } + try { + c.onNext(v); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + Throwable e1 = OnErrorThrowable.addValueAsLastCause(e, v); + c.onError(e1); + return; + } + if (c.isUnsubscribed()) { + return; + } + c.onCompleted(); + + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/producers/SingleProducer.java b/src/main/java/rx/internal/producers/SingleProducer.java new file mode 100644 index 0000000000..8e8e17dcb4 --- /dev/null +++ b/src/main/java/rx/internal/producers/SingleProducer.java @@ -0,0 +1,79 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import java.util.concurrent.atomic.AtomicBoolean; + +import rx.*; +import rx.exceptions.*; + +/** + * A producer which emits a single value and completes the child on the first positive request. + * + * @param the value type + */ +public final class SingleProducer extends AtomicBoolean implements Producer { + /** */ + private static final long serialVersionUID = -3353584923995471404L; + /** The child subscriber. */ + final Subscriber child; + /** The value to be emitted. */ + final T value; + /** + * Constructs the producer with the given target child and value to be emitted. + * @param child the child subscriber, non-null + * @param value the value to be emitted, may be null + */ + public SingleProducer(Subscriber child, T value) { + this.child = child; + this.value = value; + } + @Override + public void request(long n) { + // negative requests are bugs + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + // we ignore zero requests + if (n == 0) { + return; + } + // atomically change the state into emitting mode + if (compareAndSet(false, true)) { + // avoid re-reading the instance fields + final Subscriber c = child; + T v = value; + // eagerly check for unsubscription + if (c.isUnsubscribed()) { + return; + } + // emit the value + try { + c.onNext(v); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + c.onError(OnErrorThrowable.addValueAsLastCause(e, v)); + return; + } + // eagerly check for unsubscription + if (c.isUnsubscribed()) { + return; + } + // complete the child + c.onCompleted(); + } + } +} diff --git a/src/main/java/rx/internal/util/atomic/BaseLinkedAtomicQueue.java b/src/main/java/rx/internal/util/atomic/BaseLinkedAtomicQueue.java new file mode 100644 index 0000000000..f46890b7f1 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/BaseLinkedAtomicQueue.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedAtomicQueue.java + */ +package rx.internal.util.atomic; + +import java.util.AbstractQueue; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicReference; + +abstract class BaseLinkedAtomicQueue extends AbstractQueue { + private final AtomicReference> producerNode; + private final AtomicReference> consumerNode; + public BaseLinkedAtomicQueue() { + producerNode = new AtomicReference>(); + consumerNode = new AtomicReference>(); + } + protected final LinkedQueueNode lvProducerNode() { + return producerNode.get(); + } + protected final LinkedQueueNode lpProducerNode() { + return producerNode.get(); + } + protected final void spProducerNode(LinkedQueueNode node) { + producerNode.lazySet(node); + } + protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode node) { + return producerNode.getAndSet(node); + } + protected final LinkedQueueNode lvConsumerNode() { + return consumerNode.get(); + } + + protected final LinkedQueueNode lpConsumerNode() { + return consumerNode.get(); + } + protected final void spConsumerNode(LinkedQueueNode node) { + consumerNode.lazySet(node); + } + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * This is an O(n) operation as we run through all the nodes and count them.
+ * + * @see java.util.Queue#size() + */ + @Override + public final int size() { + LinkedQueueNode chaserNode = lvConsumerNode(); + final LinkedQueueNode producerNode = lvProducerNode(); + int size = 0; + // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. + while (chaserNode != producerNode && size < Integer.MAX_VALUE) { + LinkedQueueNode next; + while((next = chaserNode.lvNext()) == null); + chaserNode = next; + size++; + } + return size; + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe + * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to + * be null. + * + * @see MessagePassingQueue#isEmpty() + */ + @Override + public final boolean isEmpty() { + return lvConsumerNode() == lvProducerNode(); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/atomic/LinkedQueueNode.java b/src/main/java/rx/internal/util/atomic/LinkedQueueNode.java new file mode 100644 index 0000000000..d687460c64 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/LinkedQueueNode.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/LinkedQueueNode.java + */ +package rx.internal.util.atomic; + +import java.util.concurrent.atomic.AtomicReference; + +public final class LinkedQueueNode extends AtomicReference> { + /** */ + private static final long serialVersionUID = 2404266111789071508L; + private E value; + + public LinkedQueueNode() { + } + public LinkedQueueNode(E val) { + spValue(val); + } + /** + * Gets the current value and nulls out the reference to it from this node. + * + * @return value + */ + public E getAndNullValue() { + E temp = lpValue(); + spValue(null); + return temp; + } + + public E lpValue() { + return value; + } + + public void spValue(E newValue) { + value = newValue; + } + + public void soNext(LinkedQueueNode n) { + lazySet(n); + } + + public LinkedQueueNode lvNext() { + return get(); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/atomic/MpscLinkedAtomicQueue.java b/src/main/java/rx/internal/util/atomic/MpscLinkedAtomicQueue.java new file mode 100644 index 0000000000..ebc1264599 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/MpscLinkedAtomicQueue.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/MpscLinkedAtomicQueue.java + */ +package rx.internal.util.atomic; + +/** + * This is a direct Java port of the MPSC algorithm as presented on 1024 + * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and + * layout: + *

    + *
  1. Use XCHG functionality provided by AtomicReference (which is better in JDK 8+). + *
+ * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this + * point follow the notes on offer/poll. + * + * @author nitsanw + * + * @param + */ +public final class MpscLinkedAtomicQueue extends BaseLinkedAtomicQueue { + + public MpscLinkedAtomicQueue() { + super(); + LinkedQueueNode node = new LinkedQueueNode(); + spConsumerNode(node); + xchgProducerNode(node);// this ensures correct construction: StoreLoad + } + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Offer is allowed from multiple threads.
+ * Offer allocates a new node and: + *

    + *
  1. Swaps it atomically with current producer node (only one producer 'wins') + *
  2. Sets the new node as the node following from the swapped producer node + *
+ * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can + * get the same producer node as part of XCHG guarantee. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public final boolean offer(final E nextValue) { + if (nextValue == null) { + throw new IllegalArgumentException("null elements not allowed"); + } + final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); + final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); + // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed + // and completes the store in prev.next. + prevProducerNode.soNext(nextNode); // StoreStore + return true; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *

    + *
  1. If it is null, the queue is assumed empty (though it might not be). + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + * @see MessagePassingQueue#poll() + * @see java.util.Queue#poll() + */ + @Override + public final E poll() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + return null; + } + + @Override + public final E peek() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + return nextNode.lpValue(); + } + return null; + } + +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/atomic/SpscLinkedAtomicQueue.java b/src/main/java/rx/internal/util/atomic/SpscLinkedAtomicQueue.java new file mode 100644 index 0000000000..5832f7371d --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/SpscLinkedAtomicQueue.java @@ -0,0 +1,106 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscLinkedAtomicQueue.java + */ +package rx.internal.util.atomic; + +/** + * This is a weakened version of the MPSC algorithm as presented on 1024 + * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and + * layout: + *
    + *
  1. As this is an SPSC we have no need for XCHG, an ordered store is enough. + *
+ * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this + * point follow the notes on offer/poll. + * + * @author nitsanw + * + * @param + */ +public final class SpscLinkedAtomicQueue extends BaseLinkedAtomicQueue { + + public SpscLinkedAtomicQueue() { + super(); + LinkedQueueNode node = new LinkedQueueNode(); + spProducerNode(node); + spConsumerNode(node); + node.soNext(null); // this ensures correct construction: StoreStore + } + + /** + * {@inheritDoc}
+ * + * IMPLEMENTATION NOTES:
+ * Offer is allowed from a SINGLE thread.
+ * Offer allocates a new node (holding the offered value) and: + *
    + *
  1. Sets that node as the producerNode.next + *
  2. Sets the new node as the producerNode + *
+ * From this follows that producerNode.next is always null and for all other nodes node.next is not null. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public boolean offer(final E nextValue) { + if (nextValue == null) { + throw new IllegalArgumentException("null elements not allowed"); + } + final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); + lpProducerNode().soNext(nextNode); + spProducerNode(nextNode); + return true; + } + + /** + * {@inheritDoc}
+ * + * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *
    + *
  1. If it is null, the queue is empty. + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + */ + @Override + public E poll() { + final LinkedQueueNode nextNode = lpConsumerNode().lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + return null; + } + + @Override + public E peek() { + final LinkedQueueNode nextNode = lpConsumerNode().lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } else { + return null; + } + } + +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java b/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java new file mode 100644 index 0000000000..05bcb798fb --- /dev/null +++ b/src/main/java/rx/internal/util/unsafe/BaseLinkedQueue.java @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedQueue.java + */ +package rx.internal.util.unsafe; + +import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; + +import java.util.*; + +import rx.internal.util.atomic.LinkedQueueNode; + +abstract class BaseLinkedQueuePad0 extends AbstractQueue { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class BaseLinkedQueueProducerNodeRef extends BaseLinkedQueuePad0 { + protected final static long P_NODE_OFFSET = UnsafeAccess.addressOf(BaseLinkedQueueProducerNodeRef.class, "producerNode"); + + protected LinkedQueueNode producerNode; + protected final void spProducerNode(LinkedQueueNode node) { + producerNode = node; + } + + @SuppressWarnings("unchecked") + protected final LinkedQueueNode lvProducerNode() { + return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET); + } + + protected final LinkedQueueNode lpProducerNode() { + return producerNode; + } +} + +abstract class BaseLinkedQueuePad1 extends BaseLinkedQueueProducerNodeRef { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + +abstract class BaseLinkedQueueConsumerNodeRef extends BaseLinkedQueuePad1 { + protected final static long C_NODE_OFFSET = UnsafeAccess.addressOf(BaseLinkedQueueConsumerNodeRef.class, "consumerNode"); + protected LinkedQueueNode consumerNode; + protected final void spConsumerNode(LinkedQueueNode node) { + consumerNode = node; + } + + @SuppressWarnings("unchecked") + protected final LinkedQueueNode lvConsumerNode() { + return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, C_NODE_OFFSET); + } + + protected final LinkedQueueNode lpConsumerNode() { + return consumerNode; + } +} + +/** + * A base data structure for concurrent linked queues. + * + * @author nitsanw + * + * @param + */ +abstract class BaseLinkedQueue extends BaseLinkedQueueConsumerNodeRef { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; + + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * This is an O(n) operation as we run through all the nodes and count them.
+ * + * @see java.util.Queue#size() + */ + @Override + public final int size() { + // Read consumer first, this is important because if the producer is node is 'older' than the consumer the + // consumer may overtake it (consume past it). This will lead to an infinite loop below. + LinkedQueueNode chaserNode = lvConsumerNode(); + final LinkedQueueNode producerNode = lvProducerNode(); + int size = 0; + // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. + while (chaserNode != producerNode && size < Integer.MAX_VALUE) { + LinkedQueueNode next; + while((next = chaserNode.lvNext()) == null); + chaserNode = next; + size++; + } + return size; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe + * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to + * be null. + * + * @see MessagePassingQueue#isEmpty() + */ + @Override + public final boolean isEmpty() { + return lvConsumerNode() == lvProducerNode(); + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java b/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java new file mode 100644 index 0000000000..f9e63f1c6b --- /dev/null +++ b/src/main/java/rx/internal/util/unsafe/MpscLinkedQueue.java @@ -0,0 +1,136 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java + */ +package rx.internal.util.unsafe; + +import static rx.internal.util.unsafe.UnsafeAccess.UNSAFE; +import rx.internal.util.atomic.LinkedQueueNode; +/** + * This is a direct Java port of the MPSC algorithm as presented on 1024 + * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and + * layout: + *

    + *
  1. Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields. + *
  2. Use XCHG functionality to the best of the JDK ability (see differences in JDK7/8 impls). + *
+ * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this + * point follow the notes on offer/poll. + * + * @author nitsanw + * + * @param + */ +public final class MpscLinkedQueue extends BaseLinkedQueue { + + public MpscLinkedQueue() { + consumerNode = new LinkedQueueNode(); + xchgProducerNode(consumerNode);// this ensures correct construction: StoreLoad + } + + @SuppressWarnings("unchecked") + protected final LinkedQueueNode xchgProducerNode(LinkedQueueNode newVal) { + Object oldVal; + do { + oldVal = producerNode; + } while(!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal)); + return (LinkedQueueNode) oldVal; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Offer is allowed from multiple threads.
+ * Offer allocates a new node and: + *

    + *
  1. Swaps it atomically with current producer node (only one producer 'wins') + *
  2. Sets the new node as the node following from the swapped producer node + *
+ * This works because each producer is guaranteed to 'plant' a new node and link the old node. No 2 producers can + * get the same producer node as part of XCHG guarantee. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public final boolean offer(final E nextValue) { + if (nextValue == null) { + throw new IllegalArgumentException("null elements not allowed"); + } + final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); + final LinkedQueueNode prevProducerNode = xchgProducerNode(nextNode); + // Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed + // and completes the store in prev.next. + prevProducerNode.soNext(nextNode); // StoreStore + return true; + } + + /** + * {@inheritDoc}
+ *

+ * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *

    + *
  1. If it is null, the queue is assumed empty (though it might not be). + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + * @see MessagePassingQueue#poll() + * @see java.util.Queue#poll() + */ + @Override + public final E poll() { + LinkedQueueNode currConsumerNode = lpConsumerNode(); // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + spConsumerNode(nextNode); + return nextValue; + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + consumerNode = nextNode; + return nextValue; + } + return null; + } + + @Override + public final E peek() { + LinkedQueueNode currConsumerNode = consumerNode; // don't load twice, it's alright + LinkedQueueNode nextNode = currConsumerNode.lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } + else if (currConsumerNode != lvProducerNode()) { + // spin, we are no longer wait free + while((nextNode = currConsumerNode.lvNext()) == null); + // got the next node... + return nextNode.lpValue(); + } + return null; + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java b/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java new file mode 100644 index 0000000000..7c3c675b48 --- /dev/null +++ b/src/main/java/rx/internal/util/unsafe/SpscLinkedQueue.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java + */ +package rx.internal.util.unsafe; + +import rx.internal.util.atomic.LinkedQueueNode; + + + +/** + * This is a weakened version of the MPSC algorithm as presented on 1024 + * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and + * layout: + *
    + *
  1. Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields. + *
  2. As this is an SPSC we have no need for XCHG, an ordered store is enough. + *
+ * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this + * point follow the notes on offer/poll. + * + * @author nitsanw + * + * @param + */ +public final class SpscLinkedQueue extends BaseLinkedQueue { + + public SpscLinkedQueue() { + spProducerNode(new LinkedQueueNode()); + spConsumerNode(producerNode); + consumerNode.soNext(null); // this ensures correct construction: StoreStore + } + + /** + * {@inheritDoc}
+ * + * IMPLEMENTATION NOTES:
+ * Offer is allowed from a SINGLE thread.
+ * Offer allocates a new node (holding the offered value) and: + *
    + *
  1. Sets that node as the producerNode.next + *
  2. Sets the new node as the producerNode + *
+ * From this follows that producerNode.next is always null and for all other nodes node.next is not null. + * + * @see MessagePassingQueue#offer(Object) + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public boolean offer(final E nextValue) { + if (nextValue == null) { + throw new IllegalArgumentException("null elements not allowed"); + } + final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); + producerNode.soNext(nextNode); + producerNode = nextNode; + return true; + } + + /** + * {@inheritDoc}
+ * + * IMPLEMENTATION NOTES:
+ * Poll is allowed from a SINGLE thread.
+ * Poll reads the next node from the consumerNode and: + *
    + *
  1. If it is null, the queue is empty. + *
  2. If it is not null set it as the consumer node and return it's now evacuated value. + *
+ * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null + * values are not allowed to be offered this is the only node with it's value set to null at any one time. + * + */ + @Override + public E poll() { + final LinkedQueueNode nextNode = consumerNode.lvNext(); + if (nextNode != null) { + // we have to null out the value because we are going to hang on to the node + final E nextValue = nextNode.getAndNullValue(); + consumerNode = nextNode; + return nextValue; + } + return null; + } + + @Override + public E peek() { + final LinkedQueueNode nextNode = consumerNode.lvNext(); + if (nextNode != null) { + return nextNode.lpValue(); + } else { + return null; + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java b/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java index a5cbcb5d40..88d0ebf4dd 100644 --- a/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java +++ b/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java @@ -84,4 +84,25 @@ public static int getAndSetInt(Object obj, long offset, int newValue) { public static boolean compareAndSwapInt(Object obj, long offset, int expected, int newValue) { return UNSAFE.compareAndSwapInt(obj, offset, expected, newValue); } + + /** + * Returns the address of the specific field on the class and + * wraps a NoSuchFieldException into an internal error. + *

+ * One can avoid using static initializers this way and just assign + * the address directly to the target static field. + * @param clazz the target class + * @param fieldName the target field name + * @return the address (offset) of the field + */ + public static long addressOf(Class clazz, String fieldName) { + try { + Field f = clazz.getDeclaredField(fieldName); + return UNSAFE.objectFieldOffset(f); + } catch (NoSuchFieldException ex) { + InternalError ie = new InternalError(); + ie.initCause(ex); + throw ie; + } + } } \ No newline at end of file diff --git a/src/test/java/rx/internal/producers/ProducersTest.java b/src/test/java/rx/internal/producers/ProducersTest.java new file mode 100644 index 0000000000..ee746335fd --- /dev/null +++ b/src/test/java/rx/internal/producers/ProducersTest.java @@ -0,0 +1,381 @@ +/** + * Copyright 2015 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.producers; + +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import org.junit.*; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.Observable; +import rx.Observer; +import rx.functions.*; +import rx.observers.TestSubscriber; +import rx.schedulers.*; +import rx.subscriptions.SerialSubscription; + +public class ProducersTest { + @Test + public void testSingleNoBackpressure() { + TestSubscriber ts = new TestSubscriber(); + SingleProducer sp = new SingleProducer(ts, 1); + ts.setProducer(sp); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1)); + } + @Test + public void testSingleWithBackpressure() { + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(0); + SingleProducer sp = new SingleProducer(ts, 1); + ts.setProducer(sp); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1)); + } + + @Test + public void testSingleDelayedNoBackpressure() { + TestSubscriber ts = new TestSubscriber(); + SingleDelayedProducer sdp = new SingleDelayedProducer(ts); + ts.setProducer(sdp); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + + sdp.setValue(1); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + sdp.setValue(2); + + ts.assertReceivedOnNext(Arrays.asList(1)); + } + @Test + public void testSingleDelayedWithBackpressure() { + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(0); + SingleDelayedProducer sdp = new SingleDelayedProducer(ts); + ts.setProducer(sdp); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + + sdp.setValue(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + sdp.setValue(2); + + ts.assertReceivedOnNext(Arrays.asList(1)); + } + + @Test + public void testQueuedValueNoBackpressure() { + TestSubscriber ts = new TestSubscriber(); + QueuedValueProducer qvp = new QueuedValueProducer(ts); + ts.setProducer(qvp); + + qvp.offer(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + qvp.offer(2); + qvp.offer(3); + qvp.offer(4); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + } + @Test + public void testQueuedValueWithBackpressure() { + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(0); + QueuedValueProducer qvp = new QueuedValueProducer(ts); + ts.setProducer(qvp); + + qvp.offer(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + + qvp.offer(2); + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + + qvp.offer(3); + qvp.offer(4); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + } + + @Test + public void testQueuedNoBackpressure() { + TestSubscriber ts = new TestSubscriber(); + QueuedProducer qp = new QueuedProducer(ts); + ts.setProducer(qp); + + qp.offer(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + qp.offer(2); + qp.offer(3); + qp.offer(4); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + + qp.onCompleted(); + + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + } + @Test + public void testQueuedWithBackpressure() { + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(0); + QueuedProducer qp = new QueuedProducer(ts); + ts.setProducer(qp); + + qp.offer(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Collections.emptyList()); + + qp.offer(2); + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + + ts.requestMore(2); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + + qp.offer(3); + qp.offer(4); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + + qp.onCompleted(); + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4)); + } + + @Test + public void testArbiter() { + Producer p1 = mock(Producer.class); + Producer p2 = mock(Producer.class); + + ProducerArbiter pa = new ProducerArbiter(); + + pa.request(100); + + pa.setProducer(p1); + + verify(p1).request(100); + + pa.produced(50); + + pa.setProducer(p2); + + verify(p2).request(50); + } + + static final class TestProducer implements Producer { + final Observer child; + public TestProducer(Observer child) { + this.child = child; + } + @Override + public void request(long n) { + child.onNext((int)n); + } + } + + @Test + public void testObserverArbiterWithBackpressure() { + final TestSubscriber ts = new TestSubscriber(); + ts.requestMore(0); + final ProducerObserverArbiter poa = new ProducerObserverArbiter(ts); + ts.setProducer(poa); + + + poa.setProducer(new TestProducer(poa)); + + ts.requestMore(1); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + poa.setProducer(null); + ts.requestMore(5); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1)); + + poa.setProducer(new TestProducer(poa)); + + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(1, 5)); + + poa.onCompleted(); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 5)); + } + static final class SwitchTimer + implements OnSubscribe { + final List> sources; + final long time; + final TimeUnit unit; + final Scheduler scheduler; + public SwitchTimer( + Iterable> sources, + long time, TimeUnit unit, Scheduler scheduler) { + this.scheduler = scheduler; + this.sources = new ArrayList>(); + this.time = time; + this.unit = unit; + for (Observable o : sources) { + this.sources.add(o); + } + } + @Override + public void call(Subscriber child) { + final ProducerObserverArbiter poa = + new ProducerObserverArbiter(child); + + Scheduler.Worker w = scheduler.createWorker(); + child.add(w); + + child.setProducer(poa); + + final SerialSubscription ssub = new SerialSubscription(); + child.add(ssub); + + final int[] index = new int[1]; + + w.schedulePeriodically(new Action0() { + @Override + public void call() { + final int idx = index[0]++; + if (idx >= sources.size()) { + poa.onCompleted(); + return; + } + Subscriber s = new Subscriber() { + @Override + public void onNext(T t) { + poa.onNext(t); + } + @Override + public void onError(Throwable e) { + poa.onError(e); + } + @Override + public void onCompleted() { + if (idx + 1 == sources.size()) { + poa.onCompleted(); + } + } + @Override + public void setProducer(Producer producer) { + poa.setProducer(producer); + } + }; + + ssub.set(s); + sources.get(idx).unsafeSubscribe(s); + } + }, time, time, unit); + } + } + final Func1 plus(final long n) { + return new Func1() { + @Override + public Long call(Long t) { + return t + n; + } + }; + } + @Test + public void testObserverArbiterAsync() { + TestScheduler test = Schedulers.test(); + @SuppressWarnings("unchecked") + List> timers = Arrays.asList( + Observable.timer(100, 100, TimeUnit.MILLISECONDS, test), + Observable.timer(100, 100, TimeUnit.MILLISECONDS, test) + .map(plus(20)), + Observable.timer(100, 100, TimeUnit.MILLISECONDS, test) + .map(plus(40)) + ); + + Observable source = Observable.create( + new SwitchTimer(timers, 550, + TimeUnit.MILLISECONDS, test)); + + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(100); + source.subscribe(ts); + + test.advanceTimeBy(1, TimeUnit.MINUTES); + + ts.assertTerminalEvent(); + ts.assertNoErrors(); + ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 3L, 4L, + 20L, 21L, 22L, 23L, 24L, + 40L, 41L, 42L, 43L, 44L)); + } +}