-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(withLatestFrom): add higher-order lettable version of withLatest…
…From
- Loading branch information
Showing
3 changed files
with
152 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Observable, ObservableInput } from '../Observable'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function withLatestFrom<T, R>(project: (v1: T) => R): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, T2, R>(v2: ObservableInput<T2>, project: (v1: T, v2: T2) => R): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, project: (v1: T, v2: T2, v3: T3) => R): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): OperatorFunction<T, R> ; | ||
export function withLatestFrom<T, T2>(v2: ObservableInput<T2>): OperatorFunction<T, [T, T2]>; | ||
export function withLatestFrom<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): OperatorFunction<T, [T, T2, T3]>; | ||
export function withLatestFrom<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): OperatorFunction<T, [T, T2, T3, T4]>; | ||
export function withLatestFrom<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): OperatorFunction<T, [T, T2, T3, T4, T5]>; | ||
export function withLatestFrom<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): OperatorFunction<T, [T, T2, T3, T4, T5, T6]> ; | ||
export function withLatestFrom<T, R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, R>(array: ObservableInput<any>[]): OperatorFunction<T, R>; | ||
export function withLatestFrom<T, R>(array: ObservableInput<any>[], project: (...values: Array<any>) => R): OperatorFunction<T, R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
/** | ||
* Combines the source Observable with other Observables to create an Observable | ||
* whose values are calculated from the latest values of each, only when the | ||
* source emits. | ||
* | ||
* <span class="informal">Whenever the source Observable emits a value, it | ||
* computes a formula using that value plus the latest values from other input | ||
* Observables, then emits the output of that formula.</span> | ||
* | ||
* <img src="./img/withLatestFrom.png" width="100%"> | ||
* | ||
* `withLatestFrom` combines each value from the source Observable (the | ||
* instance) with the latest values from the other input Observables only when | ||
* the source emits a value, optionally using a `project` function to determine | ||
* the value to be emitted on the output Observable. All input Observables must | ||
* emit at least one value before the output Observable will emit a value. | ||
* | ||
* @example <caption>On every click event, emit an array with the latest timer event plus the click event</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var timer = Rx.Observable.interval(1000); | ||
* var result = clicks.withLatestFrom(timer); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link combineLatest} | ||
* | ||
* @param {ObservableInput} other An input Observable to combine with the source | ||
* Observable. More than one input Observables may be given as argument. | ||
* @param {Function} [project] Projection function for combining values | ||
* together. Receives all values in order of the Observables passed, where the | ||
* first parameter is a value from the source Observable. (e.g. | ||
* `a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)`). If this is not | ||
* passed, arrays will be emitted on the output Observable. | ||
* @return {Observable} An Observable of projected values from the most recent | ||
* values from each input Observable, or an array of the most recent values from | ||
* each input Observable. | ||
* @method withLatestFrom | ||
* @owner Observable | ||
*/ | ||
export function withLatestFrom<T, R>(...args: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): OperatorFunction<T, R> { | ||
return (source: Observable<T>) => { | ||
let project: any; | ||
if (typeof args[args.length - 1] === 'function') { | ||
project = args.pop(); | ||
} | ||
const observables = <Observable<any>[]>args; | ||
return source.lift(new WithLatestFromOperator(observables, project)); | ||
}; | ||
} | ||
|
||
class WithLatestFromOperator<T, R> implements Operator<T, R> { | ||
constructor(private observables: Observable<any>[], | ||
private project?: (...values: any[]) => Observable<R>) { | ||
} | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
return source.subscribe(new WithLatestFromSubscriber(subscriber, this.observables, this.project)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class WithLatestFromSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
private values: any[]; | ||
private toRespond: number[] = []; | ||
|
||
constructor(destination: Subscriber<R>, | ||
private observables: Observable<any>[], | ||
private project?: (...values: any[]) => Observable<R>) { | ||
super(destination); | ||
const len = observables.length; | ||
this.values = new Array(len); | ||
|
||
for (let i = 0; i < len; i++) { | ||
this.toRespond.push(i); | ||
} | ||
|
||
for (let i = 0; i < len; i++) { | ||
let observable = observables[i]; | ||
this.add(subscribeToResult<T, R>(this, observable, <any>observable, i)); | ||
} | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: R, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
this.values[outerIndex] = innerValue; | ||
const toRespond = this.toRespond; | ||
if (toRespond.length > 0) { | ||
const found = toRespond.indexOf(outerIndex); | ||
if (found !== -1) { | ||
toRespond.splice(found, 1); | ||
} | ||
} | ||
} | ||
|
||
notifyComplete() { | ||
// noop | ||
} | ||
|
||
protected _next(value: T) { | ||
if (this.toRespond.length === 0) { | ||
const args = [value, ...this.values]; | ||
if (this.project) { | ||
this._tryProject(args); | ||
} else { | ||
this.destination.next(args); | ||
} | ||
} | ||
} | ||
|
||
private _tryProject(args: any[]) { | ||
let result: any; | ||
try { | ||
result = this.project.apply(this, args); | ||
} catch (err) { | ||
this.destination.error(err); | ||
return; | ||
} | ||
this.destination.next(result); | ||
} | ||
} |