From a2244e0abb9bf536e6e024b3f949ccdee0e9b975 Mon Sep 17 00:00:00 2001 From: Pierre Guilleminot Date: Wed, 11 Nov 2015 14:49:08 +0100 Subject: [PATCH] feat(operator): add skipWhile operator --- spec/operators/skipWhile-spec.js | 181 +++++++++++++++++++++++++++++++ src/CoreOperators.ts | 1 + src/Rx.KitchenSink.ts | 3 + src/Rx.ts | 5 +- src/operators/skipWhile.ts | 48 ++++++++ 5 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 spec/operators/skipWhile-spec.js create mode 100644 src/operators/skipWhile.ts diff --git a/spec/operators/skipWhile-spec.js b/spec/operators/skipWhile-spec.js new file mode 100644 index 0000000000..a6f404fa3f --- /dev/null +++ b/spec/operators/skipWhile-spec.js @@ -0,0 +1,181 @@ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.skipWhile()', function () { + it('should skip all elements with a true predicate', function () { + var source = hot('-1-^2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '----------------|'; + + expectObservable(source.skipWhile(function () { return true; })).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should skip all elements with a truthy predicate', function () { + var source = hot('-1-^2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '----------------|'; + + expectObservable(source.skipWhile(function () { return {}; })).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should not skip any element with a false predicate', function () { + var source = hot('-1-^2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '-2--3--4--5--6--|'; + + expectObservable(source.skipWhile(function () { return false; })).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should not skip any elements with a falsy predicate', function () { + var source = hot('-1-^2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '-2--3--4--5--6--|'; + + expectObservable(source.skipWhile(function () { return undefined; })).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should skip all elements until predicate is false', function () { + var source = hot('-1-^2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '-------4--5--6--|'; + + var predicate = function (v) { + return +v < 4; + }; + + expectObservable(source.skipWhile(predicate)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should skip elements on hot source', function () { + var source = hot('--1--2-^-3--4--5--6--7--8--'); + var sourceSubs = '^ '; + var expected = '--------5--6--7--8--'; + + var predicate = function (v) { + return +v < 5; + }; + + expectObservable(source.skipWhile(predicate)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should be possible to skip using the element\'s index', function () { + var source = hot('--a--b-^-c--d--e--f--g--h--|'); + var sourceSubs = '^ !'; + var expected = '--------e--f--g--h--|'; + + var predicate = function (v, index) { + return index < 2; + }; + + expectObservable(source.skipWhile(predicate)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should skip using index with source unsubscribes early', function () { + var source = hot('--a--b-^-c--d--e--f--g--h--|'); + var sourceSubs = '^ !'; + var unsub = '-----------!'; + var expected = '-----d--e---'; + + var predicate = function (v, index) { + return index < 1; + }; + + expectObservable(source.skipWhile(predicate), unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should skip using value with source throws', function () { + var source = hot('--a--b-^-c--d--e--f--g--h--#'); + var sourceSubs = '^ !'; + var expected = '-----d--e--f--g--h--#'; + + var predicate = function (v) { + return v !== 'd'; + }; + + expectObservable(source.skipWhile(predicate)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should invoke predicate while its false and never again', function () { + var source = hot('--a--b-^-c--d--e--f--g--h--|'); + var sourceSubs = '^ !'; + var expected = '--------e--f--g--h--|'; + + var invoked = 0; + var predicate = function (v) { + invoked++; + return v !== 'e'; + }; + + expectObservable( + source.skipWhile(predicate).do(null, null, function () { + expect(invoked).toBe(3); + }) + ).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should handle predicate that throws', function () { + var source = hot('--a--b-^-c--d--e--f--g--h--|'); + var sourceSubs = '^ !'; + var expected = '--------#'; + + var predicate = function (v) { + if (v === 'e') { + throw new Error('nom d\'une pipe !'); + } + + return v !== 'f'; + }; + + expectObservable(source.skipWhile(predicate)).toBe(expected, undefined, new Error('nom d\'une pipe !')); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should accept a thisArg', function () { + var source = hot('-1-^--2--3--4--5--6--|'); + var sourceSubs = '^ !'; + var expected = '---------4--5--6--|'; + + function Skiper() { + this.doSkip = function (v) { return +v < 4; }; + } + + var skiper = new Skiper(); + + expectObservable( + source.skipWhile(function (v) { return this.doSkip(v); }, skiper) + ).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should handle Observable.empty', function () { + var source = Observable.empty(); + var expected = '|'; + + expectObservable(source.skipWhile(function () { return true; })).toBe(expected); + }); + + it('should handle Observable.never', function () { + var source = Observable.never(); + var expected = '-'; + + expectObservable(source.skipWhile(function () { return true; })).toBe(expected); + }); + + it('should handle Observable.throw', function () { + var source = Observable.throw(new Error('oh no!')); + var expected = '#'; + + expectObservable(source.skipWhile(function () { return true; })).toBe(expected, undefined, new Error('oh no!')); + }); +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 75322fbd7d..c5be2fce4d 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -68,6 +68,7 @@ export interface CoreOperators { single?: (predicate?: (value: T, index: number) => boolean, thisArg?: any) => Observable; skip?: (count: number) => Observable; skipUntil?: (notifier: Observable) => Observable; + skipWhile?: (predicate: (x: T, index: number) => boolean, thisArg?: any) => Observable; startWith?: (x: T) => Observable; subscribeOn?: (scheduler: Scheduler, delay?: number) => Observable; switch?: () => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 238862f58c..a55014dd43 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -258,6 +258,9 @@ observableProto.skip = skip; import {skipUntil} from './operators/skipUntil'; observableProto.skipUntil = skipUntil; +import {skipWhile} from './operators/skipWhile'; +observableProto.skipWhile = skipWhile; + import {startWith} from './operators/startWith'; observableProto.startWith = startWith; diff --git a/src/Rx.ts b/src/Rx.ts index 88ac4b7de7..efbcf95b0f 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -218,6 +218,9 @@ observableProto.skip = skip; import {skipUntil} from './operators/skipUntil'; observableProto.skipUntil = skipUntil; +import {skipWhile} from './operators/skipWhile'; +observableProto.skipWhile = skipWhile; + import {startWith} from './operators/startWith'; observableProto.startWith = startWith; @@ -316,4 +319,4 @@ export { Notification, EmptyError, ArgumentOutOfRangeError -}; \ No newline at end of file +}; diff --git a/src/operators/skipWhile.ts b/src/operators/skipWhile.ts new file mode 100644 index 0000000000..46365645da --- /dev/null +++ b/src/operators/skipWhile.ts @@ -0,0 +1,48 @@ +import {Observable} from '../Observable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import {bindCallback} from '../util/bindCallback'; + +export function skipWhile(predicate: (x: T, index: number) => boolean, thisArg?: any): Observable { + return this.lift(new SkipWhileOperator(predicate, thisArg)); +} + +class SkipWhileOperator implements Operator { + private predicate: (x: T, index: number) => boolean; + + constructor(predicate: (x: T, index: number) => boolean, thisArg?: any) { + this.predicate = <(x: T, index: number) => boolean>bindCallback(predicate, thisArg, 2); + } + + call(subscriber: Subscriber): Subscriber { + return new SkipWhileSubscriber(subscriber, this.predicate); + } +} + +class SkipWhileSubscriber extends Subscriber { + private skipping: boolean = true; + private index: number = 0; + + constructor(destination: Subscriber, + private predicate: (x: T, index: number) => boolean) { + super(destination); + } + + _next(value: T): void { + const destination = this.destination; + if (this.skipping === true) { + const index = this.index++; + const result = tryCatch(this.predicate)(value, index); + if (result === errorObject) { + destination.error(result.e); + } else { + this.skipping = Boolean(result); + } + } + if (this.skipping === false) { + destination.next(value); + } + } +}