From cc92f45e830b0dac29151d50107a9bb070cd3ba1 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Fri, 16 Oct 2015 19:26:42 +0300 Subject: [PATCH] fix(retry): fix internal unsubscriptions for retry Fix retry operator to unsubscribe from the source whenever the source emits an error and a new retry will be attempted. Also fix the operator to unsubscribe the internal retried subscription to the source whenever the result Observable is unsubscribed. Resolves issue #546. --- src/operators/retry.ts | 76 +++++++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 12 deletions(-) diff --git a/src/operators/retry.ts b/src/operators/retry.ts index 8cb10d288e..5b000c9d74 100644 --- a/src/operators/retry.ts +++ b/src/operators/retry.ts @@ -2,36 +2,88 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; +import Subscription from '../Subscription'; export default function retry(count: number = 0): Observable { return this.lift(new RetryOperator(count, this)); } class RetryOperator implements Operator { - constructor(private count: number, protected original: Observable) { + constructor(private count: number, + protected source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RetrySubscriber(subscriber, this.count, this.original); + return new FirstRetrySubscriber(subscriber, this.count, this.source); } } -class RetrySubscriber extends Subscriber { - private retries: number = 0; - constructor(destination: Subscriber, private count: number, private original: Observable) { - super(destination); +class FirstRetrySubscriber extends Subscriber { + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private count: number, + private source: Observable) { + super(null); + this.lastSubscription = this; + } + + _next(value: T) { + this.destination.next(value); + } + + error(error?) { + if (!this.isUnsubscribed) { + super.unsubscribe(); + this.resubscribe(); + } + } + + _complete() { + super.unsubscribe(); + this.destination.complete(); + } + + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + lastSubscription.unsubscribe(); + } + } + + resubscribe(retried: number = 0) { + this.lastSubscription.unsubscribe(); + const nextSubscriber = new RetryMoreSubscriber(this, this.count, retried + 1); + this.lastSubscription = this.source.subscribe(nextSubscriber); + } +} + +class RetryMoreSubscriber extends Subscriber { + constructor(private parent: FirstRetrySubscriber, + private count: number, + private retried: number = 0) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); } _error(err: any) { + const parent = this.parent; + const retried = this.retried; const count = this.count; - if (count && count === (this.retries += 1)) { - this.destination.error(err); + + if (count && retried === count) { + parent.destination.error(err); } else { - this.resubscribe(); + parent.resubscribe(retried); } } - resubscribe() { - this.original.subscribe(this); + _complete() { + this.parent.destination.complete(); } -} \ No newline at end of file +}