Skip to content

Commit

Permalink
fix(deferObservable): accepts factory returns promise
Browse files Browse the repository at this point in the history
relates to ReactiveX#1483
  • Loading branch information
kwonoj committed Mar 21, 2016
1 parent 56c9191 commit 0cb44e1
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
38 changes: 34 additions & 4 deletions spec/observables/defer-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, expectObservable, expectSubscriptions};
import {DoneSignature} from '../helpers/test-helper';
declare const {hot, expectObservable, expectSubscriptions, type};

const Observable = Rx.Observable;

Expand Down Expand Up @@ -27,6 +28,36 @@ describe('Observable.defer', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should accept factory returns promise resolves', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.defer(() => {
return new Promise((resolve: any) => { resolve(expected); });
});

e1.subscribe((x: number) => {
expect(x).toBe(expected);
done();
}, x => {
done.fail();
});
});

it('should accept factory returns promise rejects', (done: DoneSignature) => {
const expected = 42;
const e1 = Observable.defer(() => {
return new Promise((resolve: any, reject: any) => { reject(expected); });
});

e1.subscribe((x: number) => {
done.fail();
}, x => {
expect(x).toBe(expected);
done();
}, () => {
done.fail();
});
});

it('should create an observable from error', () => {
const source = hot('#');
const sourceSubs = '(^!)';
Expand All @@ -39,10 +70,9 @@ describe('Observable.defer', () => {
});

it('should create an observable when factory throws', () => {
//type definition need to be updated
const e1 = Observable.defer(<any>(() => {
const e1 = Observable.defer(() => {
throw 'error';
}));
});
const expected = '#';

expectObservable(e1).toBe(expected);
Expand Down
37 changes: 26 additions & 11 deletions src/observable/DeferObservable.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {Subscription} from '../Subscription';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
Expand All @@ -17,20 +18,34 @@ export class DeferObservable<T> extends Observable<T> {
* @name defer
* @owner Observable
*/
static create<T>(observableFactory: () => Observable<T>): Observable<T> {
static create<T>(observableFactory: () => SubscribableOrPromise<T> | void): Observable<T> {
return new DeferObservable(observableFactory);
}

constructor(private observableFactory: () => Observable<T>) {
constructor(private observableFactory: () => SubscribableOrPromise<T> | void) {
super();
}

protected _subscribe(subscriber: Subscriber<T>) {
const result = tryCatch(this.observableFactory)();
if (result === errorObject) {
subscriber.error(errorObject.e);
} else {
result.subscribe(subscriber);
protected _subscribe(subscriber: Subscriber<T>): Subscription {
return new DeferSubscriber(subscriber, this.observableFactory);
}
}

class DeferSubscriber<T> extends OuterSubscriber<T, T> {
constructor(destination: Subscriber<T>,
private factory: () => SubscribableOrPromise<T> | void) {
super(destination);
this.tryDefer();
}

private tryDefer(): void {
try {
const result = this.factory.call(this);
if (result) {
this.add(subscribeToResult(this, result));
}
} catch (err) {
this._error(err);
}
}
}

0 comments on commit 0cb44e1

Please sign in to comment.