Skip to content

Commit

Permalink
fix(skipUntil): unsubscribe source when it completes
Browse files Browse the repository at this point in the history
Fix operator skipUntil to automatically unsubscribe from its
source whenever it completes. This is to conform RxJS Next with RxJS 4.

Resolves issue ReactiveX#577.
  • Loading branch information
Andre Medeiros authored and staltz committed Oct 30, 2015
1 parent 97e2258 commit 8a4162b
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions src/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import Operator from '../Operator';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

export default function skipUntil(total) {
return this.lift(new SkipUntilOperator(total));
export default function skipUntil<T>(notifier: Observable<any>): Observable<T> {
return this.lift(new SkipUntilOperator(notifier));
}

class SkipUntilOperator<T, R> implements Operator<T, R> {
Expand All @@ -18,8 +18,9 @@ class SkipUntilOperator<T, R> implements Operator<T, R> {
class SkipUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: NotificationSubscriber<any> = null;

constructor(destination: Subscriber<T>, private notifier: Observable<any>) {
super(destination);
constructor(public destination: Subscriber<T>,
private notifier: Observable<any>) {
super(null);
this.notificationSubscriber = new NotificationSubscriber(this);
this.add(this.notifier.subscribe(this.notificationSubscriber));
}
Expand All @@ -30,6 +31,10 @@ class SkipUntilSubscriber<T> extends Subscriber<T> {
}
}

_error(err: any) {
this.destination.error(err);
}

_complete() {
if (this.notificationSubscriber.hasCompleted) {
this.destination.complete();
Expand Down

0 comments on commit 8a4162b

Please sign in to comment.