Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Subscriber readability #2990

Merged
merged 1 commit into from
Jun 9, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 53 additions & 43 deletions src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,23 @@
* the type of items the Subscriber expects to observe
*/
public abstract class Subscriber<T> implements Observer<T>, Subscription {

// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;

private final SubscriptionList cs;
private final Subscriber<?> op;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer p;
private Producer producer;
/* protected by `this` */
private long requested = Long.MIN_VALUE; // default to not set
private long requested = NOT_SET; // default to not set

protected Subscriber() {
this(null, false);
}

protected Subscriber(Subscriber<?> op) {
this(op, true);
protected Subscriber(Subscriber<?> subscriber) {
this(subscriber, true);
}

/**
Expand All @@ -53,15 +56,15 @@ protected Subscriber(Subscriber<?> op) {
* <p>
* To retain the chaining of subscribers, add the created instance to {@code op} via {@link #add}.
*
* @param op
* @param subscriber
* the other Subscriber
* @param shareSubscriptions
* {@code true} to share the subscription list in {@code op} with this instance
* @since 1.0.6
*/
protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
this.op = op;
this.cs = shareSubscriptions && op != null ? op.cs : new SubscriptionList();
protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
this.subscriber = subscriber;
this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
}

/**
Expand All @@ -73,12 +76,12 @@ protected Subscriber(Subscriber<?> op, boolean shareSubscriptions) {
* the {@code Subscription} to add
*/
public final void add(Subscription s) {
cs.add(s);
subscriptions.add(s);
}

@Override
public final void unsubscribe() {
cs.unsubscribe();
subscriptions.unsubscribe();
}

/**
Expand All @@ -88,7 +91,7 @@ public final void unsubscribe() {
*/
@Override
public final boolean isUnsubscribed() {
return cs.isUnsubscribed();
return subscriptions.isUnsubscribed();
}

/**
Expand Down Expand Up @@ -124,57 +127,64 @@ protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
Producer shouldRequest = null;

// if producer is set then we will request from it
// otherwise we increase the requested count by n
Producer producerToRequestFrom = null;
synchronized (this) {
if (p != null) {
shouldRequest = p;
} else if (requested == Long.MIN_VALUE) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);
return;
}
}
// after releasing lock
if (shouldRequest != null) {
shouldRequest.request(n);
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}

private void addToRequested(long n) {
if (requested == NOT_SET) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}

/**
* @warn javadoc description missing
* @warn param producer not described
* @param producer
* @param p
*/
public void setProducer(Producer producer) {
public void setProducer(Producer p) {
long toRequest;
boolean setProducer = false;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
p = producer;
if (op != null) {
producer = p;
if (subscriber != null) {
// middle operator ... we pass thru unless a request has been made
if (toRequest == Long.MIN_VALUE) {
if (toRequest == NOT_SET) {
// we pass-thru to the next producer as nothing has been requested
setProducer = true;
passToSubscriber = true;
}

}
}
// do after releasing lock
if (setProducer) {
op.setProducer(p);
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == Long.MIN_VALUE) {
p.request(Long.MAX_VALUE);
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
p.request(toRequest);
producer.request(toRequest);
}
}
}
Expand Down