How important is the "firehose" problem? #12
Comments
So, it's a problem... but it's a bit of an edge case. The problem usually comes from composition, where people create "inner subscriptions" for things like custom-implemented operators that subscribe to observables in order to "flatten" them. To illustrate the issue, I'll just use plain functions (not the whole pipeable-operator thing from RxJS):

/**
 * A function that flattens an observable of observables
 */
function flatten<T>(sources: Observable<Observable<T>>) {
  return new Observable<T>(subscriber => {
    let outerComplete = false;
    const innerSubscriptions = new Set<Subscription>();
    const sourceSubscription = sources.subscribe({
      next: (innerSource) => {
        const innerSubscription = innerSource.subscribe({
          next: value => subscriber.next(value),
          error: err => subscriber.error(err),
          complete: () => {
            innerSubscriptions.delete(innerSubscription);
            if (outerComplete && innerSubscriptions.size === 0) {
              subscriber.complete();
            }
          }
        });
        /*** THE PROBLEM EXISTS IN THIS TEMPORAL SPACE ***/
        innerSubscriptions.add(innerSubscription);
      },
      error: err => subscriber.error(err),
      complete: () => {
        outerComplete = true;
        if (innerSubscriptions.size === 0) {
          subscriber.complete();
        }
      }
    });
    return () => {
      sourceSubscription.unsubscribe();
      for (const subscription of innerSubscriptions) {
        subscription.unsubscribe();
      }
      innerSubscriptions.clear();
    };
  });
}

function take<T>(source: Observable<T>, count: number) {
  return new Observable<T>(subscriber => {
    let counter = 0;
    const subscription = source.subscribe({
      next: (value) => {
        if (counter < count) {
          subscriber.next(value);
          if (++counter === count) {
            subscriber.complete(); // NOTE: causes teardown
          }
        }
      },
      error: err => subscriber.error(err),
      complete: () => subscriber.complete(),
    });
    return () => subscription.unsubscribe(); // teardown
  });
}

const firehose = new Observable<number>(subscriber => {
  let n = 0;
  while (!subscriber.closed) {
    subscriber.next(n++);
  }
});

And then we use it like this:

// `flatten` expects an observable of observables, so wrap the firehose.
const source = Observable.from([firehose]);
const flattened = flatten(source);
const threeValues = take(flattened, 3);
// Logs three values, then locks the thread! :)
threeValues.subscribe(console.log);

The problem is that, since we have to wait for the inner observable's subscribe call to complete and return before we get a handle we can use to tear it down, we can't flip the inner observable to "closed" in time. It still only emits the 3 values, but that RxJS has been handling this internally by composing subscriptions through chaining There is an RFC right now about moving to a more token-like API that resolves this issue (and a couple of other more RxJS-related issues)... although in RxJS's case we're leveraging The only problems I have with
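The timing gap described above can be reproduced without any library. The following is a minimal sketch, not RxJS or the proposed API: `cappedFirehose`, `demoLateHandle`, and the `cap` parameter are hypothetical names, and the cap exists only so the demo terminates instead of locking the thread.

```typescript
type Observer<T> = { next: (value: T) => void };
type Teardown = () => void;

// Hypothetical producer: emits synchronously inside subscribe and only
// hands back a teardown function once the loop has already finished.
function cappedFirehose(cap: number) {
  return (observer: Observer<number>): Teardown => {
    let closed = false;
    for (let n = 0; n < cap && !closed; n++) {
      observer.next(n);
    }
    return () => {
      closed = true;
    };
  };
}

// Counts how often the consumer wanted to unsubscribe but had no handle yet.
function demoLateHandle(cap: number, wanted: number) {
  const handle: { teardown?: Teardown } = {};
  let received = 0;
  let missedUnsubscribes = 0;

  handle.teardown = cappedFirehose(cap)({
    next: () => {
      received++;
      if (received >= wanted) {
        if (handle.teardown) {
          handle.teardown();
        } else {
          // subscribe has not returned yet, so there is nothing to call.
          missedUnsubscribes++;
        }
      }
    },
  });

  return { received, missedUnsubscribes };
}

const result = demoLateHandle(1000, 3);
console.log(result.received, result.missedUnsubscribes);
```

With a cap of 1000 and three wanted values, the consumer still receives all 1000 values, because every attempt to unsubscribe happens before the teardown function exists.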
But again, I think those are solvable issues. I'm 100% amenable to providing APIs in RxJS-land that utilize |
TLDR here is that I think moving to a token-based cancellation mechanism resolves any issues you'd have with a "synchronous firehose". |
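As a rough illustration of why a token helps in the composed case, here is a minimal sketch using the standard `AbortController`. `TokenObservable`, `takeWithToken`, `tokenFirehose`, and `runTakeDemo` are hypothetical names for this example only; they are not the RFC's API or RxJS's.

```typescript
type Producer<T> = (next: (value: T) => void, signal: AbortSignal) => void;

// Hypothetical observable whose subscribe takes a cancellation token
// up front instead of returning a subscription afterwards.
class TokenObservable<T> {
  private readonly producer: Producer<T>;

  constructor(producer: Producer<T>) {
    this.producer = producer;
  }

  subscribe(next: (value: T) => void, signal: AbortSignal): void {
    if (signal.aborted) return;
    this.producer(next, signal);
  }
}

function takeWithToken<T>(source: TokenObservable<T>, count: number): TokenObservable<T> {
  return new TokenObservable<T>((next, outerSignal) => {
    if (count <= 0) return;
    // The inner token exists before the inner subscribe call starts,
    // so it can be flipped while the source is still emitting.
    const inner = new AbortController();
    outerSignal.addEventListener("abort", () => inner.abort(), { once: true });
    let seen = 0;
    source.subscribe(value => {
      if (inner.signal.aborted) return;
      next(value);
      if (++seen === count) inner.abort();
    }, inner.signal);
  });
}

// Synchronous firehose that checks the token on every iteration.
const tokenFirehose = new TokenObservable<number>((next, signal) => {
  let n = 0;
  while (!signal.aborted) next(n++);
});

function runTakeDemo(count: number): number[] {
  const controller = new AbortController();
  const received: number[] = [];
  takeWithToken(tokenFirehose, count).subscribe(v => {
    received.push(v);
  }, controller.signal);
  return received;
}

console.log(runTakeDemo(3)); // [ 0, 1, 2 ]
```

The design point is only that the token is created before the inner subscribe call, which closes the temporal gap from the earlier example.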
I think I kinda follow the example above, but I'm struggling for a few reasons.
Even then, when I experiment with it I get an error because Given all of that, I'm happy we're going the token-based cancellation route and that it would help stop issues in scenarios like this. It sounds like there isn't much actionable here, so shall we close this @benlesh? |
Yes, that was an oversight. I didn't test the code :) I've updated it.
When you A very important design feature of observable is that teardown will always be called if:
Yes we can close this |
@benlesh I'd like to try and disambiguate some variable names in your example and then try and play around with it. Would you mind releasing it as free software? (Could be as simple as a gist with the MIT license's copyright line and terms at the top.) Do we have an example implementation of Observables that I could run this with?
|
Just try it out in the chromium implementation under chrome://flags. |
A 2019 attempt at reviving the old TC39 proposal involved an API simplification that incidentally allowed Observables to handle the "synchronous firehose of data" problem. That is, it allows a subscriber to unsubscribe (via AbortController, in that proposal) while an Observable is synchronously pushing perhaps a "firehose" of data to the next() handler before subscribe() returns (see this example).

My understanding is that neither RxJS nor zen-observable currently supports this, but if we're generalizing the API for the platform, maybe we should consider it? Designing Observables so that they handle this seems nice: a consumer can unsubscribe via token-based cancellation in the next() handler, even before subscribing is complete, if the producer pushes "enough" (or "too much") data.
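A minimal sketch of what unsubscribing from inside next() could look like, assuming a producer that receives an `AbortSignal`. The function names `signalFirehose` and `takeFromFirehose` are hypothetical and do not come from the 2019 proposal; only `AbortController` and `AbortSignal` are real platform APIs.

```typescript
type SignalObserver<T> = { next: (value: T) => void };

// Hypothetical producer: pushes synchronously until the token is aborted.
function signalFirehose(observer: SignalObserver<number>, signal: AbortSignal): void {
  let n = 0;
  while (!signal.aborted) {
    observer.next(n++);
  }
}

// The consumer creates the controller first, then aborts from next().
function takeFromFirehose(count: number): number[] {
  const values: number[] = [];
  if (count <= 0) return values; // never subscribe, otherwise nothing would stop the loop
  const controller = new AbortController();
  signalFirehose(
    {
      next: value => {
        values.push(value);
        if (values.length === count) controller.abort();
      },
    },
    controller.signal,
  );
  return values;
}

console.log(takeFromFirehose(3)); // [ 0, 1, 2 ]
```

Because `abort()` flips `signal.aborted` synchronously, the producer's loop sees the cancellation on its very next check, before the subscribing call has returned.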
On the other hand, it could get a little clunky. With this approach, subscribe() returns nothing, and the assumption is that your only interaction with a "subscription" is via the AbortController that you must create before subscribing. That's not terrible, but it could get worse if we ever wanted to give more functionality to a "subscription" beyond just cancellation. In that world we might need to introduce a Subscription object that subscribe() returns, but then you'd have two objects to keep track of for any given subscription: the AbortController that you created before calling subscribe(), and the Subscription returned from subscribe().

That, plus the fact that two major Observable libraries today don't support this (IIUC), might mean there is not enough appetite to cater our design to it. Thoughts?