diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 70ead563c1..75322fbd7d 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -75,6 +75,7 @@ export interface CoreOperators { switchMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take?: (count: number) => Observable; takeUntil?: (notifier: Observable) => Observable; + throttle?: (durationSelector: (value: T) => Observable | Promise) => Observable; throttleTime?: (delay: number, scheduler?: Scheduler) => Observable; timeout?: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith?: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; diff --git a/src/Observable.ts b/src/Observable.ts index ffe050aac5..f861100ddb 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -228,6 +228,7 @@ export class Observable implements CoreOperators { switchMapTo: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; take: (count: number) => Observable; takeUntil: (notifier: Observable) => Observable; + throttle: (durationSelector: (value: T) => Observable | Promise) => Observable; throttleTime: (delay: number, scheduler?: Scheduler) => Observable; timeout: (due: number | Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith: (due: number | Date, withObservable: Observable, scheduler?: Scheduler) => Observable | Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 395037279e..238862f58c 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -285,6 +285,9 @@ observableProto.take = take; import {takeUntil} from './operators/takeUntil'; observableProto.takeUntil = takeUntil; +import {throttle} from './operators/throttle'; +observableProto.throttle = throttle; + import {throttleTime} from './operators/throttleTime'; observableProto.throttleTime = throttleTime; diff --git a/src/Rx.ts b/src/Rx.ts index fe7c04983b..88ac4b7de7 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -239,6 +239,9 @@ observableProto.take = take; import {takeUntil} from './operators/takeUntil'; observableProto.takeUntil = takeUntil; +import {throttle} from './operators/throttle'; +observableProto.throttle = throttle; + import {throttleTime} from './operators/throttleTime'; observableProto.throttleTime = throttleTime; diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts new file mode 100644 index 0000000000..36b2babe72 --- /dev/null +++ b/src/operators/throttle.ts @@ -0,0 +1,84 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {PromiseObservable} from '../observables/PromiseObservable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; + +import {tryCatch} from '../util/tryCatch'; +import {isPromise} from '../util/isPromise'; +import {errorObject} from '../util/errorObject'; + +export function throttle(durationSelector: (value: T) => Observable | Promise): Observable { + return this.lift(new ThrottleOperator(durationSelector)); +} + +class ThrottleOperator implements Operator { + constructor(private durationSelector: (value: T) => Observable | Promise) { + } + + call(subscriber: Subscriber): Subscriber { + return new ThrottleSubscriber(subscriber, this.durationSelector); + } +} + +class ThrottleSubscriber extends Subscriber { + private throttled: Subscription; + + constructor(destination: Subscriber, + private durationSelector: (value: T) => Observable | Promise) { + super(destination); + } + + _next(value: T): void { + if (!this.throttled) { + const destination = this.destination; + let duration = tryCatch(this.durationSelector)(value); + if (duration === errorObject) { + destination.error(errorObject.e); + return; + } + if (isPromise(duration)) { + duration = PromiseObservable.create(duration); + } + this.add(this.throttled = duration._subscribe(new ThrottleDurationSelectorSubscriber(this))); + destination.next(value); + } + } + + _error(err: any): void { + this.clearThrottle(); + super._error(err); + } + + _complete(): void { + this.clearThrottle(); + super._complete(); + } + + clearThrottle(): void { + const throttled = this.throttled; + if (throttled) { + throttled.unsubscribe(); + this.remove(throttled); + this.throttled = null; + } + } +} + +class ThrottleDurationSelectorSubscriber extends Subscriber { + constructor(private parent: ThrottleSubscriber) { + super(null); + } + + _next(unused: T): void { + this.parent.clearThrottle(); + } + + _error(err): void { + this.parent.error(err); + } + + _complete(): void { + this.parent.clearThrottle(); + } +}