Serialization Behavior #998
I recommend the following:
This guarantees items can't sit indefinitely in the queue, and will not livelock callers indefinitely.
So it's clear what this means: the Rx contract is to never inject concurrency unless a Scheduler is explicitly provided.
The livelock is only an issue when there's already concurrency (watch out for re-entrancy, though), since other threads have to be adding data... but I suppose if every one of them was in some context, the async-thingy we spun off might violate that. What about some sort of hand-off? After the limit is exceeded, the current consumer sets a flag that others check when they're producing. The first producer that comes along is then forced to block so the consumer can hand off to them after it finishes the next job. So we double-pay the latency of a job, once on the current consumer and once on blocking the to-be consumer, but we gain not blocking anything indefinitely.
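A rough, self-contained sketch of the hand-off idea (all names here, such as HandOffSketch and LIMIT, are illustrative and not actual RxJava internals; this is just to make the protocol concrete):

```java
import java.util.ArrayDeque;
import java.util.Queue;

import rx.Observer;

// Illustrative sketch of the proposed hand-off, not how RxJava implements serialization.
class HandOffSketch<T> {
    private static final int LIMIT = 128;      // max items one thread delivers before handing off
    private final Queue<T> queue = new ArrayDeque<T>();
    private final Observer<? super T> downstream;
    private boolean emitting;                  // guarded by `this`: a thread is currently delivering
    private boolean handOffRequested;          // guarded by `this`: the drainer hit LIMIT and wants a successor

    HandOffSketch(Observer<? super T> downstream) {
        this.downstream = downstream;
    }

    public void onNext(T t) {
        boolean shouldDrain;
        synchronized (this) {
            queue.offer(t);
            if (handOffRequested) {
                // This producer is "caught" by the flag and takes over delivery,
                // so the previous drainer can return to its own work.
                handOffRequested = false;
                shouldDrain = true;
            } else if (!emitting) {
                emitting = true;               // first thread in becomes the drainer
                shouldDrain = true;
            } else {
                shouldDrain = false;           // someone else is delivering; our item is queued
            }
        }
        if (shouldDrain) {
            drain();
        }
    }

    private void drain() {
        int delivered = 0;
        for (;;) {
            T next;
            synchronized (this) {
                if (delivered == LIMIT && !queue.isEmpty()) {
                    handOffRequested = true;   // stop here; the next producer continues draining
                    return;
                }
                next = queue.poll();
                if (next == null) {
                    emitting = false;          // queue fully drained; release ownership
                    return;
                }
            }
            downstream.onNext(next);           // deliver outside the lock
            delivered++;
        }
    }
}
```

Note this simplified version hands ownership to the next producer without actually blocking it, and if no further producer arrives the remaining items wait for the next event, which is essentially the delayed-delivery trade-off discussed later in this thread.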
The problem I see with this is that we have no idea how quickly the ...
The backpressure (#1000) solution will affect the design of this. In particular, it will mean the buffer becomes bounded and may naturally prevent thread starvation, as the fast producers will park themselves. The addition of backpressure machinery may or may not necessitate the additional concurrent consumer.
I suggest changing the current implementation to trade off toward risking starvation rather than delaying delivery. The starvation problem seems less likely to occur and is a symptom of other problems (lack of backpressure?). Or should we leave this as is until we get the backpressure stuff in place? I'm not all that comfortable yet adding more threads to deal with serializing other threads. It feels wrong and makes for a confusing API (particularly on ...).
I would vote for that as a temporary fix until we have the back pressure stuff in place.
Sounds acceptable to me.
And me. Interesting discussion.
Please review the changes on this pull request: #999. In particular this commit: benjchristensen@5b317ad
(Moving comment from the pull request.) Apologies for joining this a little late, but I had a couple of observations/questions:
From what I understand of the current solution, this means that in Vert.x the short-circuit where no queue is created will always be executed and there will be minimal impact. It also means that Vert.x is relying on concurrency not being introduced without an explicitly provided Scheduler (aside: I naively assume this never happens?), as the user would also need a matching observeOn() with a custom Scheduler to ensure the thread isolation is preserved.
Hi Peter,
This assumes that multiple network requests on different event loops are never happening. Perhaps in Vert.x this is prevented, but it is definitely not the case in many apps using NIO/Netty where multiple network calls on separate event loops can all be merged together.
I don't understand how synchronization/serialization relates to this comment, can you please elaborate?
A pipeline processing quickly is not sufficient, since Rx supports streams, not just scalar responses; thus a stream from one thread can starve out another stream on a different thread if they are merged together. We are seeking a solution where, instead of the other thread(s) being blocked, they can asynchronously deliver their results and move on to continue processing other work (if they are event loops). To complete this solution we need back pressure (#1000) so as to park/pause streams that are too fast for the consumer (which in this case could just be the merge bottleneck).
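To make the scenario concrete, here is a standard RxJava example (Java 8 lambdas used for brevity) where two streams emitting on different threads are merged; the merge point is exactly where serialization, and potentially thread stealing, happens:

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class MergeAcrossThreads {
    public static void main(String[] args) throws InterruptedException {
        // Two streams, each emitting on its own scheduler thread.
        Observable<String> fast =
                Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.computation())
                          .map(i -> "fast-" + i);
        Observable<String> slow =
                Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.io())
                          .map(i -> "slow-" + i);

        // merge() must turn concurrent onNext calls into one sequential stream.
        // A thread delivering the slow stream can end up "stolen" to drain
        // queued items from the fast stream before returning to its own work.
        Observable.merge(fast, slow)
                  .subscribe(s -> System.out.println(Thread.currentThread().getName() + " -> " + s));

        Thread.sleep(500);   // let both streams emit for a while
    }
}
```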
If everything is always done on the same event-loop, yes, you would have the best-case scenario and should see little-to-no change.
This isn't quite right. Rx never injects concurrency without an operator asking for it, and in all those cases where it is required there is an overload that accepts a Scheduler. This is why I'm against using a separate thread in the serialization implementation. If you want to force all of these to always run on the Vert.x event-loop(s) then you may want to leverage the ...
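A small illustration of that contract in standard RxJava (Java 8 lambdas for brevity): the only places threads change are where a Scheduler is supplied, either to an operator overload or via observeOn()/subscribeOn():

```java
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class SchedulerOverloads {
    public static void main(String[] args) throws InterruptedException {
        // interval() needs a timer thread, so it offers Scheduler overloads;
        // here the caller chooses the computation scheduler explicitly.
        Observable<Long> ticks =
                Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation());

        // map() and take() never hop threads on their own; the only additional
        // thread change below is the one explicitly requested via observeOn().
        ticks.map(i -> i * 2)
             .observeOn(Schedulers.io())
             .take(5)
             .subscribe(v -> System.out.println(Thread.currentThread().getName() + " -> " + v));

        Thread.sleep(500);
    }
}
```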
Hi Ben, (aware this might be too Vert.x specific for this issue so if you'd prefer to move to a different issue / group thread just lmk)
Right. This is particular to Vert.x where all callbacks to requests from a Verticle instance are serialized back to that instance on the same event loop thread.
It doesn't directly relate, other than that both thread-stealing and injecting concurrency would most likely break in Vert.x. The current solution only works because the existing serialization prevents the problem case happening (something I wasn't 100% on and wanted to confirm).
Understood - and back pressure/throttling is the only real solution.
Yep. Just wanted to confirm that.
That was my understanding, but my early reading of the original request was that it was being proposed as a solution - glad you've decided against it.
The problem is that the blocking operations cannot run on the event loop thread, but the subscriptions would have to, and more specifically run on the same thread that triggered the operator (not just any event loop thread). I think that might require a custom scheduler that has to be passed into operators and/or observeOn() (might use ...).
How is this different than if ...? In short, if inside a Vert.x verticle there is ever a case where multiple threads are being merged together, both ...
What blocking operations are you referring to? The only blocking operations I'm aware of (after eliminating ...)
The only case was where serialization always deferred to a separate thread for delivery.
Indeed. Even a tight loop or CPU-intensive code would be problematic, as the threading model means that occupying a thread will starve other operations that are pinned to the same thread (even if there are other cores available).
Yes, but this is typical of any event-loop system. The goal is for RxJava to have nothing that would block a thread (park/sleep/wait), but that does not prevent a developer from writing computationally intensive code that saturates a thread/loop. If someone is going to do computationally intensive work they would need to use ...
Splitting hairs at this stage - but yes. A system that pins work to the same thread is just more susceptible to becoming unbalanced vs one that runs events on a pool of threads.
Vert.x has a separate model for this - a WorkerVerticle. It uses a separate thread pool and interfaces via the async event bus (so no additional thread synchronization is needed). That said, it is quite coarse-grained, so providing support for subscribeOn()/observeOn() might be worthwhile.
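If finer-grained control than a WorkerVerticle were wanted, one possible direction (a sketch only: Schedulers.from(Executor) exists in later RxJava releases, and eventLoopExecutor is a hypothetical Executor that dispatches back onto the verticle's event loop) would be:

```java
import java.util.concurrent.Executor;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;

public class VertxSchedulerSketch {
    // Hypothetical stand-in: in a real verticle this would be an Executor that
    // submits tasks back onto that verticle's event loop.
    static Executor eventLoopExecutor = Runnable::run;

    public static void main(String[] args) throws InterruptedException {
        // Wrap the executor as an Rx Scheduler (Schedulers.from is available in later releases).
        Scheduler eventLoop = Schedulers.from(eventLoopExecutor);

        Observable.just("request")
                  .subscribeOn(Schedulers.io())   // run the slow/blocking work off the event loop
                  .observeOn(eventLoop)           // deliver the result back on the event loop
                  .subscribe(System.out::println);

        Thread.sleep(100);   // keep the JVM alive long enough for the daemon scheduler threads
    }
}
```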
We have removed ...
Opening this issue to capture, document and discuss how serialization (serialize(), merge(), flatMap(), mergeMap()) behaves and is implemented.

Prior to 0.17.1 all serialization was done via blocking synchronization. This is how Rx.Net does it so we adopted the same pattern. This however breaks the model of Rx, which is supposed to be non-blocking and usable in non-blocking environments (think NIO event loops like Netty, Vert.x and Node.js-style apps). Blocking a thread while merging can significantly impact throughput in an application based on event loops.
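For reference, the basic usage being discussed: serialize() takes a sequence whose onNext calls may come from multiple threads concurrently and guarantees sequential delivery downstream (standard RxJava usage, with a PublishSubject used here simply as a multi-threaded source; Java 8 lambdas for brevity):

```java
import rx.Observable;
import rx.subjects.PublishSubject;

public class SerializeExample {
    public static void main(String[] args) throws InterruptedException {
        PublishSubject<Integer> subject = PublishSubject.create();

        // serialize() guarantees onNext/onError/onCompleted are delivered sequentially
        // even when several threads emit at the same time.
        Observable<Integer> serialized = subject.serialize();
        serialized.subscribe(v ->
                System.out.println(Thread.currentThread().getName() + " -> " + v));

        Runnable emit = () -> {
            for (int i = 0; i < 1000; i++) {
                subject.onNext(i);   // concurrent emissions from two threads
            }
        };
        Thread t1 = new Thread(emit);
        Thread t2 = new Thread(emit);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        subject.onCompleted();
    }
}
```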
The migration to doing "serialization" instead of "synchronization" came with trade-offs.
Back Pressure
Being non-blocking means it must become async: allow threads to deliver their notifications, queueing if necessary, and return. This can result in buffer bloat and typical back-pressure problems. Solutions to this are being explored internally and with other teams/companies and will result in changes in the future.
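A plain-Java illustration of the buffer-bloat risk (nothing RxJava-specific here): if delivery becomes "queue and return" and the consumer is slower than the producers, an unbounded queue simply grows.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BufferBloatDemo {
    public static void main(String[] args) throws InterruptedException {
        Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        // Fast producer: "delivers" by queueing and returning immediately (non-blocking).
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                queue.offer(i);
            }
        });

        // Slow consumer: drains far slower than the producer fills.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                queue.poll();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                System.out.println("queued items: " + queue.size());   // keeps growing: buffer bloat
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```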
Concurrency
One way of solving the problem without blocking is similar to observeOn: everything gets dropped into a queue and another thread pulls off the queue. This however means that we are injecting additional concurrency in places where it is not expected and generally not desired.
The current implementation does not do this. It uses the threads that are already pushing events through and "steals" a single thread at a time: that thread pushes through whatever is in the queue, then its own event, and then returns to do its own work.
Thread Starvation
Stealing threads opens up the possibility of thread starvation. If a thread loops continually to drain the queue and the queue is always being filled by other threads, it will never be released to do its own work. This would mean that the events it was meant to deliver would never be delivered, as it is always busy delivering events on behalf of other threads.
Delayed Delivery
To prevent thread starvation the current implementation only allows draining the queue once. This can be increased to multiple iterations, but at some point it stops draining and returns and allows another thread to "win" and start draining.
During the time gap between the draining thread finishing and a new thread taking over, there may be a delay where events stay in the queue. This can delay delivery of events.
In a fast-moving stream this is not a problem, as another thread immediately takes over. In an intermittent stream, however, this can mean long, non-deterministic delays.
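To make the behavior described in the last three sections concrete, here is a simplified, self-contained model of the approach (illustrative only, not the actual SerializedObserver or merge implementation): threads queue their events, one thread at a time is "stolen" to deliver a single pass of the queue, and anything queued after that pass waits for the next incoming event.

```java
import java.util.ArrayDeque;
import java.util.Queue;

import rx.Observer;

// Simplified model of "steal a thread, drain one pass, release"; not RxJava's actual code.
class ThreadStealingSketch<T> {
    private final Queue<T> queue = new ArrayDeque<T>();
    private final Observer<? super T> downstream;
    private boolean emitting;                    // guarded by `this`

    ThreadStealingSketch(Observer<? super T> downstream) {
        this.downstream = downstream;
    }

    public void onNext(T t) {
        int toDrain;
        synchronized (this) {
            queue.offer(t);
            if (emitting) {
                return;                          // another thread is delivering; our item is queued and we return at once
            }
            emitting = true;                     // this thread is "stolen" to deliver
            toDrain = queue.size();              // single pass: only what is queued right now
        }
        for (int i = 0; i < toDrain; i++) {
            T next;
            synchronized (this) {
                next = queue.poll();
            }
            if (next == null) {
                break;
            }
            downstream.onNext(next);             // deliver outside the lock
        }
        synchronized (this) {
            emitting = false;                    // release; items queued during the pass wait for the next event
        }
    }
}
```

The single pass bounds how long any one thread is borrowed (no starvation), but it is also exactly where the delayed delivery comes from: events that arrive while `emitting` is true are queued and are only picked up when the next onNext call wins the emitting flag.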
Possible Improvements
There are a few ways to improve this situation without reverting back to blocking.
Metrics could be kept to know whether a stream is fast-moving, meaning thread starvation is a risk and draining should be handed off to another thread. If starvation is not an issue then the queue could be fully drained before returning. This is still not perfect and would still risk one of the two problems occurring, but could probably solve most cases. The difficulty is doing so without significantly impacting normal performance.
Another option is conditionally scheduling delivery onto another Scheduler when starvation is happening. This would allow most cases to be done by stealing threads, but flip to an observeOn-style model if contention and/or starvation is happening.

Next steps
If the current functionality is breaking your use cases, you may want to stay on 0.17.0 while working with us to improve it. Most use cases have been shown to work fine with the 0.17.1 behavior, and the non-blocking and deadlock-free characteristics are necessary.
I welcome discussion, unit tests, pull requests and assistance on this.
Existing performance tests (that need work):
Existing unit tests: