-
Notifications
You must be signed in to change notification settings - Fork 51
/
stream.js
71 lines (63 loc) · 2.09 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
var log = require("../lib/util/log")("Stream")
var lob = require('lob-enc');
var ChannelStream = require("./stream.class.js")
// implements https://github.com/telehash/telehash.org/blob/v3/v3/channels/stream.md
exports.name = 'stream';
exports.mesh = function(mesh, cbExt)
{
var ext = {open:{}};
/** attach an incoming stream handler to the mesh
* @memberOf Mesh
* @param {function} onStream - handler for incoming streams
*/
mesh.stream = function(onStream)
{
mesh.log.debug('adding onStream handler',typeof onStream);
ext.onStream = onStream;
}
/** takes any channel and returns a Duplex stream,
* @memberOf Mesh
* @param {Channel} channel - the channel to streamify
* @param {string} encoding - 'binary' or 'json'
* @return {ChannelStream}
*/
mesh.streamize = function(chan, encoding)
{
return new ChannelStream(chan, encoding);
}
// new incoming stream open request
ext.open.stream = function(args, open, cbOpen){
var link = this;
log.debug("got new incoming stream")
if(typeof ext.onStream != 'function') return cbOpen('no stream');
// pass any attached request packet as options, and a method to accept
ext.onStream(link, lob.decode(open.body), function accept(err){
if(err) return cbOpen(err);
var channel = link.x.channel(open);
channel.receive(open); // actually opens it
return mesh.streamize(channel);
});
}
ext.link = function(link, cbLink)
{
/** create a new stream to this link, and send the first packet
* @memberOf TLink
* @param {Buffer|object=} packet - binary/json packet body
* @param {string} encoding - 'binary' or 'json'
* @return {ChannelStream}
*/
link.stream = function(packet, encoding)
{
log.debug("stream?")
var open = {json:{type:'stream'},body:packet};
open.json.seq = 1; // always reliable
var channel = link.x.channel(open);
var stream = mesh.streamize(channel, encoding);
channel.send(open);
log.debug("Constructed outgoing stream")
return stream;
}
cbLink();
}
cbExt(undefined, ext);
}