This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathforkjoin.js
102 lines (88 loc) · 3.08 KB
/
forkjoin.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
function argumentsToArray() {
var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
return args;
}
var ForkJoinObservable = (function (__super__) {
inherits(ForkJoinObservable, __super__);
function ForkJoinObservable(sources, cb) {
this._sources = sources;
this._cb = cb;
__super__.call(this);
}
ForkJoinObservable.prototype.subscribeCore = function (o) {
if (this._sources.length === 0) {
o.onCompleted();
return disposableEmpty;
}
var count = this._sources.length;
var state = {
finished: false,
hasResults: new Array(count),
hasCompleted: new Array(count),
results: new Array(count)
};
var subscriptions = new CompositeDisposable();
for (var i = 0, len = this._sources.length; i < len; i++) {
var source = this._sources[i];
isPromise(source) && (source = observableFromPromise(source));
subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions)));
}
return subscriptions;
};
return ForkJoinObservable;
}(ObservableBase));
var ForkJoinObserver = (function(__super__) {
inherits(ForkJoinObserver, __super__);
function ForkJoinObserver(o, s, i, cb, subs) {
this._o = o;
this._s = s;
this._i = i;
this._cb = cb;
this._subs = subs;
__super__.call(this);
}
ForkJoinObserver.prototype.next = function (x) {
if (!this._s.finished) {
this._s.hasResults[this._i] = true;
this._s.results[this._i] = x;
}
};
ForkJoinObserver.prototype.error = function (e) {
this._s.finished = true;
this._o.onError(e);
this._subs.dispose();
};
ForkJoinObserver.prototype.completed = function () {
if (!this._s.finished) {
if (!this._s.hasResults[this._i]) {
return this._o.onCompleted();
}
this._s.hasCompleted[this._i] = true;
for (var i = 0; i < this._s.results.length; i++) {
if (!this._s.hasCompleted[i]) { return; }
}
this._s.finished = true;
var res = tryCatch(this._cb).apply(null, this._s.results);
if (res === errorObj) { return this._o.onError(res.e); }
this._o.onNext(res);
this._o.onCompleted();
}
};
return ForkJoinObserver;
}(AbstractObserver));
/**
* Runs all observable sequences in parallel and collect their last elements.
*
* @example
* 1 - res = Rx.Observable.forkJoin([obs1, obs2]);
* 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
* @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences.
*/
Observable.forkJoin = function () {
var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
Array.isArray(args[0]) && (args = args[0]);
return new ForkJoinObservable(args, resultSelector);
};