From 8e1413b98457751c2050ee38616f553e371b0956 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 11 May 2016 13:10:10 +0200 Subject: [PATCH] fix: handling of ipfs addresses in available transports and some refactoring into multiple files --- package.json | 2 +- src/connection.js | 65 +++++ src/default-handler.js | 18 ++ src/dial.js | 161 +++++++++++++ src/index.js | 348 ++------------------------- src/transport.js | 117 +++++++++ test/03-transport-websockets.node.js | 4 +- 7 files changed, 387 insertions(+), 328 deletions(-) create mode 100644 src/connection.js create mode 100644 src/default-handler.js create mode 100644 src/dial.js create mode 100644 src/transport.js diff --git a/package.json b/package.json index 5ec3e1e..4fd5f59 100644 --- a/package.json +++ b/package.json @@ -79,4 +79,4 @@ "Richard Littauer ", "dignifiedquire " ] -} \ No newline at end of file +} diff --git a/src/connection.js b/src/connection.js new file mode 100644 index 0000000..cba1538 --- /dev/null +++ b/src/connection.js @@ -0,0 +1,65 @@ +'use strict' + +const connHandler = require('./default-handler') +const identify = require('./identify') + +module.exports = function connection (swarm) { + return { + addUpgrade () {}, + + addStreamMuxer (muxer) { + // for dialing + swarm.muxers[muxer.multicodec] = muxer + + // for listening + swarm.handle(muxer.multicodec, (conn) => { + const muxedConn = muxer(conn, true) + + var peerIdForConn + + muxedConn.on('stream', (conn) => { + function gotId () { + if (peerIdForConn) { + conn.peerId = peerIdForConn + connHandler(swarm.protocols, conn) + } else { + setTimeout(gotId, 100) + } + } + + if (swarm.identify) { + return gotId() + } + + connHandler(swarm.protocols, conn) + }) + + // if identify is enabled, attempt to do it for muxer reuse + if (swarm.identify) { + identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => { + if (err) { + return console.log('Identify exec failed', err) + } + + peerIdForConn = pi.id + swarm.muxedConns[pi.id.toB58String()] = {} + swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn + swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs + + swarm.emit('peer-mux-established', pi) + + muxedConn.on('close', () => { + delete swarm.muxedConns[pi.id.toB58String()] + swarm.emit('peer-mux-closed', pi) + }) + }) + } + }) + }, + + reuse () { + swarm.identify = true + swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm)) + } + } +} diff --git a/src/default-handler.js b/src/default-handler.js new file mode 100644 index 0000000..52a63a6 --- /dev/null +++ b/src/default-handler.js @@ -0,0 +1,18 @@ +'use strict' + +const multistream = require('multistream-select') + +// incomming connection handler +module.exports = function connHandler (protocols, conn) { + var msS = new multistream.Select() + + Object.keys(protocols).forEach((protocol) => { + if (!protocol) { + return + } + + msS.addHandler(protocol, protocols[protocol]) + }) + + msS.handle(conn) +} diff --git a/src/dial.js b/src/dial.js new file mode 100644 index 0000000..dc7b023 --- /dev/null +++ b/src/dial.js @@ -0,0 +1,161 @@ +'use strict' + +const multistream = require('multistream-select') +const DuplexPassThrough = require('duplex-passthrough') + +const connHandler = require('./default-handler') + +module.exports = function dial (swarm) { + return (pi, protocol, callback) => { + if (typeof protocol === 'function') { + callback = protocol + protocol = null + } + + if (!callback) { + callback = function noop () {} + } + + const pt = new DuplexPassThrough() + + const b58Id = pi.id.toB58String() + + if (!swarm.muxedConns[b58Id]) { + if (!swarm.conns[b58Id]) { + attemptDial(pi, (err, conn) => { + if (err) { + return callback(err) + } + gotWarmedUpConn(conn) + }) + } else { + const conn = swarm.conns[b58Id] + swarm.conns[b58Id] = undefined + gotWarmedUpConn(conn) + } + } else { + if (!protocol) { + return callback() + } + gotMuxer(swarm.muxedConns[b58Id].muxer) + } + + return pt + + function gotWarmedUpConn (conn) { + attemptMuxerUpgrade(conn, (err, muxer) => { + if (!protocol) { + if (err) { + swarm.conns[b58Id] = conn + } + return callback() + } + + if (err) { + // couldn't upgrade to Muxer, it is ok + protocolHandshake(conn, protocol, callback) + } else { + gotMuxer(muxer) + } + }) + } + + function gotMuxer (muxer) { + openConnInMuxedConn(muxer, (conn) => { + protocolHandshake(conn, protocol, callback) + }) + } + + function attemptDial (pi, cb) { + const tKeys = swarm.availableTransports(pi) + + if (tKeys.length === 0) { + return cb(new Error('No available tranport to dial to')) + } + + nextTransport(tKeys.shift()) + + function nextTransport (key) { + const multiaddrs = pi.multiaddrs.slice() + swarm.transport.dial(key, multiaddrs, (err, conn) => { + if (err) { + if (tKeys.length === 0) { + return cb(new Error('Could not dial in any of the transports')) + } + return nextTransport(tKeys.shift()) + } + cb(null, conn) + }) + } + } + + function attemptMuxerUpgrade (conn, cb) { + const muxers = Object.keys(swarm.muxers) + if (muxers.length === 0) { + return cb(new Error('no muxers available')) + } + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + + nextMuxer(muxers.shift()) + + function nextMuxer (key) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + cb(new Error('could not upgrade to stream muxing')) + } else { + nextMuxer(muxers.shift()) + } + return + } + + const muxedConn = swarm.muxers[key](conn, false) + swarm.muxedConns[b58Id] = {} + swarm.muxedConns[b58Id].muxer = muxedConn + swarm.muxedConns[b58Id].conn = conn + + swarm.emit('peer-mux-established', pi) + + muxedConn.on('close', () => { + delete swarm.muxedConns[pi.id.toB58String()] + swarm.emit('peer-mux-closed', pi) + }) + + // in case identify is on + muxedConn.on('stream', (conn) => { + conn.peerId = pi.id + connHandler(swarm.protocols, conn) + }) + + cb(null, muxedConn) + }) + }) + } + } + + function openConnInMuxedConn (muxer, cb) { + cb(muxer.newStream()) + } + + function protocolHandshake (conn, protocol, cb) { + var msI = new multistream.Interactive() + msI.handle(conn, function () { + msI.select(protocol, (err, conn) => { + if (err) { + return callback(err) + } + + pt.wrapStream(conn) + pt.peerId = pi.id + callback(null, pt) + }) + }) + } + } +} diff --git a/src/index.js b/src/index.js index 136e055..c888607 100644 --- a/src/index.js +++ b/src/index.js @@ -1,12 +1,13 @@ 'use strict' -const multistream = require('multistream-select') -const identify = require('./identify') -const DuplexPassThrough = require('duplex-passthrough') -const contains = require('lodash.contains') const util = require('util') const EE = require('events').EventEmitter const parallel = require('run-parallel') +const contains = require('lodash.contains') + +const transport = require('./transport') +const connection = require('./connection') +const dial = require('./dial') exports = module.exports = Swarm @@ -21,112 +22,13 @@ function Swarm (peerInfo) { throw new Error('You must provide a value for `peerInfo`') } + this._peerInfo = peerInfo + // transports -- // { key: transport }; e.g { tcp: } this.transports = {} - this.transport = {} - - this.transport.add = (key, transport, options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - if (!callback) { callback = noop } - - if (this.transports[key]) { - throw new Error('There is already a transport with this key') - } - this.transports[key] = transport - callback() - } - - this.transport.dial = (key, multiaddrs, callback) => { - const t = this.transports[key] - - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } - - // a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) - multiaddrs = t.filter(multiaddrs) - - // b) if multiaddrs.length = 1, return the conn from the - // transport, otherwise, create a passthrough - if (multiaddrs.length === 1) { - const conn = t.dial(multiaddrs.shift(), {ready: () => { - const cb = callback - callback = noop // this is done to avoid connection drops as connect errors - cb(null, conn) - }}) - conn.once('error', () => { - callback(new Error('failed to connect to every multiaddr')) - }) - return conn - } - - // c) multiaddrs should already be a filtered list - // specific for the transport we are using - const pt = new DuplexPassThrough() - - next(multiaddrs.shift()) - return pt - function next (multiaddr) { - const conn = t.dial(multiaddr, {ready: () => { - pt.wrapStream(conn) - const cb = callback - callback = noop // this is done to avoid connection drops as connect errors - cb(null, pt) - }}) - - conn.once('error', () => { - if (multiaddrs.length === 0) { - return callback(new Error('failed to connect to every multiaddr')) - } - next(multiaddrs.shift()) - }) - } - } - - this.transport.listen = (key, options, handler, callback) => { - // if no callback is passed, we pass conns to connHandler - if (!handler) { handler = connHandler } - - const multiaddrs = this.transports[key].filter( - peerInfo.multiaddrs.map((addr) => { - // ipfs multiaddrs are not dialable so we drop them here - if (contains(addr.protoNames(), 'ipfs')) { - return addr.decapsulate('ipfs') - } - - return addr - }) - ) - - this.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => { - if (err) { - return callback(err) - } - if (maUpdate) { - // because we can listen on port 0... - peerInfo.multiaddr.replace(multiaddrs, maUpdate) - } - - callback() - }) - } - - this.transport.close = (key, callback) => { - const transport = this.transports[key] - - if (!transport) { - return callback(new Error(`Trying to close non existing transport: ${key}`)) - } - - transport.close(callback) - } - // connections -- // { peerIdB58: { conn: }} @@ -143,240 +45,38 @@ function Swarm (peerInfo) { // { protocol: handler } this.protocols = {} - this.connection = {} - this.connection.addUpgrade = () => {} - // { muxerCodec: } e.g { '/spdy/0.3.1': spdy } this.muxers = {} - this.connection.addStreamMuxer = (muxer) => { - // for dialing - this.muxers[muxer.multicodec] = muxer - // for listening - this.handle(muxer.multicodec, (conn) => { - const muxedConn = muxer(conn, true) + // is the Identify protocol enabled? + this.identify = false - var peerIdForConn + this.transport = transport(this) + this.connection = connection(this) - muxedConn.on('stream', (conn) => { - function gotId () { - if (peerIdForConn) { - conn.peerId = peerIdForConn - connHandler(conn) - } else { - setTimeout(gotId, 100) - } - } + this.availableTransports = (pi) => { + const addrs = pi.multiaddrs - if (this.identify) { - return gotId() + // Only listen on transports we actually have addresses for + return Object.keys(this.transports).filter((ts) => { + // ipfs multiaddrs are not dialable so we drop them here + let dialable = addrs.map((addr) => { + if (contains(addr.protoNames(), 'ipfs')) { + return addr.decapsulate('ipfs') } - - connHandler(conn) + return addr }) - // if identify is enabled, attempt to do it for muxer reuse - if (this.identify) { - identify.exec(conn, muxedConn, peerInfo, (err, pi) => { - if (err) { - return console.log('Identify exec failed', err) - } - - peerIdForConn = pi.id - this.muxedConns[pi.id.toB58String()] = {} - this.muxedConns[pi.id.toB58String()].muxer = muxedConn - this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs - - self.emit('peer-mux-established', pi) - - muxedConn.on('close', () => { - delete self.muxedConns[pi.id.toB58String()] - self.emit('peer-mux-closed', pi) - }) - }) - } - }) - } - - // enable the Identify protocol - this.identify = false - this.connection.reuse = () => { - this.identify = true - this.handle(identify.multicodec, identify.handler(peerInfo, this)) - } - - const self = this // prefered this to bind - - // incomming connection handler - function connHandler (conn) { - var msS = new multistream.Select() - Object.keys(self.protocols).forEach((protocol) => { - if (!protocol) { return } - msS.addHandler(protocol, self.protocols[protocol]) - }) - msS.handle(conn) - } - - function availableTransports (pi) { - const addrs = pi.multiaddrs - return Object.keys(self.transports).filter((ts) => { - // Only listen on transports we actually have addresses for - return self.transports[ts].filter(addrs).length > 0 + return this.transports[ts].filter(dialable).length > 0 }) } // higher level (public) API - this.dial = (pi, protocol, callback) => { - if (typeof protocol === 'function') { - callback = protocol - protocol = null - } - - if (!callback) { - callback = function noop () {} - } - - const pt = new DuplexPassThrough() - - const b58Id = pi.id.toB58String() - - if (!this.muxedConns[b58Id]) { - if (!this.conns[b58Id]) { - attemptDial(pi, (err, conn) => { - if (err) { - return callback(err) - } - gotWarmedUpConn(conn) - }) - } else { - const conn = this.conns[b58Id] - this.conns[b58Id] = undefined - gotWarmedUpConn(conn) - } - } else { - if (!protocol) { - return callback() - } - gotMuxer(this.muxedConns[b58Id].muxer) - } - - return pt - - function gotWarmedUpConn (conn) { - attemptMuxerUpgrade(conn, (err, muxer) => { - if (!protocol) { - if (err) { - self.conns[b58Id] = conn - } - return callback() - } - - if (err) { - // couldn't upgrade to Muxer, it is ok - protocolHandshake(conn, protocol, callback) - } else { - gotMuxer(muxer) - } - }) - } - - function gotMuxer (muxer) { - openConnInMuxedConn(muxer, (conn) => { - protocolHandshake(conn, protocol, callback) - }) - } - - function attemptDial (pi, cb) { - const tKeys = availableTransports(pi) - nextTransport(tKeys.shift()) - - function nextTransport (key) { - const multiaddrs = pi.multiaddrs.slice() - self.transport.dial(key, multiaddrs, (err, conn) => { - if (err) { - if (tKeys.length === 0) { - return cb(new Error('Could not dial in any of the transports')) - } - return nextTransport(tKeys.shift()) - } - cb(null, conn) - }) - } - } - - function attemptMuxerUpgrade (conn, cb) { - const muxers = Object.keys(self.muxers) - if (muxers.length === 0) { - return cb(new Error('no muxers available')) - } - - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler - - nextMuxer(muxers.shift()) - - function nextMuxer (key) { - var msI = new multistream.Interactive() - msI.handle(conn, function () { - msI.select(key, (err, conn) => { - if (err) { - if (muxers.length === 0) { - cb(new Error('could not upgrade to stream muxing')) - } else { - nextMuxer(muxers.shift()) - } - return - } - - const muxedConn = self.muxers[key](conn, false) - self.muxedConns[b58Id] = {} - self.muxedConns[b58Id].muxer = muxedConn - self.muxedConns[b58Id].conn = conn - - self.emit('peer-mux-established', pi) - - muxedConn.on('close', () => { - delete self.muxedConns[pi.id.toB58String()] - self.emit('peer-mux-closed', pi) - }) - - // in case identify is on - muxedConn.on('stream', (conn) => { - conn.peerId = pi.id - connHandler(conn) - }) - - cb(null, muxedConn) - }) - }) - } - } - - function openConnInMuxedConn (muxer, cb) { - cb(muxer.newStream()) - } - - function protocolHandshake (conn, protocol, cb) { - var msI = new multistream.Interactive() - msI.handle(conn, function () { - msI.select(protocol, (err, conn) => { - if (err) { - return callback(err) - } - - pt.wrapStream(conn) - pt.peerId = pi.id - callback(null, pt) - }) - }) - } - } + this.dial = dial(this) // Start listening on all available transports this.listen = (callback) => { - parallel(availableTransports(peerInfo).map((ts) => (cb) => { + parallel(this.availableTransports(peerInfo).map((ts) => (cb) => { // Listen on the given transport this.transport.listen(ts, {}, null, cb) }), callback) @@ -402,5 +102,3 @@ function Swarm (peerInfo) { }), callback) } } - -function noop () {} diff --git a/src/transport.js b/src/transport.js new file mode 100644 index 0000000..c1bcc1f --- /dev/null +++ b/src/transport.js @@ -0,0 +1,117 @@ +'use strict' + +const contains = require('lodash.contains') +const DuplexPassThrough = require('duplex-passthrough') + +const connHandler = require('./default-handler') + +module.exports = function (swarm) { + return { + add (key, transport, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + if (!callback) { callback = noop } + + if (swarm.transports[key]) { + throw new Error('There is already a transport with this key') + } + swarm.transports[key] = transport + callback() + }, + + dial (key, multiaddrs, callback) { + const t = swarm.transports[key] + + if (!Array.isArray(multiaddrs)) { + multiaddrs = [multiaddrs] + } + + // a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) + multiaddrs = dialables(t, multiaddrs) + + // b) if multiaddrs.length = 1, return the conn from the + // transport, otherwise, create a passthrough + if (multiaddrs.length === 1) { + const conn = t.dial(multiaddrs.shift(), {ready: () => { + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, conn) + }}) + conn.once('error', () => { + callback(new Error('failed to connect to every multiaddr')) + }) + return conn + } + + // c) multiaddrs should already be a filtered list + // specific for the transport we are using + const pt = new DuplexPassThrough() + + next(multiaddrs.shift()) + return pt + function next (multiaddr) { + const conn = t.dial(multiaddr, {ready: () => { + pt.wrapStream(conn) + const cb = callback + callback = noop // this is done to avoid connection drops as connect errors + cb(null, pt) + }}) + + conn.once('error', () => { + if (multiaddrs.length === 0) { + return callback(new Error('failed to connect to every multiaddr')) + } + next(multiaddrs.shift()) + }) + } + }, + + listen (key, options, handler, callback) { + // if no callback is passed, we pass conns to connHandler + if (!handler) { + handler = connHandler.bind(null, swarm.protocols) + } + + const multiaddrs = dialables(swarm.transports[key], swarm._peerInfo.multiaddrs) + + swarm.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => { + if (err) { + return callback(err) + } + if (maUpdate) { + // because we can listen on port 0... + swarm._peerInfo.multiaddr.replace(multiaddrs, maUpdate) + } + + callback() + }) + }, + + close (key, callback) { + const transport = swarm.transports[key] + + if (!transport) { + return callback(new Error(`Trying to close non existing transport: ${key}`)) + } + + transport.close(callback) + } + } +} + +// transform given multiaddrs to a list of dialable addresses +// for the given transport `tp`. +function dialables (tp, multiaddrs) { + return tp.filter(multiaddrs.map((addr) => { + // ipfs multiaddrs are not dialable so we drop them here + if (contains(addr.protoNames(), 'ipfs')) { + return addr.decapsulate('ipfs') + } + + return addr + })) +} + +function noop () {} diff --git a/test/03-transport-websockets.node.js b/test/03-transport-websockets.node.js index f20ffef..414bd27 100644 --- a/test/03-transport-websockets.node.js +++ b/test/03-transport-websockets.node.js @@ -20,7 +20,7 @@ describe('transport - websockets', function () { before(() => { peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888/websockets')) - peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets')) + peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')) swarmA = new Swarm(peerA) swarmB = new Swarm(peerB) }) @@ -51,7 +51,7 @@ describe('transport - websockets', function () { ) expect(peerB.multiaddrs.length).to.equal(1) expect( - peerB.multiaddrs[0].equals(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets')) + peerB.multiaddrs[0].equals(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets/ipfs/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')) ).to.equal( true )