ObserveOn Operator with Backpressure #835

Merged
merged 10 commits from observeOn into ReactiveX:master
Feb 9, 2014

Conversation

@benjchristensen (Member)

This is a re-implementation of ObserveOn done for 3 purposes:

  1. Migrate to new lift style so the Subscription flows through correctly
  2. Eliminate the wasted first schedule step (reported by @mttkay in Scheduler Outer/Inner [Preview] #797)
  3. Eliminate buffering so it naturally provides back pressure

Unit tests pass for a full build on my machine, but I'm interested in user experience, particularly from Android users, where observeOn behavior is mission critical.

This WILL change behavior and may cause problems if observeOn was being used to make something async (which is not its purpose). The subscribeOn operator is the one to use for making something asynchronous and concurrent. [Update: Per discussions below we may want another operator or an observeOn overload that allows async behavior via a queue, decoupling producer and consumer. By default, however, observeOn should only do what it says: switch threads.]
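For readers skimming the thread, here is a minimal sketch of that distinction (my illustration, not code from this PR; RxJava 1.x-style package names are assumed, as the layout shifted around the 0.17 era):

import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

public class ObserveOnVsSubscribeOn {
    public static void main(String[] args) throws InterruptedException {
        Observable.range(0, 3)
                // subscribeOn moves the subscription (and synchronous emission) off the caller thread
                .subscribeOn(Schedulers.newThread())
                // observeOn only moves where downstream notifications are delivered
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer i) {
                        System.out.println(Thread.currentThread().getName() + " => " + i);
                    }
                });
        Thread.sleep(500); // crude wait so the async pipeline finishes before main exits
    }
}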

This is a complete re-write, so I may very well have created nasty bugs that we don't yet have unit tests for; I'd appreciate a thorough review and testing.

One possible concern is performance. In the case of a synchronous firehose (not the normal use case, but a use case nonetheless), this is far slower because each onNext is now sent one-at-a-time across thread boundaries, whereas before everything was put into a queue on one side and then read from the queue on the other.

Thus, this code, which synchronously firehoses 100,000 values, runs about 7 times faster on the current code without back-pressure:

long last = Observable.range(0, 100000).observeOn(Schedulers.newThread()).toBlockingObservable().last();

However, this code is the same performance:

long last = Observable.interval(1, TimeUnit.MILLISECONDS).take(1000).observeOn(Schedulers.newThread()).toBlockingObservable().last();

The difference is that the second example never just fills up a queue, since the source emits with latency.

I believe this is okay: the intent of observeOn is moving across thread boundaries, such as for UI events, and eliminating the bad behavior of buffer bloat is the right thing.

I'm sure we can find some performance improvements in this code, but it's worth pointing out how the change in behavior can affect it.

@benjchristensen (Member Author)

@headinthebox This relates to discussions you and I have had about eliminating implicit buffers to enable natural back-pressure unless operators such as buffer and window are explicitly requested. I'd appreciate your review of the change in behavior.

@cloudbees-pull-request-builder

RxJava-pull-requests #756 FAILURE
Looks like there's a problem with this pull request

@headinthebox (Contributor)

@benjchristensen Will do!

We might consider adding an explicit buffer operator to decouple producers and consumers, so that people who want it can get the old behavior of observeOn, with all the caveats about buffer bloat of course.

In .NET you can do that via .ToEnumerable().ToObservable(), but that is quite a hack and may not give you exactly what you want.
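A rough Java analogue of that .NET hack, for context (my sketch, not code from this PR; it assumes BlockingObservable.toIterable() and Observable.from(Iterable) behave as in later releases):

// Re-materializing through a blocking iterable re-introduces an implicit
// unbounded buffer between producer and consumer -- the old observeOn behavior.
Observable<Integer> source = Observable.range(0, 1000);
Observable<Integer> decoupled =
        Observable.from(source.toBlockingObservable().toIterable());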

@benjchristensen (Member Author)

> RxJava-pull-requests #756 FAILURE

That test is non-deterministic and needs to get fixed.

@benjchristensen (Member Author)

> We might consider adding an explicit buffer operator to decouple producers and consumers

I'm open to that. Are you thinking just an overload of buffer, or an overload of observeOn that accepts buffer options? I think a buffer overload makes sense.

@headinthebox (Contributor)

You are thinking of a buffer that takes a scheduler, then?

@benjchristensen (Member Author)

> You are thinking of a buffer that takes a scheduler, then?

I was thinking so, but looking at the buffer signature again I think it may be confusing.

Observable<List<T>> buffer(int count)

That would not line up well with:

Observable<T> buffer(Scheduler s)

Otherwise we could have observeOn(Scheduler s, Queue impl) or observeOn(Scheduler s, int bufferSize) (the size defaults to 1).

I don't think we have enough use cases to warrant the Queue type though.

@headinthebox (Contributor)

Yes, plus buffer already has many overloads for overlapping buffers. Also agreed on the queue. Having observeOn with a size (and what about time, like replay, i.e. buffer for n ticks) seems like a good start.

@akarnokd (Member) commented Feb 7, 2014

I did some speed testing with a 1M range observed on newThread.

Original:
473,828.612 ops/s
444,320.052 ops/s
411,198.937 ops/s
416,063.006 ops/s
420,909.267 ops/s

This PR:
108,350.741 ops/s
92,500.636 ops/s
104,746.970 ops/s
112,972.259 ops/s
135,964.434 ops/s

Semaphore + queue with permits, observeOn code is here:
1: 100,000 ops/s, tests pass
2: 350,000 ops/s, backpressure tests fail
4: 1,300,000 ops/s, backpressure tests fail, testParallel fails
8: 1,900,000 ops/s, backpressure tests fail, testParallel fails
16: 2,500,000 ops/s, backpressure tests fail, testParallel fails
32: 2,300,000 ops/s, backpressure tests fail, testParallel fails
64: 2,200,000 ops/s, backpressure tests fail, testParallel fails

testParallel seems to fail because the scheduling is interrupted before the last values in the queue can be processed. If I remove the interrupt check from poll, testParallel passes at any size.
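A minimal reconstruction of the "semaphore + queue with permits" idea (my sketch, not the linked code): the producer must win one of n permits before publishing, which is what produces the back-pressure measured above.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

// Classic bounded handoff built from two semaphores and a lock-free queue:
// "permits" counts free slots (the back-pressure budget), "items" counts
// published-but-unconsumed values.
public class PermitBoundedHandoff<T> {
    private final Semaphore permits;                   // free slots: producer budget
    private final Semaphore items = new Semaphore(0);  // filled slots
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();

    public PermitBoundedHandoff(int n) {
        this.permits = new Semaphore(n);
    }

    public void produce(T value) throws InterruptedException {
        permits.acquire();   // blocks once n values are unconsumed -> back-pressure
        queue.offer(value);
        items.release();     // publish: the release happens-after the offer
    }

    public T consume() throws InterruptedException {
        items.acquire();     // wait for at least one published value
        T value = queue.poll();
        permits.release();   // hand one permit back per consumed value
        return value;
    }
}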

Semaphore + queue with permits and batch drain of the queue, code

1: 95,000 ops/s
2: 272,000 ops/s
4: 900,000 ops/s
8: 1,900,000 ops/s
16: 3,074,000 ops/s
32: 5,400,000 ops/s
64: 8,700,000 ops/s
128: 7,700,000 ops/s
256: 7,600,000 ops/s
512: 11,086,000 ops/s

Using batch drain appears to be more efficient when the queue size is > 16 on my machine. I think the reason is that permits are released in larger batches, so producers less frequently need to wait for a 0 -> 1 permit transition.
Going above 512 resulted in significant fluctuations between 8M and 13M ops/s.
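The batch-drain variant, as I read the description above, would replace the one-at-a-time consume() with something like this (again a sketch; it assumes the PermitBoundedHandoff fields from the previous snippet):

// Drain everything already published in one pass, then release all the
// permits at once, so producers see a large permit jump instead of
// repeatedly waiting on the slow 0 -> 1 permit transition.
public int consumeBatch(java.util.List<T> out) throws InterruptedException {
    items.acquire();                    // block until at least one value exists
    int n = 1 + items.drainPermits();   // claim everything else already published
    for (int i = 0; i < n; i++) {
        out.add(queue.poll());          // safe: each value was offered before its release
    }
    permits.release(n);                 // bulk release back to the producers
    return n;
}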

The latter two TransferQueue-based implementations fluctuate quite a bit between 600k and 1,200k ops/s for some reason. (Tested on an i7 920 @ 2.66GHz, Win7 x64, 6GB DDR3-1333, Java 7u51 x64.)

I tried to improve performance of the 1-permit case via TransferQueue, but I couldn't get reliable data exchange, nor could I implement proper interruption.

akarnokd and others added 10 commits February 8, 2014 14:23
- See ReactiveX#713
- It was causing non-deterministic behavior, random test failures and poor performance.
- ObserveOn was the wrong mechanism for delaying behavior as it was relying on the buffering of observeOn. Now using delay() to delay the group since observeOn no longer buffers.
- Since we are blocking the producer's on* notifications we need to interrupt it on unsubscribe events. I need to do it on the data structure and not the thread, as the thread could change for each onNext and that could have unexpected consequences.
- The ObserveOn operator is for moving where execution happens, not making it async. SubscribeOn makes it async.
@benjchristensen (Member Author)

I have updated the code to support observeOn(Scheduler s, int bufferSize) and it uses a ring buffer in the implementation.

The buffer allows async behavior and increases throughput:

--- version 0.17.1 => with queue size == 1

Run: 10 - 115,033 ops/sec
Run: 11 - 118,155 ops/sec
Run: 12 - 120,526 ops/sec
Run: 13 - 115,035 ops/sec
Run: 14 - 116,102 ops/sec

--- version 0.17.1 => with queue size == 16

Run: 10 - 850,412 ops/sec
Run: 11 - 711,642 ops/sec
Run: 12 - 788,332 ops/sec
Run: 13 - 1,064,056 ops/sec
Run: 14 - 656,857 ops/sec

--- version 0.17.1 => with queue size == 1000000

Run: 10 - 5,162,622 ops/sec
Run: 11 - 5,271,481 ops/sec
Run: 12 - 4,442,470 ops/sec
Run: 13 - 5,149,330 ops/sec
Run: 14 - 5,146,680 ops/sec

However, it's still slower than the previous implementation with an unbounded queue:

--- version 0.16.1

Run: 10 - 27,098,802 ops/sec
Run: 11 - 24,204,284 ops/sec
Run: 12 - 27,208,663 ops/sec
Run: 13 - 26,879,552 ops/sec
Run: 14 - 26,658,846 ops/sec
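For reference, a usage sketch of the overload these numbers were measured against (buffer size 16 is chosen arbitrarily here, not taken from the benchmark code):

// Bounded ring buffer between producer and consumer: larger sizes trade
// memory and back-pressure tightness for throughput, per the numbers above.
long last = Observable.range(0, 1000000)
        .observeOn(Schedulers.newThread(), 16)
        .toBlockingObservable()
        .last();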

@benjchristensen (Member Author)

I'm going to proceed with the merge, as this gets the functionality and API to what is wanted and the performance is adequate for the use case. I'd be interested if anyone else wants to look at further improving the performance.

benjchristensen added a commit that referenced this pull request on Feb 9, 2014: ObserveOn Operator with Backpressure
benjchristensen merged commit 52cd2cd into ReactiveX:master on Feb 9, 2014
benjchristensen deleted the observeOn branch on February 9, 2014 00:26
@benjchristensen (Member Author)

I've been playing with this over the weekend on 0.17.0-RC1 and think we need to revert the default back-pressure behavior. I think it is valuable as an opt-in, but it's too risky as the default behavior if the source Observable is emitting from an event loop (such as an NIO selector thread).

I'm going to submit a change today that splits out OperatorObserveOn and OperatorObserveOnBounded and leaves the default observeOn(Scheduler s) using the unbounded version.

Further discussion of providing tools for back-pressure will be picked up in the 0.18 dev cycle.
