This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
expand.js
96 lines (85 loc) · 2.98 KB
/
expand.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * Observable backing the `expand` operator.
 *
 * Each subscription owns a state record holding a FIFO queue (`q`) of
 * sequences still waiting to be subscribed, a count of sequences that have
 * been queued but have not completed yet (`activeCount`), and a flag
 * (`isAcquired`) telling whether a scheduled drain of the queue is pending.
 */
var ExpandObservable = (function (__super__) {
  inherits(ExpandObservable, __super__);

  /**
   * @param {Observable} source First sequence to be queued on subscription.
   * @param {Function} fn Selector invoked by ExpandObserver for every element.
   * @param {Scheduler} scheduler Scheduler used to drain the queue.
   */
  function ExpandObservable(source, fn, scheduler) {
    this.source = source;
    this._fn = fn;
    this._scheduler = scheduler;
    __super__.call(this);
  }

  /**
   * Recursive scheduler action: takes one queued sequence, subscribes to it,
   * and asks the scheduler to run the action again.
   *
   * @param {Array} args Pair of `[state, expandObservable]`.
   * @param {Function} recurse Callback that re-schedules this action.
   */
  function drainQueue(args, recurse) {
    var state = args[0];
    var self = args[1];

    // Nothing left to subscribe to: release ownership so that a later
    // _ensureActive call can schedule a new drain.
    if (state.q.length === 0) {
      state.isAcquired = false;
      return;
    }

    var work = state.q.shift();

    // The inner subscription is registered in the composite before the
    // subscribe call, so it is already tracked while subscribing runs.
    var innerSubscription = new SingleAssignmentDisposable();
    state.d.add(innerSubscription);
    innerSubscription.setDisposable(
      work.subscribe(new ExpandObserver(state, self, innerSubscription))
    );

    recurse([state, self]);
  }

  /**
   * Schedules a drain of the queue unless one is already pending or the
   * queue is empty.
   *
   * @param {Object} state Per-subscription state created in subscribeCore.
   */
  ExpandObservable.prototype._ensureActive = function (state) {
    if (state.q.length === 0) {
      return;
    }

    var alreadyAcquired = state.isAcquired;
    state.isAcquired = true;

    // Only the caller that flipped the flag starts the scheduled work.
    if (!alreadyAcquired) {
      state.m.setDisposable(
        this._scheduler.scheduleRecursive([state, this], drainQueue)
      );
    }
  };

  /**
   * Builds the per-subscription state, seeds the queue with the source
   * sequence and kicks off the drain.
   *
   * @param {Observer} o Observer receiving the expanded elements.
   * @returns {CompositeDisposable} Disposable grouping the scheduled work and
   *   the inner subscriptions added while draining.
   */
  ExpandObservable.prototype.subscribeCore = function (o) {
    var scheduled = new SerialDisposable();
    var group = new CompositeDisposable(scheduled);

    // The source is the first queued sequence, hence activeCount starts at 1.
    var state = {
      q: [this.source],
      m: scheduled,
      d: group,
      activeCount: 1,
      isAcquired: false,
      o: o
    };

    this._ensureActive(state);
    return group;
  };

  return ExpandObservable;
}(ObservableBase));
/**
 * Observer attached to every sequence taken from the expansion queue.
 *
 * It forwards elements to the downstream observer, feeds the selector's
 * result back into the shared queue, and signals completion once no queued
 * or subscribed sequence remains outstanding.
 */
var ExpandObserver = (function (__super__) {
  inherits(ExpandObserver, __super__);

  /**
   * @param {Object} state Shared per-subscription state (`q`, `d`, `o`, `activeCount`, ...).
   * @param {Object} parent Owner exposing `_fn` and `_ensureActive`.
   * @param {Object} m1 Disposable of this inner subscription, removed from `state.d` on completion.
   */
  function ExpandObserver(state, parent, m1) {
    this._s = state;
    this._p = parent;
    this._m1 = m1;
    __super__.call(this);
  }

  /**
   * Emits the element downstream, then queues the sequence the selector
   * returns for it.
   *
   * @param {*} x Element produced by the observed sequence.
   */
  ExpandObserver.prototype.next = function (x) {
    var state = this._s;

    // The element is forwarded before the selector runs.
    state.o.onNext(x);

    var expansion = tryCatch(this._p._fn)(x);
    if (expansion === errorObj) {
      // The selector threw: report the captured error instead of queueing.
      return state.o.onError(expansion.e);
    }

    state.q.push(expansion);
    state.activeCount += 1;
    this._p._ensureActive(state);
  };

  /**
   * Forwards an error from the observed sequence to the downstream observer.
   *
   * @param {*} e Error raised by the observed sequence.
   */
  ExpandObserver.prototype.error = function (e) {
    var downstream = this._s.o;
    downstream.onError(e);
  };

  /**
   * Drops this inner subscription from the composite and completes the
   * downstream observer when the outstanding count reaches zero.
   */
  ExpandObserver.prototype.completed = function () {
    var state = this._s;

    state.d.remove(this._m1);
    state.activeCount -= 1;

    if (state.activeCount === 0) {
      state.o.onCompleted();
    }
  };

  return ExpandObserver;
}(AbstractObserver));
/**
 * Expands an observable sequence by recursively applying a selector: every
 * element produced is passed to the selector, and the sequence it returns is
 * expanded in turn.
 *
 * @param {Function} selector Function invoked for each produced element; it returns another sequence whose elements are fed back through the selector.
 * @param {Scheduler} [scheduler] Scheduler on which the expansion is performed. When the argument is not a scheduler, the current thread scheduler is used.
 * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
 */
observableProto.expand = function (selector, scheduler) {
  var expansionScheduler = isScheduler(scheduler) ? scheduler : currentThreadScheduler;
  return new ExpandObservable(this, selector, expansionScheduler);
};