forked from dominictarr/through
-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
100 lines (84 loc) · 2.32 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
var Stream = require('stream')
// through
//
// A stream that, by default, does nothing but re-emit its input.
// (Useful for aggregating a series of changing but not ending streams into one stream.)
// The factory function is the module's export and is also reachable as `through.through`.
exports = module.exports = through
through.through = through
/**
 * Create a readable, writable stream whose behaviour is driven by two hooks.
 *
 * @param {Function} [write] Invoked as `write.call(this, data)` for every
 *   `stream.write(data)` call. Defaults to emitting `data` as a 'data' event.
 * @param {Function} [end] Invoked with the stream as `this` when the stream
 *   is ended and no queued chunk is left. Defaults to emitting 'end'.
 * @returns {Stream} A stream with `write`, `end`, `queue`, `pause`, `resume`
 *   and `destroy` methods, a `paused` flag and the `buffer` array of queued
 *   chunks.
 */
function through (write, end) {
  write = write || function (data) { this.emit('data', data) }
  end = end || function () { this.emit('end') }

  var ended = false, destroyed = false
  // true once end() has been requested while chunks were still queued;
  // drain() delivers the end as soon as the queue is empty.
  var endPending = false
  var stream = new Stream(), buffer = []

  stream.buffer = buffer
  stream.readable = stream.writable = true
  stream.paused = false

  // Hand the chunk to the write hook; a false return value tells the caller
  // that the stream is currently paused.
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Emit queued chunks until the queue is empty or a listener pauses the
  // stream. A queued `null` is the end marker: it emits 'end' and stops.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      if (null === data)
        return stream.emit('end')
      else
        stream.emit('data', data)
    }
    // end() was called while chunks were still queued. Without this the end
    // hook would never run and the stream would stay open forever.
    if (endPending && !buffer.length) _end()
  }

  // Append a chunk (or `null` to signal the end) and flush what can be flushed.
  stream.queue = function (data) {
    buffer.push(data)
    drain()
  }

  //this will be registered as the first 'end' listener
  //must call destroy next tick, to make sure we're after any
  //stream piped from here.
  //this is only a problem if end is not emitted synchronously.
  //a nicer way to do this is to make sure this is the last listener for 'end'
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable)
      setTimeout(function () {
        stream.destroy()
      })
  })

  // Close the writable side, run the end hook and destroy the stream if the
  // hook already ended the readable side.
  function _end () {
    endPending = false
    stream.writable = false
    end.call(stream)
    if (!stream.readable)
      stream.destroy()
  }

  // End the stream, optionally writing one last chunk first.
  stream.end = function (data) {
    // A repeated end() is ignored instead of throwing, because pipe doesn't
    // check writable before calling end.
    if (ended) return
    ended = true
    if (arguments.length) stream.write(data)
    // Queued chunks must reach the listeners before the end hook runs, so the
    // end is deferred to drain() while anything is still buffered.
    if (buffer.length) endPending = true
    else _end()
  }

  // Drop queued chunks, mark both sides closed and emit 'close' (once only).
  stream.destroy = function () {
    if (destroyed) return
    destroyed = true
    ended = true
    endPending = false
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
  }

  // Stop drain() from emitting; emits 'pause' only on the actual transition.
  stream.pause = function () {
    if (stream.paused) return
    stream.paused = true
    stream.emit('pause')
  }

  // Flush queued chunks and tell writers they may continue.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
    }
    drain()
    //may have become paused again,
    //as drain emits 'data'.
    if (!stream.paused)
      stream.emit('drain')
  }

  return stream
}