Skip to content

Commit

Permalink
Merge pull request #2990 from davidmoten/subscriber-readability
Browse files Browse the repository at this point in the history
Improve Subscriber readability
  • Loading branch information
benjchristensen committed Jun 9, 2015
2 parents f956293 + 0cf7082 commit 2c2bc0d
Showing 1 changed file with 53 additions and 43 deletions.
96 changes: 53 additions & 43 deletions src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,23 @@
* the type of items the Subscriber expects to observe
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;

private final SubscriptionList cs;
private final Subscriber<?> op;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer p;
private Producer producer;
/* protected by `this` */
private long requested = Long.MIN_VALUE; // default to not set
private long requested = NOT_SET; // default to not set

protected Subscriber() {
this(null, false);
}

protected Subscriber(Subscriber<?> op) {
this(op, true);
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}

/**
Expand All @@ -53,15 +56,15 @@ protected Subscriber(Subscriber<?> op) {
* <p>
* To retain the chaining of subscribers, add the created instance to {@code op} via {@link #add}.
*
* @param op
* @param subscriber
* the other Subscriber
* @param shareSubscriptions
* {@code true} to share the subscription list in {@code op} with this instance
* @since 1.0.6
*/
protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
this.op = op;
this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList();
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}

/**
Expand All @@ -73,12 +76,12 @@ protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
* the {@code Subscription} to add
*/
public final void add(Subscription s) {
cs.add(s);
subscriptions.add(s);
}

@Override
public final void unsubscribe() {
cs.unsubscribe();
subscriptions.unsubscribe();
}

/**
Expand All @@ -88,7 +91,7 @@ public final void unsubscribe() {
*/
@Override
public final boolean isUnsubscribed() {
return cs.isUnsubscribed();
return subscriptions.isUnsubscribed();
}

/**
Expand Down Expand Up @@ -124,57 +127,64 @@ protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
Producer shouldRequest = null;

// if producer is set then we will request from it
// otherwise we increase the requested count by n
Producer producerToRequestFrom = null;
synchronized (this) {
if (p != null) {
shouldRequest = p;
} else if (requested == Long.MIN_VALUE) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);
return;
}
}
// after releasing lock
if (shouldRequest != null) {
shouldRequest.request(n);
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}

private void addToRequested(long n) {
if (requested == NOT_SET) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}

/**
* @warn javadoc description missing
* @warn param producer not described
* @param producer
* @param p
*/
public void setProducer(Producer producer) {
public void setProducer(Producer p) {
long toRequest;
boolean setProducer = false;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
p = producer;
if (op != null) {
producer = p;
if (subscriber != null) {
// middle operator ... we pass thru unless a request has been made
if (toRequest == Long.MIN_VALUE) {
if (toRequest == NOT_SET) {
// we pass-thru to the next producer as nothing has been requested
setProducer = true;
passToSubscriber = true;
}

}
}
// do after releasing lock
if (setProducer) {
op.setProducer(p);
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == Long.MIN_VALUE) {
p.request(Long.MAX_VALUE);
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
p.request(toRequest);
producer.request(toRequest);
}
}
}
Expand Down

0 comments on commit 2c2bc0d

Please sign in to comment.