This repository has been archived by the owner on Apr 20, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
window.js
105 lines (93 loc) · 3.67 KB
/
window.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
* Projects each element of an observable sequence into zero or more windows.
*
* @param {Mixed} windowOpeningsOrClosingSelector Observable sequence whose elements denote the creation of new windows, or, a function invoked to define the boundaries of the produced windows (a new window is started when the previous one is closed, resulting in non-overlapping windows).
* @param {Function} [windowClosingSelector] A function invoked to define the closing of each produced window. If a closing selector function is specified for the first parameter, this parameter is ignored.
* @returns {Observable} An observable sequence of windows.
*/
observableProto.window = function (windowOpeningsOrClosingSelector, windowClosingSelector) {
if (arguments.length === 1 && typeof arguments[0] !== 'function') {
return observableWindowWithBoundaries.call(this, windowOpeningsOrClosingSelector);
}
return typeof windowOpeningsOrClosingSelector === 'function' ?
observableWindowWithClosingSelector.call(this, windowOpeningsOrClosingSelector) :
observableWindowWithOpenings.call(this, windowOpeningsOrClosingSelector, windowClosingSelector);
};
function observableWindowWithOpenings(windowOpenings, windowClosingSelector) {
return windowOpenings.groupJoin(this, windowClosingSelector, observableEmpty, function (_, win) {
return win;
});
}
function observableWindowWithBoundaries(windowBoundaries) {
var source = this;
return new AnonymousObservable(function (observer) {
var win = new Subject(),
d = new CompositeDisposable(),
r = new RefCountDisposable(d);
observer.onNext(addRef(win, r));
d.add(source.subscribe(function (x) {
win.onNext(x);
}, function (err) {
win.onError(err);
observer.onError(err);
}, function () {
win.onCompleted();
observer.onCompleted();
}));
isPromise(windowBoundaries) && (windowBoundaries = observableFromPromise(windowBoundaries));
d.add(windowBoundaries.subscribe(function (w) {
win.onCompleted();
win = new Subject();
observer.onNext(addRef(win, r));
}, function (err) {
win.onError(err);
observer.onError(err);
}, function () {
win.onCompleted();
observer.onCompleted();
}));
return r;
}, source);
}
function observableWindowWithClosingSelector(windowClosingSelector) {
var source = this;
return new AnonymousObservable(function (observer) {
var m = new SerialDisposable(),
d = new CompositeDisposable(m),
r = new RefCountDisposable(d),
win = new Subject();
observer.onNext(addRef(win, r));
d.add(source.subscribe(function (x) {
win.onNext(x);
}, function (err) {
win.onError(err);
observer.onError(err);
}, function () {
win.onCompleted();
observer.onCompleted();
}));
function createWindowClose () {
var windowClose;
try {
windowClose = windowClosingSelector();
} catch (e) {
observer.onError(e);
return;
}
isPromise(windowClose) && (windowClose = observableFromPromise(windowClose));
var m1 = new SingleAssignmentDisposable();
m.setDisposable(m1);
m1.setDisposable(windowClose.take(1).subscribe(noop, function (err) {
win.onError(err);
observer.onError(err);
}, function () {
win.onCompleted();
win = new Subject();
observer.onNext(addRef(win, r));
createWindowClose();
}));
}
createWindowClose();
return r;
}, source);
}