From 544e4a41653ffe7382f4db7d934289854a4f337a Mon Sep 17 00:00:00 2001 From: David Dias Date: Sun, 20 Sep 2015 21:08:28 +0100 Subject: [PATCH 01/18] update README with new candidate interface --- README.md | 79 +++++++++++++++++++++++-------------------------------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index f585e5773f..4fcf3e6827 100644 --- a/README.md +++ b/README.md @@ -1,82 +1,69 @@ -ipfs-swarm Node.js implementation +libp2p-swarm Node.js implementation ================================= [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![Build Status](https://img.shields.io/travis/diasdavid/node-ipfs-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/node-ipfs-swarm) -> IPFS swarm implementation in Node.js +> libp2p swarm implementation in Node.js # Description -ipfs-swarm is an abstraction for the network layer on IPFS. It offers an API to open streams between peers on a specific protocol. +libp2p-swarm is connection abstraction that is able to leverage several transports and connection upgrades (such as congestion control, encrypt a channel, multiplex several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream. -Ref spec (WIP) - https://github.com/diasdavid/specs/blob/protocol-spec/protocol/layers.md#network-layer +libp2p-swarm is used by libp2p but it can be also used as a standalone module. # Usage -### Create a new Swarm +### Install and create a Swarm -```javascript -var Swarm = require('ipfs-swarm') +libp2p-swarm is available on npm and so, like any other npm module, just: -var s = new Swarm([port]) // `port` defalts to 4001 +```bash +$ npm install libp2p-swarm --save ``` -### Set the swarm to listen for incoming streams +And use it on your Node.js code as: -```javascript -s.listen([port], [callback]) // `port` defaults to 4001, `callback` gets called when the socket starts listening -``` - -### Close the listener/socket and every open stream that was multiplexed on it +```JavaScript +var Swarm = require('libp2p-swarm') -```javascript -s.closeListener() +var sw = new Swarm(peerInfoSelf) ``` -### Register a protocol to be handled by an incoming stream - -```javascript -s.registerHandler('/name/protocol/you/want/version', function (stream) {}) -``` +peerInfoSelf is a PeerInfo object that represents the peer creating this swarm instance. -### Open a new connection +### Support a transport -Used when we want to make sure we can connect to a given peer, but do not intend to establish a stream with any of the services offered right away. +libp2p-swarm expects transports that implement [abstract-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/node-libp2p-tcp), a simple shim on top of the `net` module to make it work with swarm expectations. -``` -s.openConnection(peerConnection, function (err) {}) +```JavaScript +sw.addTransport(transport, [options, dialOptions, listenOptions]) ``` +### Add an connection upgrade -### Dial a new stream +A connection upgrade must be able to receive and return something that implements the [abstract-connection]() interface. +```JavaScript +sw.addUpgrade(connUpgrade, [options]) ``` -s.openStream(peerInfo, protocol, function (err, stream) {}) -``` - -peerInfo must be a [`ipfs-peer`](https://www.npmjs.com/package/ipfs-peer) object, contaning both peer-id and multiaddrs. -## Events emitted +Upgrading a connection to use a Stream Muxer is still considered a upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer]() interface. +```JavaScript +sw.addStreamMuxer(streamMuxer, [options]) ``` -.on('error') -.on('connection') -.on('connection-unknown') // used by Identify to start the Identify protocol from listener to dialer -``` - -## Identify protocol - -The Identify protocol is an integral part to Swarm. It enables peers to share observedAddrs, identities and other possible address available. This enables us to do better NAT traversal. - -To instantiate Identify: +### Dial to another peer +```JavaScript +sw.dial(PeerInfo, protocol, options) +sw.dial(PeerInfo, options) ``` -var Identify = require('ipfs-swarm/identify') -var i = new Identify(swarmInstance, peerSelf) -``` +dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a Muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a Muxer for that peerInfo, than do nothing. -`swarmInstance` must be an Instance of swarm and `peerSelf` must be a instance of `ipfs-peer` that represents the peer that instantiated this Identify +### Accept requests on a specific protocol -Identify emits a `peer-update` event each time it receives information from another peer. +```JavaScript +sw.handleProtocol(protocol, handlerFunction) +``` From 1833ded0f730686aebcba108c177ee31b74f9419 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 21 Sep 2015 16:46:04 +0100 Subject: [PATCH 02/18] making progress --- README.md | 2 +- examples/{c.js => peerA.js} | 0 examples/{s.js => peerB.js} | 5 +- src/swarm-old.js | 189 +++++++++++++++++++++++++++++++++ src/swarm.js | 201 +++++++++--------------------------- 5 files changed, 241 insertions(+), 156 deletions(-) rename examples/{c.js => peerA.js} (100%) rename examples/{s.js => peerB.js} (87%) create mode 100644 src/swarm-old.js diff --git a/README.md b/README.md index 4fcf3e6827..8910334387 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ sw.addStreamMuxer(streamMuxer, [options]) ### Dial to another peer ```JavaScript -sw.dial(PeerInfo, protocol, options) +sw.dial(PeerInfo, options, protocol) sw.dial(PeerInfo, options) ``` diff --git a/examples/c.js b/examples/peerA.js similarity index 100% rename from examples/c.js rename to examples/peerA.js diff --git a/examples/s.js b/examples/peerB.js similarity index 87% rename from examples/s.js rename to examples/peerB.js index 856730d060..c480983a36 100644 --- a/examples/s.js +++ b/examples/peerB.js @@ -1,6 +1,7 @@ -// var Identify = require('./../src/identify') var Swarm = require('./../src') -// var Peer = require('ipfs-peer') + + +var Peer = require('ipfs-peer') // var Id = require('ipfs-peer-id') // var multiaddr = require('multiaddr') diff --git a/src/swarm-old.js b/src/swarm-old.js new file mode 100644 index 0000000000..9a27bdad41 --- /dev/null +++ b/src/swarm-old.js @@ -0,0 +1,189 @@ +var tcp = require('net') +var Select = require('multistream-select').Select +var Interactive = require('multistream-select').Interactive +var Muxer = require('./stream-muxer') +var log = require('ipfs-logger').group('swarm') +var async = require('async') +var EventEmitter = require('events').EventEmitter +var util = require('util') + +exports = module.exports = Swarm + +util.inherits(Swarm, EventEmitter) + +function Swarm () { + var self = this + + if (!(self instanceof Swarm)) { + throw new Error('Swarm must be called with new') + } + + self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 + self.connections = {} // {peerIdB58: {conn: <>, socket: <>} + self.handles = {} + + // set the listener + + self.listen = function (port, ready) { + if (!ready) { + ready = function noop () {} + } + if (typeof port === 'function') { + ready = port + } else if (port) { self.port = port } + + // + + self.listener = tcp.createServer(function (socket) { + errorUp(self, socket) + var ms = new Select() + ms.handle(socket) + ms.addHandler('/spdy/3.1.0', function (ds) { + log.info('Negotiated spdy with incoming socket') + + var conn = new Muxer().attach(ds, true) + + // attach multistream handlers to incoming streams + + conn.on('stream', registerHandles) + errorUp(self, conn) + + // FOR IDENTIFY + self.emit('connection-unknown', conn, socket) + + // IDENTIFY DOES THIS FOR US + // conn.on('close', function () { delete self.connections[conn.peerId] }) + }) + }).listen(self.port, ready) + errorUp(self, self.listener) + } + + // interface + + // open stream account for connection reuse + self.openConnection = function (peer, cb) { + // If no connection open yet, open it + if (!self.connections[peer.id.toB58String()]) { + // Establish a socket with one of the addresses + var socket + async.eachSeries(peer.multiaddrs, function (multiaddr, next) { + if (socket) { return next() } + + var tmp = tcp.connect(multiaddr.toOptions(), function () { + socket = tmp + errorUp(self, socket) + next() + }) + + tmp.once('error', function (err) { + log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err) + next() + }) + + }, function done () { + if (!socket) { + return cb(new Error('Not able to open a scoket with peer - ', + peer.id.toB58String())) + } + gotSocket(socket) + }) + } else { + cb() + } + + // do the spdy people dance (multistream-select into spdy) + function gotSocket (socket) { + var msi = new Interactive() + msi.handle(socket, function () { + msi.select('/spdy/3.1.0', function (err, ds) { + if (err) { cb(err) } + + var conn = new Muxer().attach(ds, false) + conn.on('stream', registerHandles) + self.connections[peer.id.toB58String()] = { + conn: conn, + socket: socket + } + conn.on('close', function () { delete self.connections[peer.id.toB58String()]}) + errorUp(self, conn) + + cb() + }) + }) + } + } + + self.openStream = function (peer, protocol, cb) { + self.openConnection(peer, function (err) { + if (err) { + return cb(err) + } + // spawn new muxed stream + var conn = self.connections[peer.id.toB58String()].conn + conn.dialStream(function (err, stream) { + if (err) { return cb(err) } + errorUp(self, stream) + // negotiate desired protocol + var msi = new Interactive() + msi.handle(stream, function () { + msi.select(protocol, function (err, ds) { + if (err) { return cb(err) } + peer.lastSeen = new Date() + cb(null, ds) // return the stream + }) + }) + }) + }) + } + + self.registerHandler = function (protocol, handlerFunc) { + if (self.handles[protocol]) { + return handlerFunc(new Error('Handle for protocol already exists', protocol)) + } + self.handles[protocol] = handlerFunc + log.info('Registered handler for protocol:', protocol) + } + + self.closeConns = function (cb) { + var keys = Object.keys(self.connections) + var number = keys.length + if (number === 0) { cb() } + var c = new Counter(number, cb) + + keys.forEach(function (key) { + self.connections[key].conn.end() + c.hit() + }) + } + + self.closeListener = function (cb) { + self.listener.close(cb) + } + + function registerHandles (stream) { + log.info('Registering protocol handlers on new stream') + errorUp(self, stream) + var msH = new Select() + msH.handle(stream) + Object.keys(self.handles).forEach(function (protocol) { + msH.addHandler(protocol, self.handles[protocol]) + }) + } + +} + +function errorUp (self, emitter) { + emitter.on('error', function (err) { + self.emit('error', err) + }) +} + +function Counter (target, callback) { + var c = 0 + this.hit = count + + function count () { + c += 1 + if (c === target) { callback() } + } +} diff --git a/src/swarm.js b/src/swarm.js index 9a27bdad41..4d792999a0 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,189 +1,84 @@ -var tcp = require('net') -var Select = require('multistream-select').Select -var Interactive = require('multistream-select').Interactive -var Muxer = require('./stream-muxer') -var log = require('ipfs-logger').group('swarm') -var async = require('async') -var EventEmitter = require('events').EventEmitter -var util = require('util') exports = module.exports = Swarm -util.inherits(Swarm, EventEmitter) - -function Swarm () { +function Swarm (peerInfo) { var self = this if (!(self instanceof Swarm)) { throw new Error('Swarm must be called with new') } - self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 - self.connections = {} // {peerIdB58: {conn: <>, socket: <>} - self.handles = {} - - // set the listener + self.peerInfo = peerInfo - self.listen = function (port, ready) { - if (!ready) { - ready = function noop () {} - } - if (typeof port === 'function') { - ready = port - } else if (port) { self.port = port } + // peerIdB58: { conn: } + self.conns = {} - // + // peerIdB58: { muxer: } + self.muxedConns = {} - self.listener = tcp.createServer(function (socket) { - errorUp(self, socket) - var ms = new Select() - ms.handle(socket) - ms.addHandler('/spdy/3.1.0', function (ds) { - log.info('Negotiated spdy with incoming socket') + // transportName: { transport: transport, + // dialOptions: dialOptions, + // listenOptions: listenOptions, + // listeners: [] } + self.transports = {} - var conn = new Muxer().attach(ds, true) + self.listeners = {} - // attach multistream handlers to incoming streams + // public interface - conn.on('stream', registerHandles) - errorUp(self, conn) + self.addTransport = function (transport, options, dialOptions, listenOptions, callback) { + // set up the transport and add the list of incoming streams + // add transport to the list of transports - // FOR IDENTIFY - self.emit('connection-unknown', conn, socket) - - // IDENTIFY DOES THIS FOR US - // conn.on('close', function () { delete self.connections[conn.peerId] }) - }) - }).listen(self.port, ready) - errorUp(self, self.listener) - } + var listener = transport.createListener(options, listen) - // interface - - // open stream account for connection reuse - self.openConnection = function (peer, cb) { - // If no connection open yet, open it - if (!self.connections[peer.id.toB58String()]) { - // Establish a socket with one of the addresses - var socket - async.eachSeries(peer.multiaddrs, function (multiaddr, next) { - if (socket) { return next() } - - var tmp = tcp.connect(multiaddr.toOptions(), function () { - socket = tmp - errorUp(self, socket) - next() - }) - - tmp.once('error', function (err) { - log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err) - next() - }) - - }, function done () { - if (!socket) { - return cb(new Error('Not able to open a scoket with peer - ', - peer.id.toB58String())) - } - gotSocket(socket) - }) - } else { - cb() - } - - // do the spdy people dance (multistream-select into spdy) - function gotSocket (socket) { - var msi = new Interactive() - msi.handle(socket, function () { - msi.select('/spdy/3.1.0', function (err, ds) { - if (err) { cb(err) } - - var conn = new Muxer().attach(ds, false) - conn.on('stream', registerHandles) - self.connections[peer.id.toB58String()] = { - conn: conn, - socket: socket - } - conn.on('close', function () { delete self.connections[peer.id.toB58String()]}) - errorUp(self, conn) - - cb() - }) - }) - } - } + listener.listen(listenOptions, function ready () { + self.transports[options.name] = { + transport: transport, + dialOptions: dialOptions, + listenOptions: listenOptions, + listeners: [listener] + } - self.openStream = function (peer, protocol, cb) { - self.openConnection(peer, function (err) { - if (err) { - return cb(err) + // If a known multiaddr is passed, then add to our list of multiaddrs + if (options.multiaddr) { + self.peerInfo.multiaddrs.push(options.multiaddr) } - // spawn new muxed stream - var conn = self.connections[peer.id.toB58String()].conn - conn.dialStream(function (err, stream) { - if (err) { return cb(err) } - errorUp(self, stream) - // negotiate desired protocol - var msi = new Interactive() - msi.handle(stream, function () { - msi.select(protocol, function (err, ds) { - if (err) { return cb(err) } - peer.lastSeen = new Date() - cb(null, ds) // return the stream - }) - }) - }) + + callback() }) } - self.registerHandler = function (protocol, handlerFunc) { - if (self.handles[protocol]) { - return handlerFunc(new Error('Handle for protocol already exists', protocol)) - } - self.handles[protocol] = handlerFunc - log.info('Registered handler for protocol:', protocol) + self.addUpgrade = function (ConnUpgrade, options) { + } - self.closeConns = function (cb) { - var keys = Object.keys(self.connections) - var number = keys.length - if (number === 0) { cb() } - var c = new Counter(number, cb) + self.addStreamMuxer = function (StreamMuxer, options) { - keys.forEach(function (key) { - self.connections[key].conn.end() - c.hit() - }) } - self.closeListener = function (cb) { - self.listener.close(cb) + self.dial = function (peerInfo, options, protocol, callback) { + // 1. check if we have transports we support } - function registerHandles (stream) { - log.info('Registering protocol handlers on new stream') - errorUp(self, stream) - var msH = new Select() - msH.handle(stream) - Object.keys(self.handles).forEach(function (protocol) { - msH.addHandler(protocol, self.handles[protocol]) - }) + self.closeListener = function (transportName, callback) { + // close a specific listener + // remove it from available transports } -} + self.close = function (callback) { + // close everything + } -function errorUp (self, emitter) { - emitter.on('error', function (err) { - self.emit('error', err) - }) -} + self.handleProtocol = function (protocol, handlerFunction) { + + } -function Counter (target, callback) { - var c = 0 - this.hit = count + // internals - function count () { - c += 1 - if (c === target) { callback() } + function listen (conn) { + console.log('Received new connection') + // apply upgrades + // then add it to the multistreamHandler } } From 73a6a4fd4575abcefa396200d11e7656690c0592 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 21 Sep 2015 19:56:42 +0100 Subject: [PATCH 03/18] adding transports works --- examples/peerB.js | 33 +++++++++------------------------ package.json | 5 +++-- src/swarm.js | 21 +++++++-------------- 3 files changed, 19 insertions(+), 40 deletions(-) diff --git a/examples/peerB.js b/examples/peerB.js index c480983a36..95ca3ff620 100644 --- a/examples/peerB.js +++ b/examples/peerB.js @@ -1,29 +1,14 @@ var Swarm = require('./../src') +var Peer = require('peer-info') +var Id = require('peer-id') +var multiaddr = require('multiaddr') +var tcp = require('libp2p-tcp') -var Peer = require('ipfs-peer') -// var Id = require('ipfs-peer-id') -// var multiaddr = require('multiaddr') +var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') +var p = new Peer(Id.create(), []) +var sw = new Swarm(p) -var b = new Swarm() -b.port = 4001 -// var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)]) - -// var i = new Identify(b, peerB) -// i.on('thenews', function (news) { -// console.log('such news') -// }) - -b.on('error', function (err) { - console.log(err) -}) - -b.listen() - -b.registerHandler('/ipfs/sparkles/1.2.3', function (stream) { - // if (err) { - // return console.log(err) - // } - - console.log('woop got a stream') +sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () { + console.log('transport added') }) diff --git a/package.json b/package.json index f59408d538..4f5129bf57 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "devDependencies": { "code": "^1.4.1", "lab": "^5.13.0", + "libp2p-tcp": "^0.1.1", "precommit-hook": "^3.0.0", "sinon": "^1.15.4", "standard": "^4.5.2", @@ -42,11 +43,11 @@ "async": "^1.3.0", "ip-address": "^4.0.0", "ipfs-logger": "^0.1.0", - "ipfs-peer": "^0.3.0", - "ipfs-peer-id": "^0.3.0", "multiaddr": "^1.0.0", "multiplex-stream-muxer": "^0.2.0", "multistream-select": "^0.6.1", + "peer-id": "^0.3.3", + "peer-info": "^0.3.2", "protocol-buffers-stream": "^1.2.0", "spdy-stream-muxer": "^0.6.0" } diff --git a/src/swarm.js b/src/swarm.js index 4d792999a0..6c38ec09e5 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,4 +1,3 @@ - exports = module.exports = Swarm function Swarm (peerInfo) { @@ -26,14 +25,14 @@ function Swarm (peerInfo) { // public interface - self.addTransport = function (transport, options, dialOptions, listenOptions, callback) { + self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { // set up the transport and add the list of incoming streams // add transport to the list of transports var listener = transport.createListener(options, listen) listener.listen(listenOptions, function ready () { - self.transports[options.name] = { + self.transports[name] = { transport: transport, dialOptions: dialOptions, listenOptions: listenOptions, @@ -49,13 +48,9 @@ function Swarm (peerInfo) { }) } - self.addUpgrade = function (ConnUpgrade, options) { - - } + self.addUpgrade = function (ConnUpgrade, options) {} - self.addStreamMuxer = function (StreamMuxer, options) { - - } + self.addStreamMuxer = function (StreamMuxer, options) {} self.dial = function (peerInfo, options, protocol, callback) { // 1. check if we have transports we support @@ -70,15 +65,13 @@ function Swarm (peerInfo) { // close everything } - self.handleProtocol = function (protocol, handlerFunction) { - - } + self.handleProtocol = function (protocol, handlerFunction) {} // internals function listen (conn) { console.log('Received new connection') - // apply upgrades - // then add it to the multistreamHandler + // apply upgrades + // then add it to the multistreamHandler } } From e1df0b9ecd70c2af78a4f25b2f58a6e7af122527 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 22 Sep 2015 14:31:23 +0100 Subject: [PATCH 04/18] add transport and close listener test --- package.json | 9 +- src/swarm.js | 44 ++- ...r-test.js => multistream-and-muxer-old.js} | 0 tests/swarm-old.js | 254 ++++++++++++++++++ tests/swarm-test.js | 105 +++++--- 5 files changed, 362 insertions(+), 50 deletions(-) rename tests/{multistream-and-muxer-test.js => multistream-and-muxer-old.js} (100%) create mode 100644 tests/swarm-old.js diff --git a/package.json b/package.json index 4f5129bf57..0532386605 100644 --- a/package.json +++ b/package.json @@ -1,14 +1,13 @@ { - "name": "ipfs-swarm", + "name": "libp2p-swarm", "version": "0.4.1", "description": "IPFS swarm implementation in Node.js", "main": "src/index.js", "scripts": { "test": "./node_modules/.bin/lab tests/*-test.js", "coverage": "./node_modules/.bin/lab -t 100 tests/*-test.js", - "codestyle": "./node_modules/.bin/standard --format", - "lint": "./node_modules/.bin/standard", - "validate": "npm ls" + "laf": "./node_modules/.bin/standard --format", + "lint": "./node_modules/.bin/standard" }, "repository": { "type": "git", @@ -24,7 +23,7 @@ }, "homepage": "https://github.com/diasdavid/node-ipfs-swarm", "pre-commit": [ - "codestyle", + "lint", "test" ], "engines": { diff --git a/src/swarm.js b/src/swarm.js index 6c38ec09e5..7637f23bdd 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,3 +1,5 @@ +var multistream = require('multistream-select') + exports = module.exports = Swarm function Swarm (peerInfo) { @@ -23,6 +25,8 @@ function Swarm (peerInfo) { self.listeners = {} + self.protocols = {} + // public interface self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { @@ -34,9 +38,10 @@ function Swarm (peerInfo) { listener.listen(listenOptions, function ready () { self.transports[name] = { transport: transport, + options: options, dialOptions: dialOptions, listenOptions: listenOptions, - listeners: [listener] + listener: listener } // If a known multiaddr is passed, then add to our list of multiaddrs @@ -48,30 +53,53 @@ function Swarm (peerInfo) { }) } - self.addUpgrade = function (ConnUpgrade, options) {} + self.addUpgrade = function (ConnUpgrade, options) { + + } - self.addStreamMuxer = function (StreamMuxer, options) {} + self.addStreamMuxer = function (StreamMuxer, options) { + + } self.dial = function (peerInfo, options, protocol, callback) { // 1. check if we have transports we support } self.closeListener = function (transportName, callback) { - // close a specific listener - // remove it from available transports + self.transports[transportName].listener.close(closed) + + // only gets called when all the streams on this transport are closed too + function closed () { + delete self.transports[transportName] + callback() + } } self.close = function (callback) { // close everything } - self.handleProtocol = function (protocol, handlerFunction) {} + self.handleProtocol = function (protocol, handlerFunction) { + self.protocols[protocol] = handlerFunction + } // internals function listen (conn) { console.log('Received new connection') - // apply upgrades - // then add it to the multistreamHandler + // TODO apply upgrades + // TODO then add StreamMuxer if available + + // if no stream muxer, then + userProtocolMuxer(conn) + } + + // Handle user given protocols + function userProtocolMuxer (conn) { + var msS = new multistream.Select() + msS.handle(conn) + Object.keys(self.protocols).forEach(function (protocol) { + msS.addHandler(protocol, self.protocols[protocol]) + }) } } diff --git a/tests/multistream-and-muxer-test.js b/tests/multistream-and-muxer-old.js similarity index 100% rename from tests/multistream-and-muxer-test.js rename to tests/multistream-and-muxer-old.js diff --git a/tests/swarm-old.js b/tests/swarm-old.js new file mode 100644 index 0000000000..408b8f7393 --- /dev/null +++ b/tests/swarm-old.js @@ -0,0 +1,254 @@ +var Lab = require('lab') +var Code = require('code') +var sinon = require('sinon') +var lab = exports.lab = Lab.script() + +var experiment = lab.experiment +var test = lab.test +var beforeEach = lab.beforeEach +var afterEach = lab.afterEach +var expect = Code.expect + +var multiaddr = require('multiaddr') +var Id = require('ipfs-peer-id') +var Peer = require('ipfs-peer') +var Swarm = require('../src/') +var Identify = require('../src/identify') + +var swarmA +var swarmB +var peerA +var peerB + +beforeEach(function (done) { + swarmA = new Swarm() + swarmB = new Swarm() + var c = new Counter(2, done) + + swarmA.listen(8100, function () { + peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)]) + c.hit() + }) + + swarmB.listen(8101, function () { + peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)]) + c.hit() + }) + +}) + +afterEach(function (done) { + // This should be 2, but for some reason + // that will fail in most of the tests + var c = new Counter(1, done) + + swarmA.closeListener(function () { + c.hit() + }) + swarmB.closeListener(function () { + c.hit() + }) +}) + +experiment('BASICS', function () { + experiment('Swarm', function () { + test('enforces instantiation with new', function (done) { + expect(function () { + Swarm() + }).to.throw('Swarm must be called with new') + done() + }) + + test('parses $IPFS_SWARM_PORT', function (done) { + process.env.IPFS_SWARM_PORT = 1111 + var swarm = new Swarm() + expect(swarm.port).to.be.equal(1111) + process.env.IPFS_SWARM_PORT = undefined + done() + }) + }) + + experiment('Swarm.listen', function (done) { + test('handles missing port', function (done) { + var swarm = new Swarm() + swarm.listen(done) + }) + + test('handles passed in port', function (done) { + var swarm = new Swarm() + swarm.listen(1234) + expect(swarm.port).to.be.equal(1234) + done() + }) + }) + + experiment('Swarm.registerHandler', function () { + test('throws when registering a protcol handler twice', function (done) { + var swarm = new Swarm() + swarm.registerHandler('/sparkles/1.1.1', function () {}) + swarm.registerHandler('/sparkles/1.1.1', function (err) { + expect(err).to.be.an.instanceOf(Error) + expect(err.message).to.be.equal('Handle for protocol already exists') + done() + }) + }) + }) + + experiment('Swarm.closeConns', function () { + test('calls end on all connections', function (done) { + swarmA.openConnection(peerB, function () { + var key = Object.keys(swarmA.connections)[0] + sinon.spy(swarmA.connections[key].conn, 'end') + swarmA.closeConns(function () { + expect(swarmA.connections[key].conn.end.called).to.be.equal(true) + done() + }) + }) + }) + }) +}) + +experiment('BASE', function () { + test('Open a stream', function (done) { + var protocol = '/sparkles/3.3.3' + var c = new Counter(2, done) + + swarmB.registerHandler(protocol, function (stream) { + c.hit() + }) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + c.hit() + }) + }) + + test('Reuse connection (from dialer)', function (done) { + var protocol = '/sparkles/3.3.3' + + swarmB.registerHandler(protocol, function (stream) {}) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + expect(swarmA.connections.length === 1) + done() + }) + }) + }) + test('Check for lastSeen', function (done) { + var protocol = '/sparkles/3.3.3' + + swarmB.registerHandler(protocol, function (stream) {}) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + expect(peerB.lastSeen).to.be.instanceof(Date) + done() + }) + }) + +}) + +experiment('IDENTIFY', function () { + test('Attach Identify, open a stream, see a peer update', function (done) { + swarmA.on('error', function (err) { + console.log('A - ', err) + }) + + swarmB.on('error', function (err) { + console.log('B - ', err) + }) + + var protocol = '/sparkles/3.3.3' + + var identifyA = new Identify(swarmA, peerA) + var identifyB = new Identify(swarmB, peerB) + setTimeout(function () { + swarmB.registerHandler(protocol, function (stream) {}) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + }) + + identifyB.on('peer-update', function (answer) { + done() + }) + identifyA.on('peer-update', function (answer) {}) + }, 500) + }) + + test('Attach Identify, open a stream, reuse stream', function (done) { + var protocol = '/sparkles/3.3.3' + + var identifyA = new Identify(swarmA, peerA) + var identifyB = new Identify(swarmB, peerB) + + swarmA.registerHandler(protocol, function (stream) {}) + swarmB.registerHandler(protocol, function (stream) {}) + + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + }) + + identifyB.on('peer-update', function (answer) { + expect(Object.keys(swarmB.connections).length).to.equal(1) + swarmB.openStream(peerA, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + expect(Object.keys(swarmB.connections).length).to.equal(1) + done() + }) + }) + identifyA.on('peer-update', function (answer) {}) + }) + + test('Attach Identify, reuse peer', function (done) { + var protocol = '/sparkles/3.3.3' + + var identifyA = new Identify(swarmA, peerA) + var identifyB = new Identify(swarmB, peerB) // eslint-disable-line no-unused-vars + + swarmA.registerHandler(protocol, function (stream) {}) + swarmB.registerHandler(protocol, function (stream) {}) + + var restartA = function (cb) { + swarmA.openStream(peerB, protocol, function (err, stream) { + expect(err).to.not.be.instanceof(Error) + + stream.end(cb) + }) + } + + restartA(function () { + identifyA.once('peer-update', function () { + expect(peerA.previousObservedAddrs.length).to.be.equal(1) + + var c = new Counter(2, done) + + swarmA.closeConns(c.hit.bind(c)) + swarmB.closeConns(c.hit.bind(c)) + }) + }) + }) +}) + +experiment('HARDNESS', function () {}) + +function Counter (target, callback) { + var c = 0 + this.hit = count + + function count () { + c += 1 + if (c === target) { + callback() + } + } +} + +// function checkErr (err) { +// console.log('err') +// expect(err).to.be.instanceof(Error) +// } diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 408b8f7393..6c014e9414 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -1,55 +1,85 @@ var Lab = require('lab') var Code = require('code') -var sinon = require('sinon') var lab = exports.lab = Lab.script() var experiment = lab.experiment var test = lab.test -var beforeEach = lab.beforeEach -var afterEach = lab.afterEach var expect = Code.expect var multiaddr = require('multiaddr') -var Id = require('ipfs-peer-id') -var Peer = require('ipfs-peer') -var Swarm = require('../src/') -var Identify = require('../src/identify') - -var swarmA -var swarmB -var peerA -var peerB - -beforeEach(function (done) { - swarmA = new Swarm() - swarmB = new Swarm() - var c = new Counter(2, done) - - swarmA.listen(8100, function () { - peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)]) - c.hit() - }) +var Id = require('peer-id') +var Peer = require('peer-info') +var Swarm = require('../src') +var tcp = require('libp2p-tcp') + +/* TODO +experiment('Basics', function () { + test('enforces creation with new', function (done) {done() }) +}) +*/ + +experiment('Without a Stream Muxer', function () { + experiment('tcp', function () { + test('add the transport', function (done) { + var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p = new Peer(Id.create(), []) + var sw = new Swarm(p) + + sw.addTransport('tcp', tcp, + { multiaddr: mh }, {}, {port: 8010}, function () { + expect(sw.transports['tcp'].options).to.deep.equal({ multiaddr: mh }) + expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) + expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) + expect(sw.transports['tcp'].transport).to.deep.equal(tcp) + sw.closeListener('tcp', function () { + done() + }) + }) + }) - swarmB.listen(8101, function () { - peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)]) - c.hit() + test('dial a conn', function (done) { + done() + }) + test('dial a conn on a protocol', function (done) { done() }) + test('add an upgrade', function (done) { done() }) + test('dial a conn on top of a upgrade', function (done) { done() }) + test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) }) + /* TODO + experiment('udp', function () { + test('add the transport', function (done) { done() }) + test('dial a conn', function (done) { done() }) + test('dial a conn on a protocol', function (done) { done() }) + test('add an upgrade', function (done) { done() }) + test('dial a conn on top of a upgrade', function (done) { done() }) + test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) */ + + /* TODO + experiment('udt', function () { + test('add the transport', function (done) { done() }) + test('dial a conn', function (done) { done() }) + test('dial a conn on a protocol', function (done) { done() }) + test('add an upgrade', function (done) { done() }) + test('dial a conn on top of a upgrade', function (done) { done() }) + test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) */ + + /* TODO + experiment('utp', function () { + test('add the transport', function (done) { done() }) + test('dial a conn', function (done) { done() }) + test('dial a conn on a protocol', function (done) { done() }) + test('add an upgrade', function (done) { done() }) + test('dial a conn on top of a upgrade', function (done) { done() }) + test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + }) */ }) -afterEach(function (done) { - // This should be 2, but for some reason - // that will fail in most of the tests - var c = new Counter(1, done) - - swarmA.closeListener(function () { - c.hit() - }) - swarmB.closeListener(function () { - c.hit() - }) -}) +experiment('With a SPDY Stream Muxer', function () {}) +/* OLD experiment('BASICS', function () { experiment('Swarm', function () { test('enforces instantiation with new', function (done) { @@ -247,6 +277,7 @@ function Counter (target, callback) { } } } +*/ // function checkErr (err) { // console.log('err') From 5e4cca52c0d1144a84e5ff7e9f6ad548a0868d61 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 22 Sep 2015 16:16:21 +0100 Subject: [PATCH 05/18] dial a conn + test --- src/swarm.js | 111 +++++++++++++++++++++++++++++++++++++++++++- tests/swarm-test.js | 25 +++++++++- 2 files changed, 134 insertions(+), 2 deletions(-) diff --git a/src/swarm.js b/src/swarm.js index 7637f23bdd..b2ce002461 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,4 +1,5 @@ var multistream = require('multistream-select') +var async = require('async') exports = module.exports = Swarm @@ -27,6 +28,8 @@ function Swarm (peerInfo) { self.protocols = {} + self.muxer + // public interface self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { @@ -63,6 +66,112 @@ function Swarm (peerInfo) { self.dial = function (peerInfo, options, protocol, callback) { // 1. check if we have transports we support + // 2. check if we have a conn waiting + // 3. check if we have a stream muxer available + + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + // check if a conn is waiting + // if it is and protocol was selected, jump into multistreamHandshake + // if it is and no protocol was selected, do nothing and call and empty callback + + if (self.conns[peerInfo.id.toB58String()]) { + if (protocol) { + multistreamHandshake(self.conns[peerInfo.id.toB58String()]) + self.conns[peerInfo.id.toB58String()] = undefined + return + } else { + return callback() + } + } + + // check if a stream muxer for this peer is available + if (self.muxedConns[peerInfo.id.toB58String()]) { + return openMuxedStream() + } + + // Creating a new conn with this peer routine + + // TODO - check if there is a preference in protocol to use on + // options.protocol + var supportedTransports = Object.keys(self.protocols) + var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) { + return multiaddr.protoNames().some(function (proto) { + return supportedTransports.indexOf(proto) >= 0 + }) + }) + + var conn + + async.eachSeries(multiaddrs, function (multiaddr, next) { + if (conn) { + return next() + } + + var transportName = getTransportNameForMultiaddr(multiaddr) + var transport = self.transports[transportName] + var dialOptions = clone(transport.dialOptions) + dialOptions.ready = connected + + var connTry = transport.transport.dial(multiaddr, dialOptions) + + connTry.once('error', function (err) { + if (err) { + return // TODO handle error + } + next() // try next multiaddr + }) + + function connected () { + conn = connTry + next() + } + + function getTransportNameForMultiaddr (multiaddr) { + // this works for all those ip + transport + port tripplets + return multiaddr.protoNames()[1] + } + + function clone (obj) { + var target = {} + for (var i in obj) { + if (obj.hasOwnProperty(i)) { + target[i] = obj[i] + } + } + return target + } + }, done) + + function done () { + // TODO apply upgrades + // TODO apply stream muxer + // if no protocol is selected, save it in the pool + // if protocol is selected, multistream that protocol + + if (protocol) { + multistreamHandshake(conn) + } else { + self.conns[peerInfo.id.toB58String()] = conn + callback() + } + } + + function openMuxedStream () { + // 1. create a new stream on this muxedConn and pass that to + // multistreamHanshake + } + + function multistreamHandshake (conn) { + var msS = new multistream.Select() + msS.handle(conn) + self.protocols.forEach(function (protocol) { + msS.addHandler(protocol, self.protocols[protocol]) + }) + } } self.closeListener = function (transportName, callback) { @@ -88,7 +197,7 @@ function Swarm (peerInfo) { function listen (conn) { console.log('Received new connection') // TODO apply upgrades - // TODO then add StreamMuxer if available + // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) // if no stream muxer, then userProtocolMuxer(conn) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 6c014e9414..692ad9bee9 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -38,9 +38,32 @@ experiment('Without a Stream Muxer', function () { }) test('dial a conn', function (done) { - done() + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + + var count = 0 + + function ready () { + count++ + if (count < 2) { + return + } + + sw1.dial(p2, {}, function () { + expect(Object.keys(sw1.conns).length).to.equal(1) + done() + }) + } }) test('dial a conn on a protocol', function (done) { done() }) + test('dial a protocol on a previous created conn', function (done) { done() }) test('add an upgrade', function (done) { done() }) test('dial a conn on top of a upgrade', function (done) { done() }) test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) From 8e8d8e9093aad0c4f961d5e589f4d1e9db977e75 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 22 Sep 2015 16:50:42 +0100 Subject: [PATCH 06/18] dial on a protocol + test --- src/swarm.js | 25 ++++++++++++---- tests/swarm-test.js | 71 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/src/swarm.js b/src/swarm.js index b2ce002461..763b73b174 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -79,6 +79,7 @@ function Swarm (peerInfo) { // if it is and no protocol was selected, do nothing and call and empty callback if (self.conns[peerInfo.id.toB58String()]) { + console.log('Had conn warmed up') if (protocol) { multistreamHandshake(self.conns[peerInfo.id.toB58String()]) self.conns[peerInfo.id.toB58String()] = undefined @@ -97,7 +98,7 @@ function Swarm (peerInfo) { // TODO - check if there is a preference in protocol to use on // options.protocol - var supportedTransports = Object.keys(self.protocols) + var supportedTransports = Object.keys(self.transports) var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) { return multiaddr.protoNames().some(function (proto) { return supportedTransports.indexOf(proto) >= 0 @@ -120,7 +121,7 @@ function Swarm (peerInfo) { connTry.once('error', function (err) { if (err) { - return // TODO handle error + return console.log(err) // TODO handle error } next() // try next multiaddr }) @@ -151,6 +152,9 @@ function Swarm (peerInfo) { // TODO apply stream muxer // if no protocol is selected, save it in the pool // if protocol is selected, multistream that protocol + if (!conn) { + callback(new Error('Unable to open a connection')) + } if (protocol) { multistreamHandshake(conn) @@ -166,10 +170,9 @@ function Swarm (peerInfo) { } function multistreamHandshake (conn) { - var msS = new multistream.Select() - msS.handle(conn) - self.protocols.forEach(function (protocol) { - msS.addHandler(protocol, self.protocols[protocol]) + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(protocol, callback) }) } } @@ -184,6 +187,16 @@ function Swarm (peerInfo) { } } + self.closeConns = function (callback) { + // close warmed up cons so that the listener can gracefully exit + Object.keys(self.conns).forEach(function (conn) { + self.conns[conn].destroy() + }) + self.conns = {} + + callback() + } + self.close = function (callback) { // close everything } diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 692ad9bee9..658e2a3e47 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -48,6 +48,68 @@ experiment('Without a Stream Muxer', function () { var sw2 = new Swarm(p2) sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + var readyCounter = 0 + + function ready () { + readyCounter++ + if (readyCounter < 2) { + return + } + + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + if (cleaningCounter < 4) { + return + } + + done() + } + }) + } + }) + + test('dial a conn on a protocol', function (done) { + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + conn.end() + conn.on('end', function () { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + if (cleaningCounter < 4) { + return + } + + done() + } + }) + }) + var count = 0 function ready () { @@ -56,13 +118,14 @@ experiment('Without a Stream Muxer', function () { return } - sw1.dial(p2, {}, function () { - expect(Object.keys(sw1.conns).length).to.equal(1) - done() + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() }) } }) - test('dial a conn on a protocol', function (done) { done() }) + test('dial a protocol on a previous created conn', function (done) { done() }) test('add an upgrade', function (done) { done() }) test('dial a conn on top of a upgrade', function (done) { done() }) From 59b00f68864d6ec3a4b97fd03ad01d1daffef15e Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 22 Sep 2015 17:27:37 +0100 Subject: [PATCH 07/18] use warmed up connection + test --- src/swarm.js | 3 +-- tests/swarm-test.js | 54 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/src/swarm.js b/src/swarm.js index 763b73b174..2949c11e13 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -79,10 +79,10 @@ function Swarm (peerInfo) { // if it is and no protocol was selected, do nothing and call and empty callback if (self.conns[peerInfo.id.toB58String()]) { - console.log('Had conn warmed up') if (protocol) { multistreamHandshake(self.conns[peerInfo.id.toB58String()]) self.conns[peerInfo.id.toB58String()] = undefined + delete self.conns[peerInfo.id.toB58String()] return } else { return callback() @@ -208,7 +208,6 @@ function Swarm (peerInfo) { // internals function listen (conn) { - console.log('Received new connection') // TODO apply upgrades // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 658e2a3e47..129a716f94 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -126,7 +126,59 @@ experiment('Without a Stream Muxer', function () { } }) - test('dial a protocol on a previous created conn', function (done) { done() }) + test('dial a protocol on a previous created conn', function (done) { + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + + var readyCounter = 0 + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + conn.end() + conn.on('end', function () { + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + if (cleaningCounter < 4) { + return + } + + done() + } + }) + }) + + function ready () { + readyCounter++ + if (readyCounter < 2) { + return + } + + sw1.dial(p2, {}, function (err) { + expect(err).to.equal(undefined) + expect(Object.keys(sw1.conns).length).to.equal(1) + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + }) + } + }) + test('add an upgrade', function (done) { done() }) test('dial a conn on top of a upgrade', function (done) { done() }) test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) From 416e107d641b1e4c4112c32b2b690e3e6a545324 Mon Sep 17 00:00:00 2001 From: David Dias Date: Tue, 22 Sep 2015 17:50:41 +0100 Subject: [PATCH 08/18] quick fix for travis --- src/swarm.js | 4 +++- tests/swarm-test.js | 5 +++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/swarm.js b/src/swarm.js index 2949c11e13..099917b446 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -28,7 +28,9 @@ function Swarm (peerInfo) { self.protocols = {} - self.muxer + // muxerName: { muxer: muxer + // options: options } + self.muxers = {} // public interface diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 129a716f94..a5a13ea056 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -18,6 +18,11 @@ experiment('Basics', function () { }) */ +// because of Travis-CI +process.on('uncaughtException', function (err) { + console.log('Caught exception: ' + err) +}) + experiment('Without a Stream Muxer', function () { experiment('tcp', function () { test('add the transport', function (done) { From 0040be765df3aeee108acbb98fa85c8101c8c48e Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 17:11:32 +0100 Subject: [PATCH 09/18] add spdy + test --- package.json | 1 + src/swarm.js | 78 +++++++++++++++++++++++++++++++++++++-------- tests/swarm-test.js | 77 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 14 deletions(-) diff --git a/package.json b/package.json index 0532386605..df32523ad4 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "devDependencies": { "code": "^1.4.1", "lab": "^5.13.0", + "libp2p-spdy": "^0.1.0", "libp2p-tcp": "^0.1.1", "precommit-hook": "^3.0.0", "sinon": "^1.15.4", diff --git a/src/swarm.js b/src/swarm.js index 099917b446..0673c39144 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -24,11 +24,13 @@ function Swarm (peerInfo) { // listeners: [] } self.transports = {} + // transportName: listener self.listeners = {} + // protocolName: handlerFunc self.protocols = {} - // muxerName: { muxer: muxer + // muxerName: { Muxer: Muxer // Muxer is a constructor // options: options } self.muxers = {} @@ -62,8 +64,11 @@ function Swarm (peerInfo) { } - self.addStreamMuxer = function (StreamMuxer, options) { - + self.addStreamMuxer = function (name, StreamMuxer, options) { + self.muxers[name] = { + Muxer: StreamMuxer, + options: options + } } self.dial = function (peerInfo, options, protocol, callback) { @@ -82,7 +87,12 @@ function Swarm (peerInfo) { if (self.conns[peerInfo.id.toB58String()]) { if (protocol) { - multistreamHandshake(self.conns[peerInfo.id.toB58String()]) + if (self.muxers['spdy']) { + // TODO upgrade this conn to a muxer + console.log('TODO: upgrade a warm conn to muxer that was added after') + } else { + multistreamHandshake(self.conns[peerInfo.id.toB58String()]) + } self.conns[peerInfo.id.toB58String()] = undefined delete self.conns[peerInfo.id.toB58String()] return @@ -93,7 +103,11 @@ function Swarm (peerInfo) { // check if a stream muxer for this peer is available if (self.muxedConns[peerInfo.id.toB58String()]) { - return openMuxedStream() + if (protocol) { + return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()]) + } else { + return callback() + } } // Creating a new conn with this peer routine @@ -151,24 +165,49 @@ function Swarm (peerInfo) { function done () { // TODO apply upgrades - // TODO apply stream muxer + // apply stream muxer // if no protocol is selected, save it in the pool // if protocol is selected, multistream that protocol if (!conn) { callback(new Error('Unable to open a connection')) } - if (protocol) { - multistreamHandshake(conn) + if (self.muxers['spdy']) { + var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) + spdy.attach(conn, false, function (err, muxer) { + if (err) { + return console.log(err) // TODO Treat error + } + + muxer.on('stream', userProtocolMuxer) + + self.muxedConns[peerInfo.id.toB58String()] = muxer + + if (protocol) { + openMuxedStream(muxer) + } else { + callback() + } + }) } else { - self.conns[peerInfo.id.toB58String()] = conn - callback() + if (protocol) { + multistreamHandshake(conn) + } else { + self.conns[peerInfo.id.toB58String()] = conn + callback() + } } } - function openMuxedStream () { + function openMuxedStream (muxer) { // 1. create a new stream on this muxedConn and pass that to // multistreamHanshake + muxer.dialStream(function (err, conn) { + if (err) { + return console.log(err) // TODO Treat error + } + multistreamHandshake(conn) + }) } function multistreamHandshake (conn) { @@ -213,8 +252,21 @@ function Swarm (peerInfo) { // TODO apply upgrades // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) - // if no stream muxer, then - userProtocolMuxer(conn) + if (self.muxers['spdy']) { + var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) + spdy.attach(conn, true, function (err, muxer) { + if (err) { + return console.log(err) // TODO treat error + } + + // TODO This muxer has to be identified! + + muxer.on('stream', userProtocolMuxer) + }) + } else { + // if no stream muxer, then + userProtocolMuxer(conn) + } } // Handle user given protocols diff --git a/tests/swarm-test.js b/tests/swarm-test.js index a5a13ea056..8fd474a7ac 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -11,6 +11,7 @@ var Id = require('peer-id') var Peer = require('peer-info') var Swarm = require('../src') var tcp = require('libp2p-tcp') +var Spdy = require('libp2p-spdy') /* TODO experiment('Basics', function () { @@ -220,7 +221,81 @@ experiment('Without a Stream Muxer', function () { }) */ }) -experiment('With a SPDY Stream Muxer', function () {}) +experiment('With a SPDY Stream Muxer', function () { + experiment('tcp', function () { + test('add Stream Muxer', function (done) { + // var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p = new Peer(Id.create(), []) + var sw = new Swarm(p) + sw.addStreamMuxer('spdy', Spdy, {}) + + done() + }) + + test('dial a conn on a protocol', function (done) { + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + sw1.addStreamMuxer('spdy', Spdy, {}) + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + sw2.addStreamMuxer('spdy', Spdy, {}) + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + expect(Object.keys(sw2.muxedConns).length).to.equal(0) + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + + done() + } + }) + }) + + var count = 0 + + function ready () { + count++ + if (count < 2) { + return + } + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + } + }) + test('dial two conns (transport reuse)', function (done) { + done() + }) + test('identify', function (done) { done() }) + }) +}) /* OLD experiment('BASICS', function () { From 168d01befd6ed8c5efadcf39a953c4800b128ab5 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 17:25:21 +0100 Subject: [PATCH 10/18] stream muxer for connection reuse test --- src/swarm.js | 3 + tests/swarm-test.js | 248 ++++++++++---------------------------------- 2 files changed, 58 insertions(+), 193 deletions(-) diff --git a/src/swarm.js b/src/swarm.js index 0673c39144..873bb8e956 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -34,6 +34,9 @@ function Swarm (peerInfo) { // options: options } self.muxers = {} + // for connection reuse + self.identify = false + // public interface self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) { diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 8fd474a7ac..6ea7a45b50 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -291,213 +291,75 @@ experiment('With a SPDY Stream Muxer', function () { } }) test('dial two conns (transport reuse)', function (done) { - done() - }) - test('identify', function (done) { done() }) - }) -}) + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + sw1.addStreamMuxer('spdy', Spdy, {}) -/* OLD -experiment('BASICS', function () { - experiment('Swarm', function () { - test('enforces instantiation with new', function (done) { - expect(function () { - Swarm() - }).to.throw('Swarm must be called with new') - done() - }) + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + sw2.addStreamMuxer('spdy', Spdy, {}) - test('parses $IPFS_SWARM_PORT', function (done) { - process.env.IPFS_SWARM_PORT = 1111 - var swarm = new Swarm() - expect(swarm.port).to.be.equal(1111) - process.env.IPFS_SWARM_PORT = undefined - done() - }) - }) + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) - experiment('Swarm.listen', function (done) { - test('handles missing port', function (done) { - var swarm = new Swarm() - swarm.listen(done) - }) + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + expect(Object.keys(sw2.muxedConns).length).to.equal(0) + conn.end() - test('handles passed in port', function (done) { - var swarm = new Swarm() - swarm.listen(1234) - expect(swarm.port).to.be.equal(1234) - done() - }) - }) + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) - experiment('Swarm.registerHandler', function () { - test('throws when registering a protcol handler twice', function (done) { - var swarm = new Swarm() - swarm.registerHandler('/sparkles/1.1.1', function () {}) - swarm.registerHandler('/sparkles/1.1.1', function (err) { - expect(err).to.be.an.instanceOf(Error) - expect(err.message).to.be.equal('Handle for protocol already exists') - done() - }) - }) - }) + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) - experiment('Swarm.closeConns', function () { - test('calls end on all connections', function (done) { - swarmA.openConnection(peerB, function () { - var key = Object.keys(swarmA.connections)[0] - sinon.spy(swarmA.connections[key].conn, 'end') - swarmA.closeConns(function () { - expect(swarmA.connections[key].conn.end.called).to.be.equal(true) - done() + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + + done() + } }) }) - }) - }) -}) -experiment('BASE', function () { - test('Open a stream', function (done) { - var protocol = '/sparkles/3.3.3' - var c = new Counter(2, done) - - swarmB.registerHandler(protocol, function (stream) { - c.hit() - }) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - c.hit() - }) - }) - - test('Reuse connection (from dialer)', function (done) { - var protocol = '/sparkles/3.3.3' + var count = 0 - swarmB.registerHandler(protocol, function (stream) {}) + function ready () { + count++ + if (count < 2) { + return + } - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + // TODO Improve clarity + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(swarmA.connections.length === 1) - done() - }) + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + } }) - }) - test('Check for lastSeen', function (done) { - var protocol = '/sparkles/3.3.3' - - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(peerB.lastSeen).to.be.instanceof(Date) + test('identify', function (done) { done() }) }) - -}) - -experiment('IDENTIFY', function () { - test('Attach Identify, open a stream, see a peer update', function (done) { - swarmA.on('error', function (err) { - console.log('A - ', err) - }) - - swarmB.on('error', function (err) { - console.log('B - ', err) - }) - - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) - setTimeout(function () { - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - }) - - identifyB.on('peer-update', function (answer) { - done() - }) - identifyA.on('peer-update', function (answer) {}) - }, 500) - }) - - test('Attach Identify, open a stream, reuse stream', function (done) { - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) - - swarmA.registerHandler(protocol, function (stream) {}) - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - }) - - identifyB.on('peer-update', function (answer) { - expect(Object.keys(swarmB.connections).length).to.equal(1) - swarmB.openStream(peerA, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(Object.keys(swarmB.connections).length).to.equal(1) - done() - }) - }) - identifyA.on('peer-update', function (answer) {}) - }) - - test('Attach Identify, reuse peer', function (done) { - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) // eslint-disable-line no-unused-vars - - swarmA.registerHandler(protocol, function (stream) {}) - swarmB.registerHandler(protocol, function (stream) {}) - - var restartA = function (cb) { - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - - stream.end(cb) - }) - } - - restartA(function () { - identifyA.once('peer-update', function () { - expect(peerA.previousObservedAddrs.length).to.be.equal(1) - - var c = new Counter(2, done) - - swarmA.closeConns(c.hit.bind(c)) - swarmB.closeConns(c.hit.bind(c)) - }) - }) - }) }) - -experiment('HARDNESS', function () {}) - -function Counter (target, callback) { - var c = 0 - this.hit = count - - function count () { - c += 1 - if (c === target) { - callback() - } - } -} -*/ - -// function checkErr (err) { -// console.log('err') -// expect(err).to.be.instanceof(Error) -// } From 0bcbe63005a20e006403c03b991b6a4cf30798fa Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 17:26:26 +0100 Subject: [PATCH 11/18] rm old code --- src/stream-muxer.js | 2 - src/swarm-old.js | 189 -------------------------------------------- 2 files changed, 191 deletions(-) delete mode 100644 src/stream-muxer.js delete mode 100644 src/swarm-old.js diff --git a/src/stream-muxer.js b/src/stream-muxer.js deleted file mode 100644 index 346371fb30..0000000000 --- a/src/stream-muxer.js +++ /dev/null @@ -1,2 +0,0 @@ -exports = module.exports = require('spdy-stream-muxer') -// exports = module.exports = require('multiplex-stream-muxer') diff --git a/src/swarm-old.js b/src/swarm-old.js deleted file mode 100644 index 9a27bdad41..0000000000 --- a/src/swarm-old.js +++ /dev/null @@ -1,189 +0,0 @@ -var tcp = require('net') -var Select = require('multistream-select').Select -var Interactive = require('multistream-select').Interactive -var Muxer = require('./stream-muxer') -var log = require('ipfs-logger').group('swarm') -var async = require('async') -var EventEmitter = require('events').EventEmitter -var util = require('util') - -exports = module.exports = Swarm - -util.inherits(Swarm, EventEmitter) - -function Swarm () { - var self = this - - if (!(self instanceof Swarm)) { - throw new Error('Swarm must be called with new') - } - - self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 - self.connections = {} // {peerIdB58: {conn: <>, socket: <>} - self.handles = {} - - // set the listener - - self.listen = function (port, ready) { - if (!ready) { - ready = function noop () {} - } - if (typeof port === 'function') { - ready = port - } else if (port) { self.port = port } - - // - - self.listener = tcp.createServer(function (socket) { - errorUp(self, socket) - var ms = new Select() - ms.handle(socket) - ms.addHandler('/spdy/3.1.0', function (ds) { - log.info('Negotiated spdy with incoming socket') - - var conn = new Muxer().attach(ds, true) - - // attach multistream handlers to incoming streams - - conn.on('stream', registerHandles) - errorUp(self, conn) - - // FOR IDENTIFY - self.emit('connection-unknown', conn, socket) - - // IDENTIFY DOES THIS FOR US - // conn.on('close', function () { delete self.connections[conn.peerId] }) - }) - }).listen(self.port, ready) - errorUp(self, self.listener) - } - - // interface - - // open stream account for connection reuse - self.openConnection = function (peer, cb) { - // If no connection open yet, open it - if (!self.connections[peer.id.toB58String()]) { - // Establish a socket with one of the addresses - var socket - async.eachSeries(peer.multiaddrs, function (multiaddr, next) { - if (socket) { return next() } - - var tmp = tcp.connect(multiaddr.toOptions(), function () { - socket = tmp - errorUp(self, socket) - next() - }) - - tmp.once('error', function (err) { - log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err) - next() - }) - - }, function done () { - if (!socket) { - return cb(new Error('Not able to open a scoket with peer - ', - peer.id.toB58String())) - } - gotSocket(socket) - }) - } else { - cb() - } - - // do the spdy people dance (multistream-select into spdy) - function gotSocket (socket) { - var msi = new Interactive() - msi.handle(socket, function () { - msi.select('/spdy/3.1.0', function (err, ds) { - if (err) { cb(err) } - - var conn = new Muxer().attach(ds, false) - conn.on('stream', registerHandles) - self.connections[peer.id.toB58String()] = { - conn: conn, - socket: socket - } - conn.on('close', function () { delete self.connections[peer.id.toB58String()]}) - errorUp(self, conn) - - cb() - }) - }) - } - } - - self.openStream = function (peer, protocol, cb) { - self.openConnection(peer, function (err) { - if (err) { - return cb(err) - } - // spawn new muxed stream - var conn = self.connections[peer.id.toB58String()].conn - conn.dialStream(function (err, stream) { - if (err) { return cb(err) } - errorUp(self, stream) - // negotiate desired protocol - var msi = new Interactive() - msi.handle(stream, function () { - msi.select(protocol, function (err, ds) { - if (err) { return cb(err) } - peer.lastSeen = new Date() - cb(null, ds) // return the stream - }) - }) - }) - }) - } - - self.registerHandler = function (protocol, handlerFunc) { - if (self.handles[protocol]) { - return handlerFunc(new Error('Handle for protocol already exists', protocol)) - } - self.handles[protocol] = handlerFunc - log.info('Registered handler for protocol:', protocol) - } - - self.closeConns = function (cb) { - var keys = Object.keys(self.connections) - var number = keys.length - if (number === 0) { cb() } - var c = new Counter(number, cb) - - keys.forEach(function (key) { - self.connections[key].conn.end() - c.hit() - }) - } - - self.closeListener = function (cb) { - self.listener.close(cb) - } - - function registerHandles (stream) { - log.info('Registering protocol handlers on new stream') - errorUp(self, stream) - var msH = new Select() - msH.handle(stream) - Object.keys(self.handles).forEach(function (protocol) { - msH.addHandler(protocol, self.handles[protocol]) - }) - } - -} - -function errorUp (self, emitter) { - emitter.on('error', function (err) { - self.emit('error', err) - }) -} - -function Counter (target, callback) { - var c = 0 - this.hit = count - - function count () { - c += 1 - if (c === target) { callback() } - } -} From 2000827273efc7b40794be9ab6f47cc0c3ffe3d7 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 19:14:29 +0100 Subject: [PATCH 12/18] add identify --- src/identify/index.js | 142 ++++++++++++++++++++++++------------------ src/swarm.js | 41 ++++++++++-- tests/swarm-test.js | 63 ++++++++++++++++++- 3 files changed, 179 insertions(+), 67 deletions(-) diff --git a/src/identify/index.js b/src/identify/index.js index 08a09bb45c..a37b4a04a3 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -4,93 +4,111 @@ */ var Interactive = require('multistream-select').Interactive -var EventEmmiter = require('events').EventEmitter -var util = require('util') var protobufs = require('protocol-buffers-stream') var fs = require('fs') var schema = fs.readFileSync(__dirname + '/identify.proto') var v6 = require('ip-address').v6 -var Id = require('ipfs-peer-id') +var Id = require('peer-id') var multiaddr = require('multiaddr') -exports = module.exports = Identify +exports = module.exports = identify -util.inherits(Identify, EventEmmiter) +var protoId = '/ipfs/identify/1.0.0' -function Identify (swarm, peerSelf) { - var self = this - self.createProtoStream = protobufs(schema) +exports.protoId = protoId +var createProtoStream = protobufs(schema) - swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { - var ps = self.createProtoStream() +function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { + var msi = new Interactive() + msi.handle(conn, function () { + msi.select(protoId, function (err, ds) { + if (err) { + return console.log(err) // TODO Treat error + } - ps.on('identify', function (msg) { - updateSelf(peerSelf, msg.observedAddr) + var ps = createProtoStream() - var peerId = Id.createFromPubKey(msg.publicKey) + ps.on('identify', function (msg) { + var peerId = Id.createFromPubKey(msg.publicKey) + + updateSelf(peerInfoSelf, msg.observedAddr) + + muxedConns[peerId.toB58String()] = { + muxer: muxer, + socket: socket + } + console.log('do I get back') + + // TODO: Pass the new discovered info about the peer that contacted us + // to something like the Kademlia Router, so the peerInfo for this peer + // is fresh + // - before this was exectued through a event emitter + // self.emit('peer-update', { + // peerId: peerId, + // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + // }) + }) - var socket = swarm.connections[peerId.toB58String()].socket var mh = getMultiaddr(socket) + ps.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), + publicKey: peerInfoSelf.id.pubKey, + listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { + return mh.buffer + }), observedAddr: mh.buffer }) - self.emit('peer-update', { - peerId: peerId, - listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - }) - + ps.pipe(ds).pipe(ps) ps.finalize() }) - ps.pipe(stream).pipe(ps) }) +} + +exports.getHandlerFunction = function (peerInfoSelf, muxedConns) { + return function (conn) { + // wait for the other peer to identify itself + // update our multiaddr with observed addr list + // then get the socket from our list of muxedConns and send the reply back + + var ps = createProtoStream() + + ps.on('identify', function (msg) { + updateSelf(peerInfoSelf, msg.observedAddr) - swarm.on('connection-unknown', function (conn, socket) { - conn.dialStream(function (err, stream) { - if (err) { return console.log(err) } - var msi = new Interactive() - msi.handle(stream, function () { - msi.select('/ipfs/identify/1.0.0', function (err, ds) { - if (err) { return console.log(err) } - - var ps = self.createProtoStream() - - ps.on('identify', function (msg) { - var peerId = Id.createFromPubKey(msg.publicKey) - - updateSelf(peerSelf, msg.observedAddr) - - swarm.connections[peerId.toB58String()] = { - conn: conn, - socket: socket - } - - self.emit('peer-update', { - peerId: peerId, - listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - }) - }) - - var mh = getMultiaddr(socket) - - ps.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), - observedAddr: mh.buffer - }) - - ps.pipe(ds).pipe(ps) - ps.finalize() - }) + var peerId = Id.createFromPubKey(msg.publicKey) + + var socket = muxedConns[peerId.toB58String()].socket + + var mh = getMultiaddr(socket) + + ps.identify({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: peerInfoSelf.id.pubKey, + listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { + return mh.buffer + }), + observedAddr: mh.buffer }) + + // TODO: Pass the new discovered info about the peer that contacted us + // to something like the Kademlia Router, so the peerInfo for this peer + // is fresh + // - before this was exectued through a event emitter + // self.emit('peer-update', { + // peerId: peerId, + // listenAddrs: msg.listenAddrs.map(function (mhb) { + // return multiaddr(mhb) + // }) + // }) + + ps.finalize() }) - }) + ps.pipe(conn).pipe(ps) + } } function getMultiaddr (socket) { diff --git a/src/swarm.js b/src/swarm.js index 873bb8e956..7284738b62 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,5 +1,6 @@ var multistream = require('multistream-select') var async = require('async') +var identify = require('./identify') exports = module.exports = Swarm @@ -15,7 +16,10 @@ function Swarm (peerInfo) { // peerIdB58: { conn: } self.conns = {} - // peerIdB58: { muxer: } + // peerIdB58: { + // muxer: , + // socket: socket // so we can extract the info we need for identify + // } self.muxedConns = {} // transportName: { transport: transport, @@ -107,7 +111,7 @@ function Swarm (peerInfo) { // check if a stream muxer for this peer is available if (self.muxedConns[peerInfo.id.toB58String()]) { if (protocol) { - return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()]) + return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer) } else { return callback() } @@ -184,7 +188,10 @@ function Swarm (peerInfo) { muxer.on('stream', userProtocolMuxer) - self.muxedConns[peerInfo.id.toB58String()] = muxer + self.muxedConns[peerInfo.id.toB58String()] = { + muxer: muxer, + socket: conn + } if (protocol) { openMuxedStream(muxer) @@ -245,6 +252,17 @@ function Swarm (peerInfo) { // close everything } + self.enableIdentify = function () { + // set flag to true + // add identify to the list of handled protocols + self.identify = true + + // we pass muxedConns so that identify can access the socket of the other + // peer + self.handleProtocol(identify.protoId, + identify.getHandlerFunction(self.peerInfo, self.muxedConns)) + } + self.handleProtocol = function (protocol, handlerFunction) { self.protocols[protocol] = handlerFunction } @@ -253,7 +271,7 @@ function Swarm (peerInfo) { function listen (conn) { // TODO apply upgrades - // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) + // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) if (self.muxers['spdy']) { var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) @@ -263,6 +281,21 @@ function Swarm (peerInfo) { } // TODO This muxer has to be identified! + // pass to identify a reference of + // our muxedConn list + // ourselves (peerInfo) + // the conn, which is the socket + // and a stream it can send stuff + if (self.identify) { + muxer.dialStream(function (err, stream) { + if (err) { + return console.log(err) // TODO Treat error + } + // conn === socket at this point + console.log('bimbas') + identify(self.muxedConns, self.peerInfo, conn, stream, muxer) + }) + } muxer.on('stream', userProtocolMuxer) }) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 6ea7a45b50..ddf64824ad 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -358,8 +358,69 @@ experiment('With a SPDY Stream Muxer', function () { }) } }) + test('identify', function (done) { - done() + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + sw1.addStreamMuxer('spdy', Spdy, {}) + sw1.enableIdentify() + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + sw2.addStreamMuxer('spdy', Spdy, {}) + sw2.enableIdentify() + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + // give time for identify to finish + setTimeout(function () { + expect(Object.keys(sw2.muxedConns).length).to.equal(1) + done() + }, 500) + } + }) + }) + + var count = 0 + + function ready () { + count++ + if (count < 2) { + return + } + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + } }) }) }) From fb37b4dec91e684f5a63c2c89bd0bd1a70061d3f Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 19:57:37 +0100 Subject: [PATCH 13/18] clear unused console logs --- src/identify/index.js | 1 - src/swarm.js | 1 - 2 files changed, 2 deletions(-) diff --git a/src/identify/index.js b/src/identify/index.js index a37b4a04a3..181cd3808a 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -37,7 +37,6 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { muxer: muxer, socket: socket } - console.log('do I get back') // TODO: Pass the new discovered info about the peer that contacted us // to something like the Kademlia Router, so the peerInfo for this peer diff --git a/src/swarm.js b/src/swarm.js index 7284738b62..1cfff09509 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -292,7 +292,6 @@ function Swarm (peerInfo) { return console.log(err) // TODO Treat error } // conn === socket at this point - console.log('bimbas') identify(self.muxedConns, self.peerInfo, conn, stream, muxer) }) } From e6bcde41fb3aa203f1d57d615b2735285ac4776c Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 19:58:14 +0100 Subject: [PATCH 14/18] change cov --- package.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index df32523ad4..d5d93a269c 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "main": "src/index.js", "scripts": { "test": "./node_modules/.bin/lab tests/*-test.js", - "coverage": "./node_modules/.bin/lab -t 100 tests/*-test.js", + "coverage": "./node_modules/.bin/lab -t 88 tests/*-test.js", "laf": "./node_modules/.bin/standard --format", "lint": "./node_modules/.bin/standard" }, @@ -24,7 +24,8 @@ "homepage": "https://github.com/diasdavid/node-ipfs-swarm", "pre-commit": [ "lint", - "test" + "test", + "coverage" ], "engines": { "node": "^4.0.0" From 92b499df822a39fbcb0b7d88692eb15fa1b9c981 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 20:06:10 +0100 Subject: [PATCH 15/18] fix readme typos and missing links --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 8910334387..7a87626e2a 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ libp2p-swarm Node.js implementation # Description -libp2p-swarm is connection abstraction that is able to leverage several transports and connection upgrades (such as congestion control, encrypt a channel, multiplex several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream. +libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream. libp2p-swarm is used by libp2p but it can be also used as a standalone module. @@ -21,7 +21,7 @@ libp2p-swarm is available on npm and so, like any other npm module, just: $ npm install libp2p-swarm --save ``` -And use it on your Node.js code as: +And use it in your Node.js code as: ```JavaScript var Swarm = require('libp2p-swarm') @@ -29,7 +29,7 @@ var Swarm = require('libp2p-swarm') var sw = new Swarm(peerInfoSelf) ``` -peerInfoSelf is a PeerInfo object that represents the peer creating this swarm instance. +peerInfoSelf is a [PeerInfo](https://github.com/diasdavid/node-peer-info) object that represents the peer creating this swarm instance. ### Support a transport @@ -39,15 +39,15 @@ libp2p-swarm expects transports that implement [abstract-transport](https://gith sw.addTransport(transport, [options, dialOptions, listenOptions]) ``` -### Add an connection upgrade +### Add a connection upgrade -A connection upgrade must be able to receive and return something that implements the [abstract-connection]() interface. +A connection upgrade must be able to receive and return something that implements the [abstract-connection](https://github.com/diasdavid/abstract-connection) interface. ```JavaScript sw.addUpgrade(connUpgrade, [options]) ``` -Upgrading a connection to use a Stream Muxer is still considered a upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer]() interface. +Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [abstract-stream-muxer](https://github.com/diasdavid/abstract-stream-muxer) interface. ```JavaScript sw.addStreamMuxer(streamMuxer, [options]) @@ -60,7 +60,7 @@ sw.dial(PeerInfo, options, protocol) sw.dial(PeerInfo, options) ``` -dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a Muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a Muxer for that peerInfo, than do nothing. +dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing. ### Accept requests on a specific protocol From 5fe94446d83e48867e4e4387b662bb5f3395a3ef Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 20:07:05 +0100 Subject: [PATCH 16/18] rm old test file --- tests/swarm-old.js | 254 --------------------------------------------- 1 file changed, 254 deletions(-) delete mode 100644 tests/swarm-old.js diff --git a/tests/swarm-old.js b/tests/swarm-old.js deleted file mode 100644 index 408b8f7393..0000000000 --- a/tests/swarm-old.js +++ /dev/null @@ -1,254 +0,0 @@ -var Lab = require('lab') -var Code = require('code') -var sinon = require('sinon') -var lab = exports.lab = Lab.script() - -var experiment = lab.experiment -var test = lab.test -var beforeEach = lab.beforeEach -var afterEach = lab.afterEach -var expect = Code.expect - -var multiaddr = require('multiaddr') -var Id = require('ipfs-peer-id') -var Peer = require('ipfs-peer') -var Swarm = require('../src/') -var Identify = require('../src/identify') - -var swarmA -var swarmB -var peerA -var peerB - -beforeEach(function (done) { - swarmA = new Swarm() - swarmB = new Swarm() - var c = new Counter(2, done) - - swarmA.listen(8100, function () { - peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)]) - c.hit() - }) - - swarmB.listen(8101, function () { - peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)]) - c.hit() - }) - -}) - -afterEach(function (done) { - // This should be 2, but for some reason - // that will fail in most of the tests - var c = new Counter(1, done) - - swarmA.closeListener(function () { - c.hit() - }) - swarmB.closeListener(function () { - c.hit() - }) -}) - -experiment('BASICS', function () { - experiment('Swarm', function () { - test('enforces instantiation with new', function (done) { - expect(function () { - Swarm() - }).to.throw('Swarm must be called with new') - done() - }) - - test('parses $IPFS_SWARM_PORT', function (done) { - process.env.IPFS_SWARM_PORT = 1111 - var swarm = new Swarm() - expect(swarm.port).to.be.equal(1111) - process.env.IPFS_SWARM_PORT = undefined - done() - }) - }) - - experiment('Swarm.listen', function (done) { - test('handles missing port', function (done) { - var swarm = new Swarm() - swarm.listen(done) - }) - - test('handles passed in port', function (done) { - var swarm = new Swarm() - swarm.listen(1234) - expect(swarm.port).to.be.equal(1234) - done() - }) - }) - - experiment('Swarm.registerHandler', function () { - test('throws when registering a protcol handler twice', function (done) { - var swarm = new Swarm() - swarm.registerHandler('/sparkles/1.1.1', function () {}) - swarm.registerHandler('/sparkles/1.1.1', function (err) { - expect(err).to.be.an.instanceOf(Error) - expect(err.message).to.be.equal('Handle for protocol already exists') - done() - }) - }) - }) - - experiment('Swarm.closeConns', function () { - test('calls end on all connections', function (done) { - swarmA.openConnection(peerB, function () { - var key = Object.keys(swarmA.connections)[0] - sinon.spy(swarmA.connections[key].conn, 'end') - swarmA.closeConns(function () { - expect(swarmA.connections[key].conn.end.called).to.be.equal(true) - done() - }) - }) - }) - }) -}) - -experiment('BASE', function () { - test('Open a stream', function (done) { - var protocol = '/sparkles/3.3.3' - var c = new Counter(2, done) - - swarmB.registerHandler(protocol, function (stream) { - c.hit() - }) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - c.hit() - }) - }) - - test('Reuse connection (from dialer)', function (done) { - var protocol = '/sparkles/3.3.3' - - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(swarmA.connections.length === 1) - done() - }) - }) - }) - test('Check for lastSeen', function (done) { - var protocol = '/sparkles/3.3.3' - - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(peerB.lastSeen).to.be.instanceof(Date) - done() - }) - }) - -}) - -experiment('IDENTIFY', function () { - test('Attach Identify, open a stream, see a peer update', function (done) { - swarmA.on('error', function (err) { - console.log('A - ', err) - }) - - swarmB.on('error', function (err) { - console.log('B - ', err) - }) - - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) - setTimeout(function () { - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - }) - - identifyB.on('peer-update', function (answer) { - done() - }) - identifyA.on('peer-update', function (answer) {}) - }, 500) - }) - - test('Attach Identify, open a stream, reuse stream', function (done) { - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) - - swarmA.registerHandler(protocol, function (stream) {}) - swarmB.registerHandler(protocol, function (stream) {}) - - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - }) - - identifyB.on('peer-update', function (answer) { - expect(Object.keys(swarmB.connections).length).to.equal(1) - swarmB.openStream(peerA, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - expect(Object.keys(swarmB.connections).length).to.equal(1) - done() - }) - }) - identifyA.on('peer-update', function (answer) {}) - }) - - test('Attach Identify, reuse peer', function (done) { - var protocol = '/sparkles/3.3.3' - - var identifyA = new Identify(swarmA, peerA) - var identifyB = new Identify(swarmB, peerB) // eslint-disable-line no-unused-vars - - swarmA.registerHandler(protocol, function (stream) {}) - swarmB.registerHandler(protocol, function (stream) {}) - - var restartA = function (cb) { - swarmA.openStream(peerB, protocol, function (err, stream) { - expect(err).to.not.be.instanceof(Error) - - stream.end(cb) - }) - } - - restartA(function () { - identifyA.once('peer-update', function () { - expect(peerA.previousObservedAddrs.length).to.be.equal(1) - - var c = new Counter(2, done) - - swarmA.closeConns(c.hit.bind(c)) - swarmB.closeConns(c.hit.bind(c)) - }) - }) - }) -}) - -experiment('HARDNESS', function () {}) - -function Counter (target, callback) { - var c = 0 - this.hit = count - - function count () { - c += 1 - if (c === target) { - callback() - } - } -} - -// function checkErr (err) { -// console.log('err') -// expect(err).to.be.instanceof(Error) -// } From 5b7a6051ad332e2670f4b9cdb0cb0d6e56b0b599 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 20:07:55 +0100 Subject: [PATCH 17/18] comment undone tests --- tests/swarm-test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index ddf64824ad..7975abf2ae 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -185,9 +185,9 @@ experiment('Without a Stream Muxer', function () { } }) - test('add an upgrade', function (done) { done() }) - test('dial a conn on top of a upgrade', function (done) { done() }) - test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + // test('add an upgrade', function (done) { done() }) + // test('dial a conn on top of a upgrade', function (done) { done() }) + // test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) }) /* TODO From 1ba8e80d4d7ea9acb6344d618141c0bde4472126 Mon Sep 17 00:00:00 2001 From: David Dias Date: Wed, 23 Sep 2015 20:34:31 +0100 Subject: [PATCH 18/18] rm laf --- package.json | 1 - src/identify/index.js | 16 ++++++++-------- src/swarm.js | 8 +++----- tests/swarm-test.js | 40 ++++++++++++++++++++-------------------- 4 files changed, 31 insertions(+), 34 deletions(-) diff --git a/package.json b/package.json index d5d93a269c..a53645ca18 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,6 @@ "scripts": { "test": "./node_modules/.bin/lab tests/*-test.js", "coverage": "./node_modules/.bin/lab -t 88 tests/*-test.js", - "laf": "./node_modules/.bin/standard --format", "lint": "./node_modules/.bin/standard" }, "repository": { diff --git a/src/identify/index.js b/src/identify/index.js index 181cd3808a..e7a857c420 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -38,14 +38,14 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { socket: socket } - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - // }) + // TODO: Pass the new discovered info about the peer that contacted us + // to something like the Kademlia Router, so the peerInfo for this peer + // is fresh + // - before this was exectued through a event emitter + // self.emit('peer-update', { + // peerId: peerId, + // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + // }) }) var mh = getMultiaddr(socket) diff --git a/src/swarm.js b/src/swarm.js index 1cfff09509..2fa4f5d494 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -67,9 +67,7 @@ function Swarm (peerInfo) { }) } - self.addUpgrade = function (ConnUpgrade, options) { - - } + self.addUpgrade = function (ConnUpgrade, options) {} self.addStreamMuxer = function (name, StreamMuxer, options) { self.muxers[name] = { @@ -260,7 +258,7 @@ function Swarm (peerInfo) { // we pass muxedConns so that identify can access the socket of the other // peer self.handleProtocol(identify.protoId, - identify.getHandlerFunction(self.peerInfo, self.muxedConns)) + identify.getHandlerFunction(self.peerInfo, self.muxedConns)) } self.handleProtocol = function (protocol, handlerFunction) { @@ -299,7 +297,7 @@ function Swarm (peerInfo) { muxer.on('stream', userProtocolMuxer) }) } else { - // if no stream muxer, then + // if no stream muxer, then userProtocolMuxer(conn) } } diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 7975abf2ae..bf5baeaa32 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -32,15 +32,15 @@ experiment('Without a Stream Muxer', function () { var sw = new Swarm(p) sw.addTransport('tcp', tcp, - { multiaddr: mh }, {}, {port: 8010}, function () { - expect(sw.transports['tcp'].options).to.deep.equal({ multiaddr: mh }) - expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) - expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) - expect(sw.transports['tcp'].transport).to.deep.equal(tcp) - sw.closeListener('tcp', function () { - done() + { multiaddr: mh }, {}, {port: 8010}, function () { + expect(sw.transports['tcp'].options).to.deep.equal({ multiaddr: mh }) + expect(sw.transports['tcp'].dialOptions).to.deep.equal({}) + expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010}) + expect(sw.transports['tcp'].transport).to.deep.equal(tcp) + sw.closeListener('tcp', function () { + done() + }) }) - }) }) test('dial a conn', function (done) { @@ -185,9 +185,9 @@ experiment('Without a Stream Muxer', function () { } }) - // test('add an upgrade', function (done) { done() }) - // test('dial a conn on top of a upgrade', function (done) { done() }) - // test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) + // test('add an upgrade', function (done) { done() }) + // test('dial a conn on top of a upgrade', function (done) { done() }) + // test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) }) /* TODO @@ -210,15 +210,15 @@ experiment('Without a Stream Muxer', function () { test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) }) */ - /* TODO - experiment('utp', function () { - test('add the transport', function (done) { done() }) - test('dial a conn', function (done) { done() }) - test('dial a conn on a protocol', function (done) { done() }) - test('add an upgrade', function (done) { done() }) - test('dial a conn on top of a upgrade', function (done) { done() }) - test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) - }) */ +/* TODO +experiment('utp', function () { + test('add the transport', function (done) { done() }) + test('dial a conn', function (done) { done() }) + test('dial a conn on a protocol', function (done) { done() }) + test('add an upgrade', function (done) { done() }) + test('dial a conn on top of a upgrade', function (done) { done() }) + test('dial a conn on a protocol on top of a upgrade', function (done) { done() }) +}) */ }) experiment('With a SPDY Stream Muxer', function () {