diff --git a/perf/micro/immediate-scheduler/operators/single-predicate-this.js b/perf/micro/immediate-scheduler/operators/single-predicate-this.js new file mode 100644 index 0000000000..24b34eb547 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/single-predicate-this.js @@ -0,0 +1,26 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + + var predicate = function(value, i) { + return value === 20; + }; + + var testThis = {}; + + var oldSinglePredicateThisArg = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).single(predicate, testThis); + var newSinglePredicateThisArg = RxNew.Observable.range(0, 50).single(predicate, testThis); + + return suite + .add('old single(predicate, thisArg) with immediate scheduler', function () { + oldSinglePredicateThisArg.subscribe(_next, _error, _complete); + }) + .add('new single(predicate, thisArg) with immediate scheduler', function () { + newSinglePredicateThisArg.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e){ } + function _complete(){ } +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/operators/single-predicate.js b/perf/micro/immediate-scheduler/operators/single-predicate.js new file mode 100644 index 0000000000..c4570c4ccf --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/single-predicate.js @@ -0,0 +1,24 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + + var predicate = function(value, i) { + return value === 20; + }; + + var oldSinglePredicate = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).single(predicate); + var newSinglePredicate = RxNew.Observable.range(0, 50).single(predicate); + + return suite + .add('old single() with immediate scheduler', function () { + oldSinglePredicate.subscribe(_next, _error, _complete); + }) + .add('new single() with immediate scheduler', function () { + newSinglePredicate.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e){ } + function _complete(){ } +}; \ No newline at end of file diff --git a/spec/operators/single-spec.js b/spec/operators/single-spec.js new file mode 100644 index 0000000000..5073a736f6 --- /dev/null +++ b/spec/operators/single-spec.js @@ -0,0 +1,101 @@ +/* globals describe, it, expect, expectObservable, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); + +describe('Observable.prototype.single()', function() { + it('Should raise error from empty predicate if observable does not emit', function() { + var e1 = hot('--a--^--|'); + var expected = '---#'; + + expectObservable(e1.single()).toBe(expected, null, new Rx.EmptyError); + }); + + it('Should return only element from empty predicate if observable emits only once', function() { + var e1 = hot('--a--|'); + var expected = '-----(a|)'; + + expectObservable(e1.single()).toBe(expected); + }); + + it('Should raise error from empty predicate if observable emits multiple time', function() { + var e1 = hot('--a--b--c--|'); + var expected = '-----#'; + + expectObservable(e1.single()).toBe(expected, null, 'Sequence contains more than one element'); + }); + + it('Should raise error from empty predicate if observable emits error', function() { + var e1 = hot('--a--b^--#'); + var expected = '---#'; + + expectObservable(e1.single()).toBe(expected); + }); + + it('Should raise error from predicate if observable emits error', function() { + var e1 = hot('--a--b^--#'); + var expected = '---#'; + + var predicate = function (value) { + return value === 'c'; + } + + expectObservable(e1.single(predicate)).toBe(expected); + }); + + it('Should raise error if predicate throws error', function() { + var e1 = hot('--a--b--c--d--|'); + var expected = '-----------#'; + + var predicate = function (value) { + if (value !== 'd') { + return false; + } + throw 'error'; + } + + expectObservable(e1.single(predicate)).toBe(expected); + }); + + it('Should return element from predicate if observable have single matching element', function() { + var e1 = hot('--a--b--c--|'); + var expected = '-----------(b|)'; + + var predicate = function (value) { + return value === 'b'; + } + + expectObservable(e1.single(predicate)).toBe(expected); + }); + + it('Should raise error from predicate if observable have multiple matching element', function() { + var e1 = hot('--a--b--a--b--b--|'); + var expected = '-----------#'; + + var predicate = function (value) { + return value === 'b'; + } + + expectObservable(e1.single(predicate)).toBe(expected, null, 'Sequence contains more than one element'); + }); + + it('Should raise error from predicate if observable does not emit', function() { + var e1 = hot('--a--^--|'); + var expected = '---#'; + + var predicate = function (value) { + return value === 'a'; + } + + expectObservable(e1.single(predicate)).toBe(expected, null, new Rx.EmptyError); + }); + + it('Should return undefined from predicate if observable does not contain matching element', function() { + var e1 = hot('--a--b--c--|'); + var expected = '-----------(z|)'; + + var predicate = function (value) { + return value === 'x'; + } + + expectObservable(e1.single(predicate)).toBe(expected, {z: undefined}); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index efa0cce0cc..2dfed04e82 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -186,6 +186,7 @@ export default class Observable { elementAt: (index: number, defaultValue?: any) => Observable; last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable; + single: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable; filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 1aeaf0aa75..1348cfca7d 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -220,8 +220,9 @@ observableProto.sample = sample; observableProto.sampleTime = sampleTime; import last from './operators/last'; - +import single from './operators/single'; observableProto.last = last; +observableProto.single = single var Scheduler = { nextTick, diff --git a/src/operators/single.ts b/src/operators/single.ts new file mode 100644 index 0000000000..111b5e1145 --- /dev/null +++ b/src/operators/single.ts @@ -0,0 +1,74 @@ +import Observable from '../Observable'; +import Operator from '../Operator'; +import Subscriber from '../Subscriber'; +import Observer from '../Observer'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; +import EmptyError from '../util/EmptyError'; + +export default function single(predicate?: (value: T, index: number, source: Observable) => boolean, thisArg?: any) : Observable { + return this.lift(new SingleOperator(predicate, thisArg, this)); +} + +class SingleOperator implements Operator { + constructor(private predicate?: (value: T, index: number, source: Observable) => boolean, private thisArg?: any, private source?: Observable) { + + } + + call(subscriber: Subscriber): Subscriber { + return new SingleSubscriber(subscriber, this.predicate, this.thisArg, this.source); + } +} + +class SingleSubscriber extends Subscriber { + private predicate: Function; + private seenValue: boolean = false; + private singleValue: T; + private index: number = 0; + + constructor(destination : Observer, predicate?: (value: T, index: number, source: Observable) => boolean, private thisArg?: any, private source?: Observable) { + super(destination); + + if (typeof predicate === 'function') { + this.predicate = bindCallback(predicate, thisArg, 3); + } + } + + private applySingleValue(value): void { + if (this.seenValue) { + this.destination.error('Sequence contains more than one element'); + } else { + this.seenValue = true; + this.singleValue = value; + } + } + + _next(value: T) { + const predicate = this.predicate; + const currentIndex = this.index++; + + if (predicate) { + let result = tryCatch(predicate)(value, currentIndex, this.source); + if (result === errorObject) { + this.destination.error(result.e); + } else if (result) { + this.applySingleValue(value); + } + } else { + this.applySingleValue(value); + } + } + + _complete() { + const destination = this.destination; + + if (this.index > 0) { + destination.next(this.seenValue ? this.singleValue : undefined); + destination.complete(); + } else { + destination.error(new EmptyError); + } + } +} \ No newline at end of file