-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(from): let from handle any "observablesque"
- `Observable.from(promise)` should work - `Observable.from(iterable)` should work - `Observable.from(observable)` should work This includes any object that implements `Symbol.observer`. closes #156 closes #236
- Loading branch information
Showing
10 changed files
with
145 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import PromiseObservable from './PromiseObservable'; | ||
import IteratorObservable from'./IteratorObservable'; | ||
import ArrayObservable from './ArrayObservable'; | ||
|
||
import isArray from '../util/isArray'; | ||
import isPromise from '../util/isPromise'; | ||
import isObservable from '../util/isObservable'; | ||
import Scheduler from '../Scheduler'; | ||
import $$observer from '../util/Symbol_observer'; | ||
import Observable from '../Observable'; | ||
import Subscriber from '../Subscriber'; | ||
import { ObserveOnSubscriber } from '../operators/observeOn'; | ||
|
||
export default class FromObservable<T> extends Observable<T> { | ||
constructor(private observablesque: any, private scheduler: Scheduler) { | ||
super(null); | ||
} | ||
|
||
static create<T>(observablesque: any, scheduler: Scheduler = Scheduler.immediate): Observable<T> { | ||
if (isArray(observablesque)) { | ||
return new ArrayObservable(observablesque, scheduler); | ||
} else if (isPromise(observablesque)) { | ||
return new PromiseObservable(observablesque, scheduler); | ||
} else if (isObservable(observablesque)) { | ||
if(observablesque instanceof Observable) { | ||
return observablesque; | ||
} | ||
return new FromObservable(observablesque, scheduler); | ||
} else { | ||
return new IteratorObservable(observablesque, null, null, scheduler); | ||
} | ||
} | ||
|
||
_subscribe(subscriber: Subscriber<T>) { | ||
const observablesque = this.observablesque; | ||
const scheduler = this.scheduler; | ||
if(scheduler === Scheduler.immediate) { | ||
return this.observablesque[$$observer](subscriber); | ||
} else { | ||
return this.observablesque[$$observer](new ObserveOnSubscriber(subscriber, scheduler, 0)); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,42 @@ | ||
import Observable from '../Observable'; | ||
import Subscriber from '../Subscriber'; | ||
import Scheduler from '../Scheduler'; | ||
import Subscription from '../Subscription'; | ||
|
||
export default class PromiseObservable<T> extends Observable<T> { | ||
|
||
static create<T>(promise: Promise<T>) { | ||
return new PromiseObservable(promise); | ||
static create<T>(promise: Promise<T>, scheduler: Scheduler = Scheduler.immediate) { | ||
return new PromiseObservable(promise, scheduler); | ||
} | ||
|
||
constructor(protected promise: Promise<T>) { | ||
constructor(private promise: Promise<T>, private scheduler: Scheduler) { | ||
super(); | ||
} | ||
|
||
_subscribe(subscriber: Subscriber<T>) { | ||
this.promise.then( | ||
(x) => { | ||
if(!subscriber.isUnsubscribed) { | ||
subscriber.next(x); | ||
const scheduler = this.scheduler; | ||
const promise = this.promise; | ||
|
||
if (scheduler === Scheduler.immediate) { | ||
promise.then(value => { | ||
subscriber.next(value); | ||
subscriber.complete(); | ||
} | ||
}, | ||
(e) => { | ||
if(!subscriber.isUnsubscribed) { | ||
subscriber.error(e); | ||
} | ||
} | ||
); | ||
}, | ||
err => subscriber.error(err)); | ||
} else { | ||
let subscription = new Subscription(); | ||
promise.then(value => subscription.add(scheduler.schedule(0, { value, subscriber }, dispatchNext)), | ||
err => subscription.add(scheduler.schedule(0, { err, subscriber }, dispatchError))); | ||
return subscription; | ||
} | ||
} | ||
} | ||
|
||
function dispatchNext({ value, subscriber }) { | ||
subscriber.next(value); | ||
subscriber.complete(); | ||
} | ||
|
||
function dispatchError({ err, subscriber }) { | ||
subscriber.error(err); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
const isArray = Array.isArray || (x => x && typeof x.length === 'number'); | ||
|
||
export default isArray; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
import $$observer from './Symbol_observer'; | ||
|
||
export default function isObservable(x): boolean { | ||
return x && typeof x[$$observer] === 'function'; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
export default function isPromise(x): boolean { | ||
return x && typeof x.then === 'function'; | ||
} |