Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: Coverage improvements, logical fixes and cleanups 03/08 #5905

Merged
merged 1 commit into from
Mar 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,24 @@ public FlowableCache(Flowable<T> source, int capacityHint) {
protected void subscribeActual(Subscriber<? super T> t) {
// we can connect first because we replay everything anyway
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
state.addChild(rp);

t.onSubscribe(rp);

boolean doReplay = true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old logic could have made the Subscriber visible too early and thus it could receive an onComplete before an onSubscribe.

if (state.addChild(rp)) {
if (rp.requested.get() == ReplaySubscription.CANCELLED) {
state.removeChild(rp);
doReplay = false;
}
}

// we ensure a single connection here to save an instance field of AtomicBoolean in state.
if (!once.get() && once.compareAndSet(false, true)) {
state.connect();
}

// no need to call rp.replay() here because the very first request will trigger it anyway
if (doReplay) {
rp.replay();
}
}

/**
Expand Down Expand Up @@ -122,22 +130,23 @@ static final class CacheState<T> extends LinkedArrayList implements FlowableSubs
/**
* Adds a ReplaySubscription to the subscribers array atomically.
* @param p the target ReplaySubscription wrapping a downstream Subscriber with state
* @return true if the ReplaySubscription was added or false if the cache is already terminated
*/
public void addChild(ReplaySubscription<T> p) {
public boolean addChild(ReplaySubscription<T> p) {
// guarding by connection to save on allocating another object
// thus there are two distinct locks guarding the value-addition and child come-and-go
for (;;) {
ReplaySubscription<T>[] a = subscribers.get();
if (a == TERMINATED) {
return;
return false;
}
int n = a.length;
@SuppressWarnings("unchecked")
ReplaySubscription<T>[] b = new ReplaySubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = p;
if (subscribers.compareAndSet(a, b)) {
return;
return true;
}
}
}
Expand Down Expand Up @@ -240,12 +249,16 @@ static final class ReplaySubscription<T>
extends AtomicInteger implements Subscription {

private static final long serialVersionUID = -2557562030197141021L;
private static final long CANCELLED = -1;
private static final long CANCELLED = Long.MIN_VALUE;
/** The actual child subscriber. */
final Subscriber<? super T> child;
/** The cache state object. */
final CacheState<T> state;

/**
* Number of items requested and also the cancelled indicator if
* it contains {@link #CANCELLED}.
*/
final AtomicLong requested;

/**
Expand All @@ -263,6 +276,9 @@ static final class ReplaySubscription<T>
*/
int index;

/** Number of items emitted so far. */
long emitted;

ReplaySubscription(Subscriber<? super T> child, CacheState<T> state) {
this.child = child;
this.state = state;
Expand All @@ -271,17 +287,8 @@ static final class ReplaySubscription<T>
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
for (;;) {
long r = requested.get();
if (r == CANCELLED) {
return;
}
long u = BackpressureHelper.addCap(r, n);
if (requested.compareAndSet(r, u)) {
replay();
return;
}
}
BackpressureHelper.addCancel(requested, n);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the helper that was not available the time this class was developed.

replay();
}
}

Expand All @@ -303,12 +310,13 @@ public void replay() {
int missed = 1;
final Subscriber<? super T> child = this.child;
AtomicLong rq = requested;
long e = emitted;

for (;;) {

long r = rq.get();

if (r < 0L) {
if (r == CANCELLED) {
return;
}

Expand All @@ -326,9 +334,8 @@ public void replay() {
final int n = b.length - 1;
int j = index;
int k = currentIndexInBuffer;
int valuesProduced = 0;

while (j < s && r > 0) {
while (j < s && e != r) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the now more modern emitted-requested tracking pairs.

if (rq.get() == CANCELLED) {
return;
}
Expand All @@ -344,15 +351,14 @@ public void replay() {

k++;
j++;
r--;
valuesProduced++;
e++;
}

if (rq.get() == CANCELLED) {
return;
}

if (r == 0) {
if (r == e) {
Object o = b[k];
if (NotificationLite.isComplete(o)) {
child.onComplete();
Expand All @@ -364,15 +370,12 @@ public void replay() {
}
}

if (valuesProduced != 0) {
BackpressureHelper.producedCancel(rq, valuesProduced);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a separate emitted, there is no need to decrement the requested amount, saving on an atomic instruction.

}

index = j;
currentIndexInBuffer = k;
currentBuffer = b;
}

emitted = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,7 @@ public void clear() {

@Override
public boolean isEmpty() {
Iterator<? extends R> it = current;
if (it == null) {
return queue.isEmpty();
}
return !it.hasNext();
return current == null && queue.isEmpty();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An iterator is always checked for hasNext before the control is given back in poll and current is always null if the previous iterator had no further elements. Therefore, the in the original code !it.hasNext() was always false.

}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,8 @@ public void subscribe(Subscriber<? super T> child) {
buf = bufferFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
EmptySubscription.error(ex, child);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a subscribe implementation which should not crash but signal the problem to the Subscriber.

return;
}
// create a new subscriber to source
ReplaySubscriber<T> u = new ReplaySubscriber<T>(buf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,11 @@ static final class PublishObserver<T>
@SuppressWarnings("unchecked")
@Override
public void dispose() {
if (observers.get() != TERMINATED) {
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
if (ps != TERMINATED) {
current.compareAndSet(PublishObserver.this, null);
InnerDisposable[] ps = observers.getAndSet(TERMINATED);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bias the dispose logic towards a more likely single disposal.

if (ps != TERMINATED) {
current.compareAndSet(PublishObserver.this, null);

DisposableHelper.dispose(s);
}
DisposableHelper.dispose(s);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,4 +419,124 @@ public void error() {
.test(0L)
.assertFailure(TestException.class);
}

@Test
public void cancelledUpFrontConnectAnyway() {
final AtomicInteger call = new AtomicInteger();
Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return call.incrementAndGet();
}
})
.cache()
.test(1L, true)
.assertNoValues();

assertEquals(1, call.get());
}

@Test
public void cancelledUpFront() {
final AtomicInteger call = new AtomicInteger();
Flowable<Object> f = Flowable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return call.incrementAndGet();
}
}).concatWith(Flowable.never())
.cache();

f.test().assertValuesOnly(1);

f.test(1L, true)
.assertEmpty();

assertEquals(1, call.get());
}

@Test
public void subscribeSubscribeRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final Flowable<Integer> cache = Flowable.range(1, 500).cache();

final TestSubscriber<Integer> to1 = new TestSubscriber<Integer>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ts1

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are at least 178 such unrenamed local variables. I'll post a separate PR fixing all of them and perhaps add a validator unit test that looks for this very common mistake.

final TestSubscriber<Integer> to2 = new TestSubscriber<Integer>();

Runnable r1 = new Runnable() {
@Override
public void run() {
cache.subscribe(to1);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
cache.subscribe(to2);
}
};

TestHelper.race(r1, r2);

to1
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
.assertValueCount(500)
.assertComplete()
.assertNoErrors();

to2
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
.assertValueCount(500)
.assertComplete()
.assertNoErrors();
}
}

@Test
public void subscribeCompleteRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final PublishProcessor<Integer> ps = PublishProcessor.<Integer>create();

final Flowable<Integer> cache = ps.cache();

cache.test();

final TestSubscriber<Integer> to = new TestSubscriber<Integer>();

Runnable r1 = new Runnable() {
@Override
public void run() {
cache.subscribe(to);
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
ps.onComplete();
}
};

TestHelper.race(r1, r2);

to
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
}
}

@Test
public void backpressure() {
Flowable.range(1, 5)
.cache()
.test(0)
.assertEmpty()
.requestMore(2)
.assertValuesOnly(1, 2)
.requestMore(3)
.assertResult(1, 2, 3, 4, 5);
}
}
Loading