Skip to content

Commit

Permalink
feat(first): add resultSelector
Browse files Browse the repository at this point in the history
closes #417
  • Loading branch information
benlesh committed Oct 1, 2015
1 parent 4d973a3 commit 3c20fcc
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 16 deletions.
18 changes: 15 additions & 3 deletions spec/operators/first-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('Observable.prototype.first()', function() {
it('should return the default value if source observable was empty', function() {
var e1 = hot('-----^----|');
var expected = '-----(a|)';
expectObservable(e1.first(null, null, 'a')).toBe(expected);
expectObservable(e1.first(null, null, null, 'a')).toBe(expected);
});

it('should propagate error from the source observable', function() {
Expand Down Expand Up @@ -63,7 +63,7 @@ describe('Observable.prototype.first()', function() {
expect(this).toEqual(42);
return value % 2 === 1;
};
expectObservable(e1.first(predicate, 42)).toBe(expected, {c: 3});
expectObservable(e1.first(predicate, null, 42)).toBe(expected, {c: 3});
});

it('should error when no value matches the predicate', function() {
Expand All @@ -81,7 +81,7 @@ describe('Observable.prototype.first()', function() {
var predicate = function (value) {
return value === 's';
};
expectObservable(e1.first(predicate, null, 'd')).toBe(expected);
expectObservable(e1.first(predicate, null, null, 'd')).toBe(expected);
});

it('should propagate error when no value matches the predicate', function() {
Expand Down Expand Up @@ -114,4 +114,16 @@ describe('Observable.prototype.first()', function() {
};
expectObservable(e1.first(predicate)).toBe(expected, null, 'error');
});

it('should support a result selector argument', function() {
var e1 = hot('--a--^---b---c---d---e--|');
var expected = '--------(x|)';
var predicate = function (x){ return x === 'c'; };
var resultSelector = function(x, i) {
expect(i).toBe(1);
expect(x).toBe('c');
return 'x';
};
expectObservable(e1.first(predicate, resultSelector)).toBe(expected);
});
});
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface CoreOperators<T> {
expand?: (project: (x: T, ix: number) => Observable<any>) => Observable<any>;
filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable<T>;
finally?: (ensure: () => void, thisArg?: any) => Observable<T>;
first?: (predicate?: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any, defaultValue?: any) => Observable<T>;
first?: <R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean, resultSelector?: (value:T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable<R>;
flatMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
flatMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable<R>;
groupBy?: <T, R>(keySelector: (value:T) => string, durationSelector?: (group:GroupSubject<R>) => Observable<any>, elementSelector?: (value:T) => R) => Observable<R>;
Expand Down
32 changes: 20 additions & 12 deletions src/operators/first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,35 @@ import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';
import EmptyError from '../util/EmptyError';

export default function first<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
export default function first<T, R>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
resultSelector?: (value: T, index: number) => R,
thisArg?: any,
defaultValue?: any): Observable<T> {
return this.lift(new FirstOperator(predicate, thisArg, defaultValue, this));
defaultValue?: any): Observable<R> {
return this.lift(new FirstOperator(predicate, thisArg, resultSelector, defaultValue, this));
}

class FirstOperator<T, R> implements Operator<T, R> {
constructor(private predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private resultSelector?: (value: T, index: number) => R,
private defaultValue?: any,
private source?: Observable<T>) {
}

call(observer: Subscriber<R>): Subscriber<T> {
return new FirstSubscriber(
observer, this.predicate, this.thisArg, this.defaultValue, this.source
);
return new FirstSubscriber(observer, this.predicate, this.thisArg, this.resultSelector, this.defaultValue, this.source);
}
}

class FirstSubscriber<T> extends Subscriber<T> {
class FirstSubscriber<T, R> extends Subscriber<T> {
private predicate: Function;
private index: number = 0;
private hasCompleted: boolean = false;

constructor(destination: Observer<T>,
predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private resultSelector?: (value: T, index: number) => R,
private defaultValue?: any,
private source?: Observable<T>) {
super(destination);
Expand All @@ -44,18 +45,25 @@ class FirstSubscriber<T> extends Subscriber<T> {
}
}

_next(value: T) {
const destination = this.destination;
const predicate = this.predicate;
_next(value: any) {
const { destination, predicate, resultSelector } = this;
const index = this.index++;
let passed: any = true;
if (predicate) {
passed = tryCatch(predicate)(value, this.index++, this.source);
passed = tryCatch(predicate)(value,index, this.source);
if (passed === errorObject) {
destination.error(passed.e);
destination.error(errorObject.e);
return;
}
}
if (passed) {
if(resultSelector) {
value = tryCatch(resultSelector)(value, index);
if(value === errorObject) {
destination.error(errorObject.e);
return;
}
}
destination.next(value);
destination.complete();
this.hasCompleted = true;
Expand Down

0 comments on commit 3c20fcc

Please sign in to comment.