Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observeOn: allow configurable buffer size #3777

Merged
merged 1 commit into from
Mar 23, 2016
Merged

observeOn: allow configurable buffer size #3777

merged 1 commit into from
Mar 23, 2016

Conversation

srvaroa
Copy link
Contributor

@srvaroa srvaroa commented Mar 17, 2016

The observeOn operator is backed by a small queue of 128 slots that may
overflow quickly on slow producers. This could only be avoided by
adding a backpressure operator before the observeOn (not only
inconvenient, but also taking a perf. hit as it forces hops between two
queues).

This patch allows modifying the default queue size on the observeOn
operator.

Fixes: #3751
Signed-off-by: Galo Navarro [email protected]

* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, boolean)
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add observeOn(Scheduler, boolean delayError, int bufferSize) overload as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still miss the overload observeOn(Scheduler, boolean delayError, int bufferSize)

@akarnokd
Copy link
Member

👍

@@ -523,14 +523,21 @@ public boolean hasNext() {
assertEquals(7, generated.get());
}

/**
* {@link BackpressureTests.testObserveOnWithSlowConsumer} also covers the case with default buffer size.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this link won't work, I guess

@artem-zinnatullin
Copy link
Contributor

👍 though i'd leave old test for default buffer size and just add a new one

@srvaroa
Copy link
Contributor Author

srvaroa commented Mar 20, 2016

Rebased, fixed the javadoc link, and a spelling mistake.

@artem-zinnatullin I reused this one to avoid redundancy as the code path is virtually the same, and there are tests (the ones referenced in the @link) which use the default size. Let me know if you still prefer the 2 cases and I'll fix that.

@artem-zinnatullin
Copy link
Contributor

Yeah, I understand, just afraid that one may change linked tests and we won't cover overload.

// one day we'll setup code coverage, I hope

@Test
public void testQueueFullEmitsError() {
final CountDownLatch latch = new CountDownLatch(1);
// randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
// which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
final int capacity = (int)Math.pow(2, new Random().nextInt(10));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @artem-zinnatullin, leave this test as it was and create a new one with a loop over some queue sizes:

for (int i = 1; i <= 1024; i = i * 2) {
   final int capacity = i;
   // ...
}

The observeOn operator is backed by a small queue of 128 slots that may
overflow quickly on slow producers.  This could only be avoided by
adding a backpressure operator before the observeOn (not only
inconvenient, but also taking a perf. hit as it forces hops between two
queues).

This patch allows modifying the default queue size on the observeOn
operator.

Fixes: #3751
Signed-off-by: Galo Navarro <[email protected]>
@srvaroa
Copy link
Contributor Author

srvaroa commented Mar 20, 2016

Agh thanks @akarnokd, I didn't notice I changed computers so I was back on the 1st patch and lost the overload. Resubmitted based on the one with all overloads, and added the extra test instead of rewriting it. Hopefully fine now.

@akarnokd
Copy link
Member

👍

@@ -581,6 +581,69 @@ public void onNext(Integer t) {
}

@Test
public void testQueueFullEmitsErrorWithVaryingBufferSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: test prefix is not needed

@srvaroa
Copy link
Contributor Author

srvaroa commented Mar 23, 2016

I've seen some failures on the test locally and there is also a bug in the test (the loop should start at 2, not 1). I can't look into it right now but will take a look asap.

@stevegury
Copy link
Member

👍

@akarnokd
Copy link
Member

I'm merging this. The changes are algorithmically solid so I'm not sure what failures you see locally.

@akarnokd akarnokd merged commit d72a41e into ReactiveX:1.x Mar 23, 2016
@srvaroa
Copy link
Contributor Author

srvaroa commented Mar 24, 2016

Thanks @akarnokd. The failure was no error notifications emitted at random sizes, but I did notice Travis didn't complain. Ping me if it surfaces.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants