-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(take): add higher-order lettable version of take
- Loading branch information
Showing
3 changed files
with
90 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; | ||
import { EmptyObservable } from '../observable/EmptyObservable'; | ||
import { Observable } from '../Observable'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits only the first `count` values emitted by the source Observable. | ||
* | ||
* <span class="informal">Takes the first `count` values from the source, then | ||
* completes.</span> | ||
* | ||
* <img src="./img/take.png" width="100%"> | ||
* | ||
* `take` returns an Observable that emits only the first `count` values emitted | ||
* by the source Observable. If the source emits fewer than `count` values then | ||
* all of its values are emitted. After that, it completes, regardless if the | ||
* source completes. | ||
* | ||
* @example <caption>Take the first 5 seconds of an infinite 1-second interval Observable</caption> | ||
* var interval = Rx.Observable.interval(1000); | ||
* var five = interval.take(5); | ||
* five.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link takeLast} | ||
* @see {@link takeUntil} | ||
* @see {@link takeWhile} | ||
* @see {@link skip} | ||
* | ||
* @throws {ArgumentOutOfRangeError} When using `take(i)`, it delivers an | ||
* ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`. | ||
* | ||
* @param {number} count The maximum number of `next` values to emit. | ||
* @return {Observable<T>} An Observable that emits only the first `count` | ||
* values emitted by the source Observable, or all of the values from the source | ||
* if the source emits fewer than `count` values. | ||
* @method take | ||
* @owner Observable | ||
*/ | ||
export function take<T>(count: number): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => { | ||
if (count === 0) { | ||
return new EmptyObservable<T>(); | ||
} else { | ||
return source.lift(new TakeOperator(count)); | ||
} | ||
}; | ||
} | ||
|
||
class TakeOperator<T> implements Operator<T, T> { | ||
constructor(private total: number) { | ||
if (this.total < 0) { | ||
throw new ArgumentOutOfRangeError; | ||
} | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new TakeSubscriber(subscriber, this.total)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class TakeSubscriber<T> extends Subscriber<T> { | ||
private count: number = 0; | ||
|
||
constructor(destination: Subscriber<T>, private total: number) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
const total = this.total; | ||
const count = ++this.count; | ||
if (count <= total) { | ||
this.destination.next(value); | ||
if (count === total) { | ||
this.destination.complete(); | ||
this.unsubscribe(); | ||
} | ||
} | ||
} | ||
} |