From 94b4c01057652f40708a405be0d98a13e7dedb37 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 29 Jul 2015 14:48:02 -0700 Subject: [PATCH] feat(catch): add catch operator, related to #141, closes #130 --- spec/operators/catch-spec.js | 109 +++++++++++++++++++++++++++++++++++ src/Observable.ts | 2 + src/Rx.ts | 4 ++ src/operators/catch.ts | 49 ++++++++++++++++ 4 files changed, 164 insertions(+) create mode 100644 spec/operators/catch-spec.js create mode 100644 src/operators/catch.ts diff --git a/spec/operators/catch-spec.js b/spec/operators/catch-spec.js new file mode 100644 index 0000000000..aecb320754 --- /dev/null +++ b/spec/operators/catch-spec.js @@ -0,0 +1,109 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.catch()', function () { + it('should pass the error as the first argument', function (done) { + Observable.throw('bad') + .catch(function (err) { + expect(err).toBe('bad'); + return Observable.empty(); + }) + .subscribe(function () { }, + function (err) { + expect('this was called').not.toBeTruthy(); + }, + done); + }); + + it('should catch the error and allow the return of a new observable to use', function (done) { + var expected = [1, 2, 'foo']; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .catch(function (err) { + return Observable.of('foo'); + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, function (err) { + expect('this was called').not.toBeTruthy(); + }, function () { + done(); + }); + }); + + it('should catch and allow the observable to be repeated with the third (caught) argument', function (done) { + var expected = [1, 2, 1, 2, 1, 2]; + var retries = 0; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .catch(function (err, caught) { + if (retries++ == 2) { + throw 'done'; + } + return caught; + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, function (err) { + expect(err).toBe('done'); + done(); + }, function () { + expect('this was called').not.toBeTruthy(); + }) + }); + + it('should complete if you return Observable.empty()', function (done) { + var expected = [1, 2]; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .catch(function (err) { + return Observable.empty(); + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, function (err) { + expect('this was called').not.toBeTruthy(); + }, function () { + done(); + }); + }); + + + it('should error if you return Observable.throw()', function (done) { + var expected = [1, 2]; + Observable.of(1, 2, 3) + .map(function (n) { + if (n === 3) { + throw 'bad'; + } + return n; + }) + .catch(function (err) { + return Observable.throw('haha'); + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, function (err) { + expect(err).toBe('haha'); + done(); + }, function () { + expect('this was called').not.toBeTruthy(); + }); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 4383474e8b..3da06c02b8 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -134,4 +134,6 @@ export default class Observable { publish: () => ConnectableObservable; multicast: (subjectFactory: () => Subject) => ConnectableObservable; + + catch: (selector: (err: any, source: Observable, caught: Observable) => Observable) => Observable; } diff --git a/src/Rx.ts b/src/Rx.ts index b3a0f44bb0..650eeaabac 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -119,6 +119,10 @@ import partition from './operators/partition'; observableProto.partition = partition; +import _catch from './operators/catch'; + +observableProto.catch = _catch; + export default { Subject, Scheduler, diff --git a/src/operators/catch.ts b/src/operators/catch.ts new file mode 100644 index 0000000000..351fddbce1 --- /dev/null +++ b/src/operators/catch.ts @@ -0,0 +1,49 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +export default function _catch(selector: (err:any, caught:Observable) => Observable) { + var catchOperator = new CatchOperator(selector); + var caught = this.lift(catchOperator); + catchOperator.caught = caught; + return caught; +} + +export class CatchOperator extends Operator { + selector: (err:any, caught:Observable) => Observable; + caught: Observable; + source: Observable; + + constructor(selector: (err:any, caught:Observable) => Observable) { + super(); + this.selector = selector; + } + + call(observer: Observer): Observer { + return new CatchSubscriber(observer, this.selector, this.caught); + } +} + +export class CatchSubscriber extends Subscriber { + selector: (err:any, caught:Observable) => Observable; + caught: Observable; + + constructor(destination: Observer, selector: (err:any, caught:Observable) => Observable, caught: Observable) { + super(destination); + this.selector = selector; + this.caught = caught; + } + + _error(err) { + const result = tryCatch(this.selector)(err, this.caught); + if (result === errorObject) { + this.destination.error(errorObject.e); + } else { + this.add(result.subscribe(this.destination)); + } + } +}