Skip to content

Commit

Permalink
1.x: replay now has O(1) subscription and O(1) request coordination cost
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Oct 28, 2015
1 parent 92fe02d commit 53a74e4
Show file tree
Hide file tree
Showing 3 changed files with 435 additions and 111 deletions.
277 changes: 166 additions & 111 deletions src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import rx.*;
import rx.Observable;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.OpenHashSet;
import rx.observables.ConnectableObservable;
import rx.schedulers.Timestamped;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -299,8 +299,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
/** Indicates a terminated ReplaySubscriber. */
static final InnerProducer[] TERMINATED = new InnerProducer[0];

/** Tracks the subscribed producers. */
final AtomicReference<InnerProducer[]> producers;
/** Indicates no further InnerProducers are accepted. */
volatile boolean terminated;
/** Tracks the subscribed producers. Guarded by itself. */
final OpenHashSet<InnerProducer<T>> producers;
/** Contains a copy of the producers. Modified only from the source side. */
InnerProducer<T>[] producersCache;
/** Contains number of modifications to the producers set.*/
volatile long producersVersion;
/** Contains the number of modifications that the producersCache holds. */
long producersCacheVersion;
/**
* Atomically changed from false to true by connect to make sure the
* connection is only performed by one thread.
Expand All @@ -320,12 +328,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
/** The upstream producer. */
volatile Producer producer;

/** The queue that holds producers with request changes that need to be coordinated. */
List<InnerProducer<T>> coordinationQueue;
/** Indicate that all request amounts should be considered. */
boolean coordinateAll;

