From b526143f49ae693acffdbf8ce5604c79e61f2a2e Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 21 Aug 2019 11:37:02 +0200 Subject: [PATCH] refactor: switch to async iterators BREAKING CHANGE: Switch to using async/await and async iterators. The transport and connection interfaces have changed. --- .aegir.js | 16 ++- .npmignore | 28 ---- .travis.yml | 4 +- README.md | 76 ++++------ gulpfile.js | 2 + package.json | 27 ++-- src/constants.js | 9 ++ src/index.js | 253 +++++++++++++++++----------------- src/listener.js | 107 ++++++++++++++ src/socket-to-conn.js | 101 ++++++++++++++ test/compliance.js | 32 +++++ test/dial.spec.js | 54 ++++---- test/filter.spec.js | 9 +- test/instance.spec.js | 7 +- test/listen.js | 43 +++--- test/node.js | 2 + test/valid-connection.spec.js | 61 -------- 17 files changed, 502 insertions(+), 329 deletions(-) delete mode 100644 .npmignore create mode 100644 src/constants.js create mode 100644 src/listener.js create mode 100644 src/socket-to-conn.js create mode 100644 test/compliance.js delete mode 100644 test/valid-connection.spec.js diff --git a/.aegir.js b/.aegir.js index f9106e2..3efbf00 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,23 +1,29 @@ 'use strict' const WebRTCDirect = require('./src') -const pull = require('pull-stream') +const pipe = require('it-pipe') const multiaddr = require('multiaddr') const ma = multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') let listener +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + function boot (done) { - const wd = new WebRTCDirect() - listener = wd.createListener((conn) => pull(conn, conn)) - listener.listen(ma, done) + const wd = new WebRTCDirect({ upgrader: mockUpgrader }) + listener = wd.createListener((conn) => pipe(conn, conn)) listener.on('listening', () => { console.log('gulp listener started on:', ma.toString()) }) + listener.listen(ma).then(() => done()).catch(done) + listener.on('error', console.error) } function shutdown (done) { - listener.close(done) + listener.close().then(done).catch(done) } module.exports = { diff --git a/.npmignore b/.npmignore deleted file mode 100644 index c57cab7..0000000 --- a/.npmignore +++ /dev/null @@ -1,28 +0,0 @@ -# Logs -logs -*.log - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Optional npm cache directory -.npm - -# Optional REPL history -.node_repl_history - -test \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index aa8cf09..613fd45 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: node_js -cache: npm +cache: false stages: - check - test @@ -19,7 +19,7 @@ jobs: include: - stage: check script: - - npx aegir commitlint --travis + - npx aegir build --bundlesize - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/README.md b/README.md index 9049b1d..ddd9017 100644 --- a/README.md +++ b/README.md @@ -39,43 +39,33 @@ ```js const WebRTCDirect = require('libp2p-webrtc-direct') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('pull-stream') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') +const addr = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') const webRTCDirect = new WebRTCDirect() const listener = webRTCDirect.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') - - webRTCDirect.dial(mh, (err, conn) => { - if (!err) { - pull( - conn, - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } - - // Close connection after reading - listener.close() - }), - ) - } else { - console.log(`Error: ${err}`) - } - }) -}) +await listener.listen(addr) +console.log('listening') + +const conn = await webRTCDirect.dial(addr) +const values = await pipe( + conn, + collect +) +console.log(`Value: ${values.toString()}`) + +// Close connection after reading +await listener.close() ``` Outputs: @@ -89,32 +79,22 @@ Note that it may take some time for the connection to be established. ## API -Follows the interface defined by `interface-transport` - -[![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) +### Transport -## Pull-streams +[![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)](https://github.com/libp2p/interface-transport) -### This module uses `pull-streams` +### Connection -We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). +[![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)](https://github.com/libp2p/interface-connection) -You can learn more about pull-streams at: +## Contribute -- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) -- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) -- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) -- [pull-streams documentation](https://pull-stream.github.io/) +The libp2p implementation in JavaScript is a work in progress. As such, there are a few things you can do right now to help out: -#### Converting `pull-streams` to Node.js Streams + - Go through the modules and **check out existing issues**. This would be especially useful for modules in active development. Some knowledge of IPFS/libp2p may be required, as well as the infrastructure behind it - for instance, you may need to read up on p2p and more complex operations like muxing to be able to help technically. + - **Perform code reviews**. + - **Add tests**. There can never be enough tests. -If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/pull-stream/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: - -```JavaScript -const pullToStream = require('pull-stream-to-stream') - -const nodeStreamInstance = pullToStream(pullStreamInstance) -// nodeStreamInstance is an instance of a Node.js Stream -``` +## License -To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. +[MIT](LICENSE) © Protocol Labs diff --git a/gulpfile.js b/gulpfile.js index 027e74d..c1d1248 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -18,7 +18,9 @@ function boot (done) { listener = wd.createListener((conn) => pull(conn, conn)) listener.listen(ma, done) listener.on('listening', () => { + /* eslint-disable no-console */ console.log('gulp listener started on:', ma.toString()) + /* eslint-enable no-console */ }) } diff --git a/package.json b/package.json index 5caa6d3..57a6659 100644 --- a/package.json +++ b/package.json @@ -12,18 +12,17 @@ "scripts": { "lint": "aegir lint", "build": "aegir build", + "docs": "aegir docs", "test": "aegir test --target node --target browser", "test:node": "aegir test --target node", "test:browser": "aegir test --target browser", "release": "aegir release --target node --target browser", "release-minor": "aegir release --type minor --target node --target browser", "release-major": "aegir release --type major --target node --target browser", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=text --reporter=lcov npm run test:node" }, "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -44,26 +43,28 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-webrtc-direct#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.3.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "gulp": "^4.0.0", - "multiaddr": "^6.0.6", + "gulp": "^4.0.2", + "multiaddr": "^7.1.0", "webrtcsupport": "^2.2.0" }, "dependencies": { + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "concat-stream": "^2.0.0", "detect-node": "^2.0.4", - "interface-connection": "~0.3.3", - "mafmt": "^6.0.7", + "err-code": "^2.0.0", + "interface-transport": "libp2p/interface-transport#chore/skip-abort-while-reading-for-webrtc", + "libp2p-utils": "^0.1.0", + "mafmt": "^7.0.0", "multibase": "~0.6.0", "once": "^1.4.0", - "pull-stream": "^3.6.9", "request": "^2.88.0", - "simple-peer": "9.3.0", - "stream-to-pull-stream": "^1.7.3", - "wrtc": "~0.2.1", + "simple-peer": "9.6.0", + "stream-to-it": "^0.1.1", + "wrtc": "~0.4.2", "xhr": "^2.5.0" }, "contributors": [ diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..aaf2bc9 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,9 @@ +'use strict' + +// p2p multi-address code +exports.CODE_P2P = 421 +exports.CODE_CIRCUIT = 290 + +// Time to wait for a connection to close gracefully before destroying it +// manually +module.exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index 20cc241..0d1b75e 100644 --- a/src/index.js +++ b/src/index.js @@ -1,178 +1,181 @@ 'use strict' +const assert = require('debug') +const debug = require('debug') +const log = debug('libp2p:webrtcdirect') +const errcode = require('err-code') + const wrtc = require('wrtc') const SimplePeer = require('simple-peer') const isNode = require('detect-node') -const http = require('http') -const toPull = require('stream-to-pull-stream') -const Connection = require('interface-connection').Connection -const EE = require('events').EventEmitter const mafmt = require('mafmt') const multibase = require('multibase') -const once = require('once') const request = require('request') const withIs = require('class-is') +const { AbortError } = require('abortable-iterator') -function noop () {} +const { CODE_CIRCUIT, CODE_P2P } = require('./constants') +const toConnection = require('./socket-to-conn') +const createListener = require('./listener') -function cleanMultiaddr (ma) { - return ma.decapsulate('/p2p-webrtc-direct') -} +function noop () {} +/** + * @class WebRTCDirect + */ class WebRTCDirect { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } + /** + * @constructor + * @param {object} options + * @param {Upgrader} options.upgrader + */ + constructor ({ upgrader }) { + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') + this._upgrader = upgrader + } - callback = once(callback || noop) + /** + * @async + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Connection} An upgraded Connection + */ + async dial (ma, options = {}) { + const socket = await this._connect(ma, options) + const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + const conn = await this._upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) + return conn + } - Object.assign(options, { - initiator: true, - trickle: false - }) + /** + * @private + * @param {Multiaddr} ma + * @param {object} options + * @param {AbortSignal} options.signal Used to abort dial requests + * @returns {Promise} Resolves a SimplePeer Webrtc channel + */ + _connect (ma, options = {}) { + if (options.signal && options.signal.aborted) { + throw new AbortError() + } - if (isNode) { - options.wrtc = wrtc + options = { + ...options, + initiator: true, + trickle: false, + wrtc: isNode ? wrtc : undefined } - const channel = new SimplePeer(options) - const conn = new Connection(toPull.duplex(channel)) + return new Promise((resolve, reject) => { + const start = Date.now() + let connected - let connected = false + const cOpts = ma.toOptions() + log('Dialing %s:%s', cOpts.host, cOpts.port) - channel.on('signal', (signal) => { - const signalStr = JSON.stringify(signal) - const cma = cleanMultiaddr(ma) - const url = 'http://' + cma.toOptions().host + ':' + cma.toOptions().port - const path = '/?signal=' + multibase.encode('base58btc', Buffer.from(signalStr)) - const uri = url + path + const channel = new SimplePeer(options) - request.get(uri, (err, res) => { - if (err) { - return callback(err) + const onError = (err) => { + if (!connected) { + err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}` + done(err) } - const incSignalBuf = multibase.decode(res.body) - const incSignalStr = incSignalBuf.toString() - const incSignal = JSON.parse(incSignalStr) - channel.signal(incSignal) - }) - }) - - channel.on('connect', () => { - connected = true - callback(null, conn) - }) - - conn.destroy = channel.destroy.bind(channel) - conn.getObservedAddrs = (callback) => callback(null, [ma]) - - channel.on('timeout', () => callback(new Error('timeout'))) - channel.on('close', () => conn.destroy()) - channel.on('error', (err) => { - if (!connected) { - callback(err) } - }) - } - - createListener (options, handler) { - if (!isNode) { - throw new Error(`Can't listen if run from the Browser`) - } - if (typeof options === 'function') { - handler = options - options = {} - } + const onTimeout = () => { + log('connnection timeout %s:%s', cOpts.host, cOpts.port) + const err = errcode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT') + // Note: this will result in onError() being called + channel.emit('error', err) + } - const listener = new EE() - const server = http.createServer() - let maSelf + const onConnect = () => { + connected = true - server.on('request', (req, res) => { - res.setHeader('Content-Type', 'text/plain') - res.setHeader('Access-Control-Allow-Origin', '*') + log('connection opened %s:%s', cOpts.host, cOpts.port) + done(null) + } - const path = req.url - const incSignalStr = path.split('?signal=')[1] - const incSignalBuf = multibase.decode(Buffer.from(incSignalStr)) - const incSignal = JSON.parse(incSignalBuf.toString()) + const onAbort = () => { + log('connection aborted %s:%s', cOpts.host, cOpts.port) + channel.destroy() + done(new AbortError()) + } - Object.assign(options, { - trickle: false - }) + const done = (err) => { + channel.removeListener('error', onError) + channel.removeListener('timeout', onTimeout) + channel.removeListener('connect', onConnect) + options.signal && options.signal.removeEventListener('abort', onAbort) - if (isNode) { - options.wrtc = wrtc + err ? reject(err) : resolve(channel) } - const channel = new SimplePeer(options) - const conn = new Connection(toPull.duplex(channel)) - - channel.on('connect', () => { - conn.getObservedAddrs = (callback) => callback(null, []) - listener.emit('connection', conn) - handler(conn) - }) + channel.once('error', onError) + channel.once('timeout', onTimeout) + channel.once('connect', onConnect) + channel.on('close', () => channel.destroy()) + options.signal && options.signal.addEventListener('abort', onAbort) channel.on('signal', (signal) => { const signalStr = JSON.stringify(signal) - const signalEncoded = multibase.encode('base58btc', Buffer.from(signalStr)) - res.end(signalEncoded.toString()) + const url = 'http://' + cOpts.host + ':' + cOpts.port + const path = '/?signal=' + multibase.encode('base58btc', Buffer.from(signalStr)) + const uri = url + path + + request.get(uri, (err, res) => { + if (err) { + return reject(err) + } + const incSignalBuf = multibase.decode(res.body) + const incSignalStr = incSignalBuf.toString() + const incSignal = JSON.parse(incSignalStr) + channel.signal(incSignal) + }) }) - - channel.signal(incSignal) }) + } - listener.listen = (ma, callback) => { - callback = callback || noop - - maSelf = ma - server.on('listening', () => { - listener.emit('listening') - callback() - }) - - const cma = cleanMultiaddr(ma) - server.listen(cma.toOptions()) + /** + * Creates a WebrtcDirect listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + * @param {*} [options] + * @param {function(Connection)} handler + * @returns {Listener} A WebrtcDirect listener + */ + createListener (options = {}, handler) { + if (!isNode) { + throw errcode(new Error('Can\'t listen if run from the Browser'), 'ERR_NO_SUPPORT_FROM_BROWSER') } - listener.close = (options, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || noop - - server.close(() => { - listener.emit('close') - callback() - }) + if (typeof options === 'function') { + handler = options + options = {} } - listener.getAddrs = (callback) => { - setImmediate(() => { - callback(null, [maSelf]) - }) - } + handler = handler || noop - return listener + return createListener({ handler, upgrader: this._upgrader }, options) } + /** + * Takes a list of `Multiaddr`s and returns only valid addresses + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} Valid multiaddrs + */ filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { - if (ma.protoNames().indexOf('p2p-circuit') > -1) { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { return false } - return mafmt.WebRTCDirect.matches(ma) + return mafmt.WebRTCDirect.matches(ma.decapsulateCode(CODE_P2P)) }) } } diff --git a/src/listener.js b/src/listener.js new file mode 100644 index 0000000..08b5e3a --- /dev/null +++ b/src/listener.js @@ -0,0 +1,107 @@ +'use strict' + +const http = require('http') +const EventEmitter = require('events') +const log = require('debug')('libp2p:libp2p:webrtcdirect:listener') + +const isNode = require('detect-node') +const wrtc = require('wrtc') +const SimplePeer = require('simple-peer') + +const multibase = require('multibase') + +const toConnection = require('./socket-to-conn') + +module.exports = ({ handler, upgrader }, options = {}) => { + const listener = new EventEmitter() + const server = http.createServer() + + let maSelf + + // Keep track of open connections to destroy in case of timeout + server.__connections = [] + + server.on('request', async (req, res) => { + res.setHeader('Content-Type', 'text/plain') + res.setHeader('Access-Control-Allow-Origin', '*') + + const path = req.url + const incSignalStr = path.split('?signal=')[1] + const incSignalBuf = multibase.decode(Buffer.from(incSignalStr)) + const incSignal = JSON.parse(incSignalBuf.toString()) + + options = { + ...options, + trickle: false + } + + if (isNode) { + options.wrtc = wrtc + } + + const channel = new SimplePeer(options) + const maConn = toConnection(channel) + log('new inbound connection %s', maConn.remoteAddr) + + const conn = await upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) + + trackConn(server, maConn) + + channel.on('connect', () => { + listener.emit('connection', conn) + handler(conn) + }) + + channel.on('signal', (signal) => { + const signalStr = JSON.stringify(signal) + const signalEncoded = multibase.encode('base58btc', Buffer.from(signalStr)) + res.end(signalEncoded.toString()) + }) + + channel.signal(incSignal) + }) + + server.on('error', (err) => listener.emit('error', err)) + server.on('close', () => listener.emit('close')) + + listener.listen = (ma) => { + maSelf = ma + const lOpts = ma.decapsulate('/p2p-webrtc-direct').toOptions() + + return new Promise((resolve, reject) => { + server.on('listening', (err) => { + if (err) { + return reject(err) + } + + listener.emit('listening') + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) + + server.listen(lOpts) + }) + } + + listener.close = () => { + if (!server.listening) { + return + } + + return new Promise((resolve, reject) => { + server.__connections.forEach(maConn => maConn.close()) + server.close((err) => err ? reject(err) : resolve()) + }) + } + + listener.getAddrs = () => { + return [maSelf] + } + + return listener +} + +function trackConn (server, maConn) { + server.__connections.push(maConn) +} diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js new file mode 100644 index 0000000..a26b343 --- /dev/null +++ b/src/socket-to-conn.js @@ -0,0 +1,101 @@ +'use strict' + +const abortable = require('abortable-iterator') +const toIterable = require('stream-to-it') + +const { CLOSE_TIMEOUT } = require('./constants') +const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') + +const debug = require('debug') +const log = debug('libp2p:libp2p:webrtcdirect:socket') +log.error = debug('libp2p:libp2p:webrtcdirect:socket:error') + +// Convert a socket into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection + +module.exports = (socket, options = {}) => { + const { sink, source } = toIterable.duplex(socket) + + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await sink((async function * () { + for await (const chunk of source) { + // Convert BufferList to Buffer + yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() + } + })()) + } catch (err) { + // If aborted we can safely ignore + if (err.type !== 'aborted') { + // If the source errored the socket will already have been destroyed by + // toIterable.duplex(). If the socket errored it will already be + // destroyed. There's nothing to do here except log the error & return. + log.error(err) + } + } + }, + + source: options.signal ? abortable(source, options.signal) : source, + + conn: socket, + + localAddr: socket.localAddress && socket.localPort + ? toMultiaddr(socket.localAddress, socket.localPort) : undefined, + + // If the remote address was passed, use it - it may have the peer ID encapsulated + remoteAddr: options.remoteAddr || (socket.remoteAddress && socket.remotePort + ? toMultiaddr(socket.remoteAddress, socket.remotePort) : undefined), + + timeline: { open: Date.now() }, + + close () { + if (socket.destroyed) return + + return new Promise((resolve, reject) => { + const start = Date.now() + + // Attempt to end the socket. If it takes longer to close than the + // timeout, destroy it manually. + const timeout = setTimeout(() => { + if (maConn.remoteAddr) { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + } + + if (!socket.destroyed) { + socket.destroy() + } + }, CLOSE_TIMEOUT) + + socket.once('close', () => { + clearTimeout(timeout) + maConn.timeline.close = Date.now() + resolve() + }) + + socket.end(err => { + if (err) return reject(err) + maConn.timeline.close = Date.now() + resolve() + }) + }) + } + } + + socket.once('close', () => { + // In instances where `close` was not explicitly called, + // such as an iterable stream ending, ensure we have set the close + // timeline + if (!maConn.timeline.close) { + maConn.timeline.close = Date.now() + } + }) + + return maConn +} diff --git a/test/compliance.js b/test/compliance.js new file mode 100644 index 0000000..fd684cc --- /dev/null +++ b/test/compliance.js @@ -0,0 +1,32 @@ +/* eslint-env mocha */ +'use strict' + +const wrtc = require('wrtc') + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') + +const WDirect = require('../src') + +describe('interface-transport compliance', () => { + tests({ + setup ({ upgrader }) { + const ws = new WDirect({ upgrader, wrtc: wrtc }) + + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/22222/http/p2p-webrtc-direct'), + multiaddr('/ip4/127.0.0.1/tcp/33333/http/p2p-webrtc-direct'), + multiaddr('/ip4/127.0.0.1/tcp/44444/http/p2p-webrtc-direct'), + multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') + ] + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay () {}, + restore () {} + } + + return { transport: ws, addrs, connector } + } + }) +}) diff --git a/test/dial.spec.js b/test/dial.spec.js index ac45861..8255357 100644 --- a/test/dial.spec.js +++ b/test/dial.spec.js @@ -8,10 +8,17 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') + +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') const WebRTCDirect = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('dial', function () { this.timeout(20 * 1000) @@ -19,37 +26,36 @@ describe('dial', function () { let wd before(() => { - wd = new WebRTCDirect() + wd = new WebRTCDirect({ upgrader: mockUpgrader }) }) - it('dial on IPv4, check callback', (done) => { - wd.dial(ma, { config: {} }, (err, conn) => { - expect(err).to.not.exist() - - const data = Buffer.from('some data') - - pull( - pull.values([data]), - conn, - pull.collect((err, values) => { - expect(err).to.not.exist() - expect(values).to.eql([data]) - done() - }) - ) - }) + it('dial on IPv4', async () => { + const conn = await wd.dial(ma) + const data = Buffer.from('some data') + + const values = await pipe( + [data], + conn, + collect + ) + + expect(values).to.eql([data]) }) - it('dial offline / non-existent node on IPv4, check callback', (done) => { - let maOffline = multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') + it('dial offline / non-existent node on IPv4, check callback', async () => { + const maOffline = multiaddr('/ip4/127.0.0.1/tcp/55555/http/p2p-webrtc-direct') - wd.dial(maOffline, { config: {} }, (err, conn) => { + try { + await wd.dial(maOffline, { config: {} }) + } catch (err) { expect(err).to.exist() - done() - }) + return + } + + throw new Error('dial did not fail') }) - it.skip('dial on IPv6', (done) => { + it.skip('dial on IPv6', () => { // TODO IPv6 not supported yet }) }) diff --git a/test/filter.spec.js b/test/filter.spec.js index cc91282..5ed3bc0 100644 --- a/test/filter.spec.js +++ b/test/filter.spec.js @@ -9,9 +9,14 @@ const multiaddr = require('multiaddr') const WebRTCDirect = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('filter', () => { it('filters non valid webrtc-direct multiaddrs', () => { - const wd = new WebRTCDirect() + const wd = new WebRTCDirect({ upgrader: mockUpgrader }) const maArr = [ multiaddr('/ip4/1.2.3.4/tcp/3456/http/p2p-webrtc-direct'), multiaddr('/ip4/127.0.0.1/tcp/9090/ws'), @@ -26,7 +31,7 @@ describe('filter', () => { }) it('filter a single addr for this transport', () => { - const wd = new WebRTCDirect() + const wd = new WebRTCDirect({ upgrader: mockUpgrader }) const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') const filtered = wd.filter(ma) diff --git a/test/instance.spec.js b/test/instance.spec.js index 9c7e063..681a01f 100644 --- a/test/instance.spec.js +++ b/test/instance.spec.js @@ -8,9 +8,14 @@ chai.use(dirtyChai) const WebRTCDirect = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('instances', () => { it('create', (done) => { - const wdirect = new WebRTCDirect() + const wdirect = new WebRTCDirect({ upgrader: mockUpgrader }) expect(wdirect).to.exist() done() }) diff --git a/test/listen.js b/test/listen.js index 09331f3..977da92 100644 --- a/test/listen.js +++ b/test/listen.js @@ -9,38 +9,42 @@ chai.use(dirtyChai) const multiaddr = require('multiaddr') const WebRTCDirect = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('listen', () => { let wd const ma = multiaddr('/ip4/127.0.0.1/tcp/20123/http/p2p-webrtc-direct') before(() => { - wd = new WebRTCDirect() + wd = new WebRTCDirect({ upgrader: mockUpgrader }) }) - it('listen, check for callback', (done) => { - const listener = wd.createListener({ config: {} }, (conn) => {}) + it('listen, check for promise', async () => { + const listener = wd.createListener({ config: {} }, (_) => { }) - listener.listen(ma, (err) => { - expect(err).to.not.exist() - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.once('listening', () => { - listener.close(done) + listener.once('listening', async () => { + await listener.close() + done() }) listener.listen(ma) }) it('listen, check for the close event', (done) => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.listen(ma, (err) => { - expect(err).to.not.exist() + listener.listen(ma).then(() => { listener.once('close', done) + listener.close() }) }) @@ -61,15 +65,14 @@ describe('listen', () => { // TODO IPv6 not supported yet }) - it('getAddrs', (done) => { + it('getAddrs', async () => { const listener = wd.createListener({ config: {} }, (conn) => {}) - listener.listen(ma, (err) => { - expect(err).to.not.exist() - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + + await listener.listen(ma) + + const addrs = listener.getAddrs() + expect(addrs[0]).to.deep.equal(ma) + + await listener.close() }) }) diff --git a/test/node.js b/test/node.js index f690778..58b90fd 100644 --- a/test/node.js +++ b/test/node.js @@ -1,2 +1,4 @@ 'use strict' + require('./listen.js') +require('./compliance.js') diff --git a/test/valid-connection.spec.js b/test/valid-connection.spec.js deleted file mode 100644 index 51b765b..0000000 --- a/test/valid-connection.spec.js +++ /dev/null @@ -1,61 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const multiaddr = require('multiaddr') -const pull = require('pull-stream') - -const WebRTCDirect = require('../src') - -describe('valid Connection', function () { - this.timeout(20 * 1000) - const ma = multiaddr('/ip4/127.0.0.1/tcp/12345/http/p2p-webrtc-direct') - let wd - let conn - - before((done) => { - wd = new WebRTCDirect() - - wd.dial(ma, { config: {} }, (err, _conn) => { - expect(err).to.not.exist() - conn = _conn - done() - }) - }) - - after((done) => { - pull( - pull.empty(), - conn, - pull.onEnd(done) - ) - }) - - it('get observed addrs', (done) => { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs[0].toString()).to.equal(ma.toString()) - done() - }) - }) - - it('get Peer Info', (done) => { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - done() - }) - }) - - it('set Peer Info', (done) => { - conn.setPeerInfo('info') - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('info') - done() - }) - }) -})