-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
90 lines (77 loc) · 1.67 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
'use strict';
var ReadableStream = require('readable-stream')
module.exports = (function create(opts) {
union.using = function(opts_) {
return create(
{ cmp: opts_.cmp || opts.cmp
, toKey: opts_.toKey || opts.toKey
, merge: opts_.merge || opts.merge
})
}
var cmp = opts.cmp || simpleCmp
, toKey = opts.toKey || identity
, merge = opts.merge || identity
return union
function union(a, b) {
var stream = new ReadableStream({ objectMode: true })
, aVal = null
, bVal = null
, ended = false
stream._read = readA
a.on('end', end)
b.on('end', end)
function end() {
if (ended) return
ended = true
stream.push(null)
}
return stream
function readA() {
if (ended) return
aVal = a.read()
if (aVal === null)
return a.once('readable', readA)
push()
}
function readB() {
if (ended) return
bVal = b.read()
if (bVal === null)
return b.once('readable', readB)
push()
}
function push() {
if (aVal === null)
return readA()
if (bVal === null)
return readB()
var c = cmp(toKey(aVal), toKey(bVal))
if (c < 0) {
stream.push(aVal)
return readA()
}
if (c > 0)
stream.push(bVal)
return readB()
}
var val = merge(aVal, bVal)
if (val === undefined)
readA()
else if (val !== null)
stream.push(val)
else
end()
}
})({})
/**
 * Return the first argument unchanged; any further arguments are ignored.
 *
 * @param {*} x - any value.
 * @returns {*} `x` itself.
 */
function identity(x) {
  var same = x

  return same
}
/**
 * Three-way comparison using the built-in `<` and `>` operators.
 *
 * A NaN first argument yields -1 and, failing that, a NaN second argument
 * yields 1, so NaN inputs still produce a definite result.
 *
 * @param {*} a - left operand.
 * @param {*} b - right operand.
 * @returns {number} -1, 1 or 0.
 */
function simpleCmp(a, b) {
  // Self-inequality is true only for NaN.
  var leftIsNaN = a !== a
  var rightIsNaN = b !== b

  if (leftIsNaN) {
    return -1
  }
  if (rightIsNaN) {
    return 1
  }
  if (a < b) {
    return -1
  }
  if (a > b) {
    return 1
  }
  return 0
}