From c2e2d29600d6b3495bec756d4356b821c583eaf4 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 14 Sep 2015 21:36:56 -0700 Subject: [PATCH] fix(switchAll): switch all will properly handle async observables --- spec/operators/switchAll-spec.js | 10 +++++- src/operators/switchAll.ts | 62 ++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/spec/operators/switchAll-spec.js b/spec/operators/switchAll-spec.js index 08d79528fd..9731cc6c7c 100644 --- a/spec/operators/switchAll-spec.js +++ b/spec/operators/switchAll-spec.js @@ -1,4 +1,4 @@ -/* expect, it, describe */ +/* expect, it, describe, expectObserable, hot, cold */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; @@ -26,4 +26,12 @@ describe('Observable.prototype.switchAll()', function(){ expect(x).toBe(r[i++]); }, null, done); }); + + it('should handle a hot observable of observables', function() { + var x = cold( '--a---b---c--|'); + var y = cold( '---d--e---f---|'); + var e1 = hot( '------x-------y------|', { x: x, y: y }); + var expected = '--------a---b----d--e---f---|'; + expectObservable(e1.switchAll()).toBe(expected); + }); }); \ No newline at end of file diff --git a/src/operators/switchAll.ts b/src/operators/switchAll.ts index 98044e863f..3e900bc31f 100644 --- a/src/operators/switchAll.ts +++ b/src/operators/switchAll.ts @@ -19,31 +19,55 @@ class SwitchOperator implements Operator { } } -class SwitchSubscriber extends MergeSubscriber { - +class SwitchSubscriber extends Subscriber { + private active: number = 0; + private hasCompleted: boolean = false; innerSubscription: Subscription; - constructor(destination: Observer) { - super(destination, 1); + constructor(destination: Observer) { + super(destination); } - - _buffer(value) { - const active = this.active; - if(active > 0) { - this.active = active - 1; - const inner = this.innerSubscription; - if(inner) { - inner.unsubscribe() - this.innerSubscription = null; - } + + _next(value: any) { + this.active++; + this.unsubscribeInner(); + this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this))); + } + + _complete() { + this.hasCompleted = true; + if(this.active === 0) { + this.destination.complete(); + } + } + + unsubscribeInner() { + const innerSubscription = this.innerSubscription; + if(innerSubscription) { + this.active--; + innerSubscription.unsubscribe(); + this.remove(innerSubscription); } - this._next(value); } + + notifyComplete() { + this.unsubscribeInner(); + if(this.hasCompleted && this.active === 0) { + this.destination.complete(); + } + } +} - _subscribeInner(observable, value, index) { - this.innerSubscription = new Subscription(); - this.innerSubscription.add(super._subscribeInner(observable, value, index)); - return this.innerSubscription; +class InnerSwitchSubscriber extends Subscriber { + constructor(destination: Observer, private parent: SwitchSubscriber) { + super(destination); + } + + _next(value: T) { + super._next(value); + } + _complete() { + this.parent.notifyComplete(); } }