Skip to content

Commit

Permalink
feat(operator): Add do operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt committed Aug 18, 2015
1 parent 3f743ab commit 7d9b52b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 0 deletions.
16 changes: 16 additions & 0 deletions spec/operators/do-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.do()', function () {
it('should do one value', function (done) {
var act = false;
Observable.value(42).do(function (x) {
act = true;
})
.subscribe(function (x) {
expect(x).toBe(42);
expect(act).toBe(true);
}, null, done);
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export default class Observable<T> {
zip: <R>(...observables: (Observable<any> | ((...values: Array<any>) => R)) []) => Observable<R>;
zipAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;

do: <T>(next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable<T>;
map: <T, R>(project: (x: T, ix?: number) => R, thisArg?: any) => Observable<R>;
mapTo: <R>(value: R) => Observable<R>;
toArray: () => Observable<T[]>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ observableProto.switchLatest = switchLatest;
observableProto.switchLatestTo = switchLatestTo;
observableProto.expand = expand;

import _do from './operators/do';
import map from './operators/map';
import mapTo from './operators/mapTo';
import toArray from './operators/toArray';
import scan from './operators/scan';
import reduce from './operators/reduce';
import startWith from './operators/startWith';

observableProto.do = _do;
observableProto.map = map;
observableProto.mapTo = mapTo;
observableProto.toArray = toArray;
Expand Down
72 changes: 72 additions & 0 deletions src/operators/do.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';

import noop from '../util/noop';
import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function _do<T>(next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) {
return this.lift(new DoOperator(next || noop, error || noop, complete || noop));
}

export class DoOperator<T, R> extends Operator<T, R> {

next: (x: T) => void;
error: (e: any) => void;
complete: () => void;

constructor(next: (x: T) => void, error: (e: any) => void, complete: () => void) {
super();
this.next = next;
this.error = error;
this.complete = complete;
}

call(observer: Observer<T>): Observer<T> {
return new DoSubscriber(observer, this.next, this.error, this.complete);
}
}

export class DoSubscriber<T> extends Subscriber<T> {

__next: (x: T) => void;
__error: (e: any) => void;
__complete: () => void;


constructor(destination: Observer<T>, next: (x: T) => void, error: (e: any) => void, complete: () => void) {
super(destination);
this.__next = next;
this.__error = error;
this.__complete = complete;
}

_next(x) {
const result = tryCatch(this.__next)(x);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.destination.next(x);
}
}

_error(e) {
const result = tryCatch(this.__error)(e);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.destination.error(e);
}
}

_complete() {
const result = tryCatch(this.__complete)();
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.destination.complete();
}
}
}

0 comments on commit 7d9b52b

Please sign in to comment.