-
Notifications
You must be signed in to change notification settings - Fork 8
/
index.js
161 lines (141 loc) · 4.46 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
This library consist of three main components channels, processes
and operations. A process takes a generator function that yields
operations. Operations are a tuple of [type, fn], depending on what
type an operation has the process does different things, currently
there are four types chan, fn, spawn and quit. Channels are object
that have two methods, take and put, both of these methods return
operations that have type chan and functions that return whether they
should continue executing or block.
*/
var Chan = exports.Chan = function (size) {
this.size = size+1 || 1;
this.buffer = [];
this.drain = false; // This will be true if the channels buffer is full.
};
Chan.prototype.put = function (msg) {
return ["chan", function () {
if (this.drain && this.buffer.length === 0) { // Was the buffer previously full?
this.drain = false;
return { state: "continue" };
}
if (this.buffer.length < this.size) { // There is still space in the buffer.
this.buffer.push(msg);
this.drain = this.buffer.length === this.size; // Was the size limit reached?
if (this.drain) {
return { state: "sleep" }; // If so block.
}
return { state: "continue" }; // Otherwise carry on.
}
return { state: "sleep" }; // Nobody has taken our messages yet.
}.bind(this)];
};
Chan.prototype.take = function () {
return ["chan", function () {
if (this.buffer.length === 0) { // There are no messages to be taken.
return { state: "sleep" }; // So block.
}
return { state: "continue", msg: this.buffer.shift() }; // Take a messages.
}.bind(this)];
};
var Proc = function (gen) {
this.gen = gen;
this.step = gen.next();
this.done = false;
this.subprocs = [];
};
Proc.prototype.run = function () {
if (this.step.done || this.done) { // Is the process done?
this.subprocs.forEach(function (proc) {
proc.done = true; // If so all subprocess are done too.
});
return ;
}
var value = this.step.value // Operation.
, type = value[0]
, fn = value[1];
if (type === "chan") {
var op = fn();
if (op.state === "continue") {
this.step = this.gen.next(op.msg); // Go to the next yield.
}
return this.spin(); // Schedule the process.
}
if (type === "fn") { // A function to block on.
fn(function (err, msg) {
if (err) {
this.step = this.gen.throw(err);
} else {
this.step = this.gen.next(msg);
}
this.spin();
}.bind(this));
return ;
}
if (type === "spawn") { // A new sub process.
this.step = this.gen.next();
this.subprocs.push(fn);
return this.spin();
}
if (type === "quit") {
this.done = true;
return this.spin();
}
};
// Hand over the process to node's scheduler.
Proc.prototype.spin = function () {
setImmediate(function () { this.run(); }.bind(this), 0);
};
// Create and run a new process.
var spawn = exports.spawn = function () {
var args = Array.prototype.slice.call(arguments, 0)
, gen = args.shift().apply(undefined, args)
, proc = new Proc(gen);
proc.run();
return ["spawn", proc];
};
// Wait on several channels.
var select = exports.select = function () {
var chans = Array.prototype.slice.call(arguments, 0);
return ["chan", function () {
for (var i=0; i < chans.length; i += 1) {
var chan = chans[i];
if (chan.buffer.length === 0) { continue; } // Peak into the channel to see if there are any messages pending.
return { state: "continue", msg: chan }; // If there is send the channel back to the process.
}
return { state: "sleep" };
}];
};
// Wrapper around setTimeout.
var wait = exports.wait = function (ms) {
return ["fn", function (cb) {
setTimeout(cb, ms);
}];
};
// Wrap functions that follow the node convention fn(values ..., cb(err, value))
var wrap = exports.wrap = function (fn) {
return function () {
var args = Array.prototype.slice.call(arguments, 0);
return ["fn", function (cb) {
args.push(cb);
fn.apply(undefined, args);
}];
};
};
// Make a function take a channel instead of a callback.
var chanify = exports.chanify = function (fn) {
return function () {
var args = Array.prototype.slice.call(arguments, 0)
, ch = args.pop();
args.push(function (err, val) {
spawn(function* () {
yield ch.put(val);
});
});
fn.apply(undefined, args);
};
};
// Force a process to quit.
var quit = exports.quit = function () {
return ["quit"];
};