Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #1522 #1523

Merged
merged 3 commits into from
Jul 29, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,21 @@ void startEmitting() {

@Override
public void request(long n) {
long _c = 0;
long _c;
if (n == Long.MAX_VALUE) {
requested = Long.MAX_VALUE;
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
} else {
_c = REQUESTED_UPDATER.getAndAdd(this, n);
for (;;) {
_c = requested;
if (_c == Long.MAX_VALUE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the critical part as our fast-path in emit will never decrement.

I'm curious in this particular operator if the fast-path even matters and warrants the complexity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove the fast-path in emit. But we still need to handle Long.MAX_VALUE in request explicitly because the overflow problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downstream should never request more unless it has received something, so as long as we are decrementing when we emit then we shouldn't overflow.

It will be a fine safety check, but well behaved subscribers should never result in overflow.

The

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops .... the only reason we had overflows here is because of the fast-path that didn't decrement when we emitted.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the following code is the reason:

if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0) {
                              // we're done emitting the number requested so return
                              return;
                          }

Assume we have the following codes:

Observable.from(...).takeLast(3).observeOn(...).subscribe(new Subscriber<Integer>() {

            private boolean first = true;

            @Override
            public void onStart() {
                request(2);
            }

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(Integer integer) {
                if(first) {
                    request(Long.MAX_VALUE);
                    first = false;
                }
                else {
                    request(10);
                }
            }
        });

If the order is:

subscriber.onNext()               // thread-2
request(Long.MAX_VALUE)  // thread-2
if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0) // thread-1, emitted = 2
subscriber.onNext()               // thread-2
request(10)  // thread-2. It will overflow if we don't check before "c+n"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized my PR didn't handle this case. I should also put CAS loop in if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0)

// If `requested` is Long.MAX_VALUE, `c+n` will be overflow.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be overflow even if not Long.MAX_VALUE

// Therefore, always check before setting to `c+n`
return;
}
if (REQUESTED_UPDATER.compareAndSet(this, _c, _c + n)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can still just use getAndAdd without the CAS retry loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I try to ignore the further requests after request(Long.MAX_VALUE). I cannot use getAndAdd because if requested == Long.MAX_VALUE, I wanna ignore n. If adding n to requested, and if it happens to execute in while (true) loop in emit method, the loop won't stop because requested < 0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the overflow, I mean sending a request(n) after request(Long.MAX_VALUE) may make requested overflow easily. c+n is rarely overflow if request != Long.MAX_VALUE (However, we still can handle c+n overflow problem by adding a check c > Long.MAX_VALUE - n).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break;
}
}
}
if (!emittingStarted) {
// we haven't started yet, so record what was requested and return
Expand All @@ -122,16 +132,20 @@ public void request(long n) {
}

void emit(long previousRequested) {
if (requested < 0) {
if (requested == Long.MAX_VALUE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha! That's a fun bug, totally missed that when we went from -1 to MAX_VALUE.

I bet it isn't even worth having the complexity of a fast-path because the cost of queue/deque is far worse than the counter, and we only hit the counter at the very end anyways.

// fast-path without backpressure
try {
for (Object value : deque) {
notification.accept(subscriber, value);
if (previousRequested == 0) {
try {
for (Object value : deque) {
notification.accept(subscriber, value);
}
} catch (Throwable e) {
subscriber.onError(e);
} finally {
deque.clear();
}
} catch (Throwable e) {
subscriber.onError(e);
} finally {
deque.clear();
} else {
// backpressure path will handle Long.MAX_VALUE and emit the rest events.
}
} else {
// backpressure is requested
Expand Down Expand Up @@ -160,7 +174,6 @@ void emit(long previousRequested) {
// we're done emitting the number requested so return
return;
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Functions;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -148,4 +150,118 @@ public Integer call(Integer i) {
};
}

@Test
public void testIssue1522() {
// https://github.com/Netflix/RxJava/issues/1522
assertEquals(0, Observable
.empty()
.count()
.filter(Functions.alwaysFalse())
.toList()
.toBlocking().single().size());
}

@Test
public void testIgnoreRequest1() {
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
request(Long.MAX_VALUE);
}
});
}

@Test
public void testIgnoreRequest2() {
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
request(1);
}
});
}

@Test(timeout = 30000)
public void testIgnoreRequest3() {
// If `takeLast` does not ignore `request` properly, it will enter an infinite loop.
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
request(Long.MAX_VALUE);
}
});
}


@Test
public void testIgnoreRequest4() {
// If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown.
Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber<Integer>() {

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Integer integer) {
request(1);
}
});
}
}