Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(find): unsubscribe from source when found #3968

Merged
merged 5 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions spec/operators/find-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { expect } from 'chai';
import { find, mergeMap } from 'rxjs/operators';
import { find, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, Observable, from } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {find} */
describe('find operator', () => {
function truePredicate(x: any) {
Expand All @@ -19,13 +22,13 @@ describe('find operator', () => {

const predicate = function (x: number) { return x % 5 === 0; };

expectObservable((<any>source).pipe(find(predicate))).toBe(expected, values);
expectObservable(source.pipe(find(predicate))).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should throw if not provided a function', () => {
expect(() => {
(<any>of('yut', 'yee', 'sam')).pipe(find('yee' as any));
of('yut', 'yee', 'sam').pipe(find('yee' as any));
}).to.throw(TypeError, 'predicate is not a function');
});

Expand All @@ -34,7 +37,7 @@ describe('find operator', () => {
const subs = '^';
const expected = '-';

expectObservable((<any>source).pipe(find(truePredicate))).toBe(expected);
expectObservable(source.pipe(find(truePredicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -43,7 +46,7 @@ describe('find operator', () => {
const subs = '(^!)';
const expected = '(x|)';

const result = (<any>source).pipe(find(truePredicate));
const result = source.pipe(find(truePredicate));

expectObservable(result).toBe(expected, {x: undefined});
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -58,7 +61,7 @@ describe('find operator', () => {
return value === 'a';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -71,7 +74,7 @@ describe('find operator', () => {
return value === 'b';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -87,7 +90,7 @@ describe('find operator', () => {
return value === this.target;
};

expectObservable((<any>source).pipe(find(predicate, finder))).toBe(expected);
expectObservable(source.pipe(find(predicate, finder))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -100,7 +103,7 @@ describe('find operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected, { x: undefined });
expectObservable(source.pipe(find(predicate))).toBe(expected, { x: undefined });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -110,7 +113,7 @@ describe('find operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(find((value: string) => value === 'z'));
const result = source.pipe(find((value: string) => value === 'z'));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -122,7 +125,7 @@ describe('find operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(
const result = source.pipe(
mergeMap((x: string) => of(x)),
find((value: string) => value === 'z'),
mergeMap((x: string) => of(x))
Expand All @@ -132,6 +135,20 @@ describe('find operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe when the predicate is matched', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '-------(b|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
find((value: string) => value === 'b'),
delay(duration, rxTestScheduler)
)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should raise if source raise error while element does not match with predicate', () => {
const source = hot('--a--b--#');
const subs = '^ !';
Expand All @@ -141,7 +158,7 @@ describe('find operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -154,7 +171,7 @@ describe('find operator', () => {
throw 'error';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand Down
43 changes: 30 additions & 13 deletions spec/operators/findIndex-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { findIndex, mergeMap } from 'rxjs/operators';
import { findIndex, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {findIndex} */
describe('findIndex operator', () => {
function truePredicate(x: any) {
Expand All @@ -18,7 +21,7 @@ describe('findIndex operator', () => {

const predicate = function (x: number) { return x % 5 === 0; };

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 2 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 2 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -27,7 +30,7 @@ describe('findIndex operator', () => {
const subs = '^';
const expected = '-';

expectObservable((<any>source).pipe(findIndex(truePredicate))).toBe(expected);
expectObservable(source.pipe(findIndex(truePredicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -36,7 +39,7 @@ describe('findIndex operator', () => {
const subs = '(^!)';
const expected = '(x|)';

const result = (<any>source).pipe(findIndex(truePredicate));
const result = source.pipe(findIndex(truePredicate));

expectObservable(result).toBe(expected, {x: -1});
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -52,7 +55,7 @@ describe('findIndex operator', () => {
return value === sourceValue;
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 0 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 0 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -65,7 +68,7 @@ describe('findIndex operator', () => {
return value === 7;
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 1 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -78,7 +81,7 @@ describe('findIndex operator', () => {
const predicate = function (this: typeof sourceValues, value: number) {
return value === this.b;
};
const result = (<any>source).pipe(findIndex(predicate, sourceValues));
const result = source.pipe(findIndex(predicate, sourceValues));

expectObservable(result).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -93,7 +96,7 @@ describe('findIndex operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: -1 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: -1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -103,7 +106,7 @@ describe('findIndex operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(findIndex((value: string) => value === 'z'));
const result = source.pipe(findIndex((value: string) => value === 'z'));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -115,16 +118,30 @@ describe('findIndex operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(
const result = source.pipe(
mergeMap((x: string) => of(x)),
findIndex((value: string) => value === 'z'),
mergeMap((x: string) => of(x))
mergeMap((x: number) => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe when the predicate is matched', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '-------(x|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
findIndex((value: string) => value === 'b'),
delay(duration, rxTestScheduler)
)).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should raise if source raise error while element does not match with predicate', () => {
const source = hot('--a--b--#');
const subs = '^ !';
Expand All @@ -134,7 +151,7 @@ describe('findIndex operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected);
expectObservable(source.pipe(findIndex(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -147,7 +164,7 @@ describe('findIndex operator', () => {
throw 'error';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected);
expectObservable(source.pipe(findIndex(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
19 changes: 18 additions & 1 deletion spec/operators/first-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { expect } from 'chai';
import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { first, mergeMap } from 'rxjs/operators';
import { first, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of, from, Observable, Subject, EmptyError } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {first} */
describe('Observable.prototype.first', () => {
asDiagram('first')('should take the first value of an observable with many values', () => {
Expand Down Expand Up @@ -101,6 +104,20 @@ describe('Observable.prototype.first', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe when the first value is receiv', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '----(a|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
first(),
delay(duration, rxTestScheduler)
)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should return first value that matches a predicate', () => {
const e1 = hot('--a-^--b--c--a--c--|');
const expected = '------(c|)';
Expand Down
1 change: 1 addition & 0 deletions src/internal/operators/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {

destination.next(value);
destination.complete();
this.unsubscribe();
}

protected _next(value: T): void {
Expand Down