-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathgraft.js
125 lines (95 loc) · 2.48 KB
/
graft.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'use strict';
var Transform = require('readable-stream').Transform;
var inherits = require('inherits');
var deepMatch = require('./lib/deepMatch');
var wrap = require('./lib/wrapMessage');
var jschan = require('jschan');
function noop() {}
function Graft() {
if (!(this instanceof Graft)) {
return new Graft();
}
Transform.call(this, { objectMode: true, highWaterMark: 16 });
var that = this;
function readFirst() {
/*jshint validthis:true */
this.removeListener('readable', readFirst);
that._transform(wrap(this.read(), this), null, noop);
}
this._session = jschan.memorySession();
this._session.on('channel', function(channel) {
channel.on('readable', readFirst);
});
this._nextChannel = this._session.WriteChannel();
this.on('pipe', function(source) {
source.on('ready', that.emit.bind(that, 'ready'));
this.on('end', function() {
source.end();
});
});
this._patterns = [];
}
inherits(Graft, Transform);
Graft.prototype._transform = function flowing(obj, enc, done) {
if (!obj._session && !obj._channel) {
// it quacks like a duck, so it's a duck - s/duck/request/g
var channel = this._nextChannel;
this._nextChannel = this._session.WriteChannel();
channel.write(obj);
return done();
}
var i;
for (i = 0; i < this._patterns.length; i++) {
if (this._patterns[i].pattern(obj)) {
this._patterns[i].stream.write(obj, done);
return;
}
}
this.push(obj);
done();
};
Graft.prototype.branch = function(pattern, stream) {
if (!pattern) {
throw new Error('missing pattern');
}
if (!stream) {
throw new Error('missing destination');
}
this._patterns.push({
pattern: pattern,
stream: stream
});
return this;
};
Graft.prototype.where = function(pattern, stream) {
return this.branch(function(req) {
return deepMatch(pattern, req);
}, stream);
};
Graft.prototype.close = function(cb) {
function complete() {
/*jshint validthis:true */
this._session.close(function(err) {
if (err) {
return cb(err);
}
return cb();
});
}
if (this._readableState.endEmitted) {
complete.call(this);
} else {
this.on('end', complete);
}
if (!this._readableState.flowing) {
this.resume();
}
this.end();
};
Graft.prototype.ReadChannel = function() {
return this._nextChannel.ReadChannel();
};
Graft.prototype.WriteChannel = function() {
return this._nextChannel.WriteChannel();
};
module.exports = Graft;