@SuppressWarnings("unchecked")
public ReplaySubscriber(AtomicReference<ReplaySubscriber<T>> current,
ReplayBuffer<T> buffer) {
this.buffer = buffer;

this.nl = NotificationLite.instance();
this.producers = new AtomicReference<InnerProducer[]>(EMPTY);
this.producers = new OpenHashSet<InnerProducer<T>>();
this.producersCache = EMPTY;
this.shouldConnect = new AtomicBoolean();
// make sure the source doesn't produce values until the child subscribers
// expressed their request amounts
Expand All @@ -336,7 +351,15 @@ void init() {
add(Subscriptions.create(new Action0() {
@Override
public void call() {
ReplaySubscriber.this.producers.getAndSet(TERMINATED);
if (!terminated) {
synchronized (producers) {
if (!terminated) {
producers.terminate();
producersVersion++;
terminated = true;
}
}
}
// unlike OperatorPublish, we can't null out the terminated so
// late subscribers can still get replay
// current.compareAndSet(ReplaySubscriber.this, null);
Expand All @@ -355,76 +378,34 @@ boolean add(InnerProducer<T> producer) {
if (producer == null) {
throw new NullPointerException();
}
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// get the current producer array
InnerProducer[] c = producers.get();
// if this subscriber-to-source reached a terminal state by receiving
// an onError or onCompleted, just refuse to add the new producer
if (c == TERMINATED) {
if (terminated) {
return false;
}
synchronized (producers) {
if (terminated) {
return false;
}
// we perform a copy-on-write logic
int len = c.length;
InnerProducer[] u = new InnerProducer[len + 1];
System.arraycopy(c, 0, u, 0, len);
u[len] = producer;
// try setting the producers array
if (producers.compareAndSet(c, u)) {
return true;
}
// if failed, some other operation succeded (another add, remove or termination)
// so retry

producers.add(producer);
producersVersion++;
}
return true;
}

/**
* Atomically removes the given producer from the producers array.
* @param producer the producer to remove
*/
void remove(InnerProducer<T> producer) {
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// let's read the current producers array
InnerProducer[] c = producers.get();
// if it is either empty or terminated, there is nothing to remove so we quit
if (c == EMPTY || c == TERMINATED) {
return;
}
// let's find the supplied producer in the array
// although this is O(n), we don't expect too many child subscribers in general
int j = -1;
int len = c.length;
for (int i = 0; i < len; i++) {
if (c[i].equals(producer)) {
j = i;
break;
}
}
// we didn't find it so just quit
if (j < 0) {
return;
}
// we do copy-on-write logic here
InnerProducer[] u;
// we don't create a new empty array if producer was the single inhabitant
// but rather reuse an empty array
if (len == 1) {
u = EMPTY;
} else {
// otherwise, create a new array one less in size
u = new InnerProducer[len - 1];
// copy elements being before the given producer
System.arraycopy(c, 0, u, 0, j);
// copy elements being after the given producer
System.arraycopy(c, j + 1, u, j, len - j - 1);
}
// try setting this new array as
if (producers.compareAndSet(c, u)) {
if (terminated) {
return;
}
synchronized (producers) {
if (terminated) {
return;
}
// if we failed, it means something else happened
// (a concurrent add/remove or termination), we need to retry
producers.remove(producer);
producersVersion++;
}
}

Expand All @@ -435,7 +416,7 @@ public void setProducer(Producer p) {
throw new IllegalStateException("Only a single producer can be set on a Subscriber.");
}
producer = p;
manageRequests();
manageRequests(null);
replay();
}

Expand Down Expand Up @@ -478,81 +459,157 @@ public void onCompleted() {
/**
* Coordinates the request amounts of various child Subscribers.
*/
void manageRequests() {
void manageRequests(InnerProducer<T> inner) {
// if the upstream has completed, no more requesting is possible
if (isUnsubscribed()) {
return;
}
synchronized (this) {
if (emitting) {
if (inner != null) {
List<InnerProducer<T>> q = coordinationQueue;
if (q == null) {
q = new ArrayList<InnerProducer<T>>();
coordinationQueue = q;
}
q.add(inner);
} else {
coordinateAll = true;
}
missed = true;
return;
}
emitting = true;
}

long ri = maxChildRequested;
long maxTotalRequested;

if (inner != null) {
maxTotalRequested = Math.max(ri, inner.totalRequested.get());
} else {
maxTotalRequested = ri;

InnerProducer<T>[] a = copyProducers();
for (InnerProducer<T> rp : a) {
if (rp != null) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
}

}
makeRequest(maxTotalRequested, ri);

for (;;) {
// if the upstream has completed, no more requesting is possible
if (isUnsubscribed()) {
return;
}

@SuppressWarnings("unchecked")
InnerProducer<T>[] a = producers.get();

long ri = maxChildRequested;
long maxTotalRequests = ri;

for (InnerProducer<T> rp : a) {
maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get());
List<InnerProducer<T>> q;
boolean all;
synchronized (this) {
if (!missed) {
emitting = false;
return;
}
missed = false;
q = coordinationQueue;
coordinationQueue = null;
all = coordinateAll;
coordinateAll = false;
}

long ur = maxUpstreamRequested;
Producer p = producer;
ri = maxChildRequested;
maxTotalRequested = ri;

long diff = maxTotalRequests - ri;
if (diff != 0) {
maxChildRequested = maxTotalRequests;
if (p != null) {
if (ur != 0L) {
maxUpstreamRequested = 0L;
p.request(ur + diff);
} else {
p.request(diff);
}
} else {
// collect upstream request amounts until there is a producer for them
long u = ur + diff;
if (u < 0) {
u = Long.MAX_VALUE;
if (q != null) {
for (InnerProducer<T> rp : q) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
}

if (all) {
InnerProducer<T>[] a = copyProducers();
for (InnerProducer<T> rp : a) {
if (rp != null) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
maxUpstreamRequested = u;
}
} else
// if there were outstanding upstream requests and we have a producer
if (ur != 0L && p != null) {
maxUpstreamRequested = 0L;
// fire the accumulated requests
p.request(ur);
}

synchronized (this) {
if (!missed) {
emitting = false;
return;
makeRequest(maxTotalRequested, ri);
}
}

InnerProducer<T>[] copyProducers() {
synchronized (producers) {
Object[] a = producers.values();
int n = a.length;
@SuppressWarnings("unchecked")
InnerProducer<T>[] result = new InnerProducer[n];
System.arraycopy(a, 0, result, 0, n);
return result;
}
}

void makeRequest(long maxTotalRequests, long previousTotalRequests) {
long ur = maxUpstreamRequested;
Producer p = producer;

long diff = maxTotalRequests - previousTotalRequests;
if (diff != 0) {
maxChildRequested = maxTotalRequests;
if (p != null) {
if (ur != 0L) {
maxUpstreamRequested = 0L;
p.request(ur + diff);
} else {
p.request(diff);
}
missed = false;
} else {
// collect upstream request amounts until there is a producer for them
long u = ur + diff;
if (u < 0) {
u = Long.MAX_VALUE;
}
maxUpstreamRequested = u;
}
} else
// if there were outstanding upstream requests and we have a producer
if (ur != 0L && p != null) {
maxUpstreamRequested = 0L;
// fire the accumulated requests
p.request(ur);
}
}

/**
* Tries to replay the buffer contents to all known subscribers.
*/
@SuppressWarnings("unchecked")
void replay() {
@SuppressWarnings("unchecked")
InnerProducer<T>[] a = producers.get();
for (InnerProducer<T> rp : a) {
buffer.replay(rp);
InnerProducer<T>[] pc = producersCache;
if (producersCacheVersion != producersVersion) {
synchronized (producers) {
pc = producersCache;
// if the producers hasn't changed do nothing
// otherwise make a copy of the current set of producers
Object[] a = producers.values();
int n = a.length;
if (pc.length != n) {
pc = new InnerProducer[n];
producersCache = pc;
}
System.arraycopy(a, 0, pc, 0, n);
producersCacheVersion = producersVersion;
}
}
ReplayBuffer<T> b = buffer;
for (InnerProducer<T> rp : pc) {
if (rp != null) {
b.replay(rp);
}
}
}
}
Expand Down Expand Up @@ -631,7 +688,7 @@ public void request(long n) {
addTotalRequested(n);
// if successful, notify the parent dispacher this child can receive more
// elements
parent.manageRequests();
parent.manageRequests(this);

parent.buffer.replay(this);
return;
Expand Down Expand Up @@ -712,7 +769,7 @@ public void unsubscribe() {
// let's assume this child had 0 requested before the unsubscription while
// the others had non-zero. By removing this 'blocking' child, the others
// are now free to receive events
parent.manageRequests();
parent.manageRequests(this);
}
}
}
Expand Down Expand Up @@ -852,8 +909,6 @@ public void replay(InnerProducer<T> output) {

/**
* Represents a node in a bounded replay buffer's linked list.
*
* @param <T> the contained value type
*/
static final class Node extends AtomicReference<Node> {
/** */
Expand Down
Loading

0 comments on commit 53a74e4

Please sign in to comment.