From 540af9f28963b4fcc7d8e6605e450b4aea598e8b Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Mon, 26 Sep 2016 22:37:32 +0200 Subject: [PATCH] feat: upgrade to new block api and async crypto --- .aegir.js | 19 - .gitignore | 3 +- .travis.yml | 24 +- API.md | 7 +- README.md | 4 + package.json | 39 +- src/decision/engine.js | 136 ++++--- src/index.js | 87 +++-- src/message/index.js | 26 +- .../{message.proto => message.proto.js} | 6 +- src/network/index.js | 52 ++- src/wantmanager/index.js | 26 +- src/wantmanager/msg-queue.js | 27 +- test/decision/engine-test.js | 77 ++-- test/decision/ledger.spec.js | 12 +- test/decision/peer-request-queue.spec.js | 195 ++++++---- test/index-test.js | 355 +++++++++++------- test/message.spec.js | 190 ++++++---- test/network/gen-bitswap-network.node.js | 124 +++--- test/network/network.node.js | 131 ++++--- test/utils.js | 153 ++++---- test/wantlist.spec.js | 165 ++++---- test/wantmanager/index.spec.js | 88 +++-- test/wantmanager/msg-queue.spec.js | 113 +++--- 24 files changed, 1246 insertions(+), 813 deletions(-) delete mode 100644 .aegir.js rename src/message/{message.proto => message.proto.js} (88%) diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index a10cdff0..00000000 --- a/.aegir.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict' - -const path = require('path') - -module.exports = { - webpack: { - resolve: { - alias: { - 'node-forge': path.resolve( - path.dirname(require.resolve('libp2p-crypto')), - '../vendor/forge.bundle.js' - ) - } - }, - externals: { - 'simple-websocket-server': '{}' - } - } -} diff --git a/.gitignore b/.gitignore index 041d367a..03aca806 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,5 @@ build/Release node_modules dist -lib -test/test-repo-for* \ No newline at end of file +test/test-repo-for* diff --git a/.travis.yml b/.travis.yml index 0c44ec89..4fe6550d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,15 @@ sudo: false language: node_js -node_js: - - 4 - - stable +matrix: + include: + - node_js: 4 + env: CXX=g++-4.8 + - node_js: 6 + env: + - SAUCE=true + - CXX=g++-4.8 + - node_js: stable + env: CXX=g++-4.8 # Make sure we have new NPM. before_install: @@ -13,12 +20,17 @@ script: - npm test - npm run coverage -addons: - firefox: 'latest' - before_script: - export DISPLAY=:99.0 - sh -e /etc/init.d/xvfb start after_success: - npm run coverage-publish + +addons: + firefox: 'latest' + apt: + sources: + - ubuntu-toolchain-r-test + packages: + - g++-4.8 \ No newline at end of file diff --git a/API.md b/API.md index 023e4cad..e37fa651 100644 --- a/API.md +++ b/API.md @@ -59,13 +59,14 @@ Cancel previously requested keys. ### `putStream()` Returns a duplex `pull-stream` that emits an object `{key: Multihash}` for every written block when it was stored. +Objects passed into here should be of the form `{data: Buffer, key: Multihash}` -### `put(block, cb)` +### `put(blockAndKey, cb)` -- `block: IpfsBlock` +- `blockAndKey: {data: Buffer, key: Multihash}` - `cb: Function` -Announce that the current node now has the `block`. This will store it +Announce that the current node now has the block containing `data`. This will store it in the local database and attempt to serve it to all peers that are known to have requested it. The callback is called when we are sure that the block is stored. diff --git a/README.md b/README.md index b16e9760..e997ac45 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,10 @@ [![Travis CI](https://travis-ci.org/ipfs/js-ipfs-bitswap.svg?branch=master)](https://travis-ci.org/ipfs/js-ipfs-bitswap) [![Circle CI](https://circleci.com/gh/ipfs/js-ipfs-bitswap.svg?style=svg)](https://circleci.com/gh/ipfs/js-ipfs-bitswap) [![Dependency Status](https://david-dm.org/ipfs/js-ipfs-bitswap.svg?style=flat-square)](https://david-dm.org/ipfs/js-ipfs-bitswap) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) +![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square) +![](https://img.shields.io/badge/Node.js-%3E%3D4.0.0-orange.svg?style=flat-square) + +[![Sauce Test Status](https://saucelabs.com/browser-matrix/js-ipfs-bitswap.svg)](https://saucelabs.com/u/js-ipfs-bitswap) > Node.js implementation of the Bitswap 'data exchange' protocol used by IPFS diff --git a/package.json b/package.json index 056834fd..4075385c 100644 --- a/package.json +++ b/package.json @@ -2,8 +2,10 @@ "name": "ipfs-bitswap", "version": "0.7.1", "description": "Node.js implementation of the Bitswap data exchange protocol used by IPFS", - "main": "lib/index.js", - "jsnext:main": "src/index.js", + "main": "src/index.js", + "browser": { + "libp2p-ipfs": false + }, "scripts": { "test": "aegir-test", "test:browser": "aegir-test browser", @@ -33,38 +35,39 @@ }, "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { - "aegir": "^8.0.1", + "aegir": "^9.1.1", "buffer-loader": "0.0.1", "chai": "^3.5.0", "fs-pull-blob-store": "^0.4.1", - "idb-pull-blob-store": "^0.4.0", - "interface-pull-blob-store": "^0.5.0", - "ipfs-repo": "^0.9.0", - "libp2p-ipfs": "^0.14.1", - "lodash": "^4.16.2", + "idb-pull-blob-store": "^0.5.1", + "interface-pull-blob-store": "^0.6.0", + "ipfs-repo": "^0.11.1", + "libp2p-ipfs": "^0.15.0", + "lodash": "^4.16.6", "multiaddr": "^2.0.3", "ncp": "^2.0.0", "peer-book": "^0.3.0", - "peer-id": "^0.7.0", - "peer-info": "^0.7.1", + "peer-id": "^0.8.0", + "peer-info": "^0.8.0", "rimraf": "^2.5.4", "safe-buffer": "^5.0.1" }, "dependencies": { - "async": "^2.1.0", - "debug": "^2.3.1", + "async": "^2.1.2", + "cids": "^0.2.0", + "debug": "^2.3.2", "heap": "^0.2.6", - "ipfs-block": "^0.3.0", + "ipfs-block": "^0.5.0", + "lodash.debounce": "^4.0.8", "lodash.isequalwith": "^4.4.0", "lodash.isundefined": "^3.0.1", "multihashes": "^0.2.2", - "protocol-buffers": "^3.1.6", + "protocol-buffers": "^3.1.8", "pull-defer": "^0.2.2", - "pull-generate": "^2.2.0", "pull-length-prefixed": "^1.2.0", - "pull-paramap": "^1.1.6", + "pull-paramap": "^1.2.0", "pull-pushable": "^2.0.1", - "pull-stream": "^3.4.5" + "pull-stream": "^3.5.0" }, "contributors": [ "David Dias ", @@ -74,4 +77,4 @@ "greenkeeperio-bot ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/decision/engine.js b/src/decision/engine.js index 545a3b4b..b4c9676c 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -3,7 +3,10 @@ const debug = require('debug') const mh = require('multihashes') const pull = require('pull-stream') -const generate = require('pull-generate') +const whilst = require('async/whilst') +const setImmediate = require('async/setImmediate') +const each = require('async/each') +const debounce = require('lodash.debounce') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') @@ -26,37 +29,45 @@ module.exports = class Engine { this.peerRequestQueue = new PeerRequestQueue() this._running = false + + this._outbox = debounce(this._outboxExec.bind(this), 100) } _sendBlock (env, cb) { const msg = new Message(false) - msg.addBlock(env.block) - - log('Sending block to %s', env.peer.toB58String(), env.block.data.toString()) - - this.network.sendMessage(env.peer, msg, (err) => { + msg.addBlock(env.block, (err) => { if (err) { - log('sendblock error: %s', err.message) + return cb(err) } - cb(null, 'done') + + log('Sending block to %s', env.peer.toB58String(), env.block.data.toString()) + + this.network.sendMessage(env.peer, msg, (err) => { + if (err) { + log('sendblock error: %s', err.message) + } + cb(null, 'done') + }) }) } - _outbox () { - if (!this._running) return + _outboxExec () { + let nextTask + log('outbox') - const doIt = (cb) => pull( - generate(null, (state, cb) => { - log('generating', this._running) + whilst( + () => { if (!this._running) { - return cb(true) + return } - const nextTask = this.peerRequestQueue.pop() + nextTask = this.peerRequestQueue.pop() + log('check', this._running && nextTask) + return Boolean(nextTask) + }, + (next) => { + log('generating') log('got task', nextTask) - if (!nextTask) { - return cb(true) - } pull( this.blockstore.getStream(nextTask.entry.key), @@ -65,31 +76,20 @@ module.exports = class Engine { const block = blocks[0] if (err || !block) { nextTask.done() - return cb(null, false) + return next() } - cb(null, { + this._sendBlock({ peer: nextTask.target, block: block, - sent: () => { + sent () { nextTask.done() } - }) + }, next) }) ) - }), - pull.filter(Boolean), - pull.asyncMap(this._sendBlock.bind(this)), - pull.onEnd(cb) + } ) - - if (!this._timer) { - this._timer = setTimeout(() => { - doIt(() => { - this._timer = null - }) - }, 50) - } } wantlistForPeer (peerId) { @@ -118,20 +118,25 @@ module.exports = class Engine { ledger.wantlist = new Wantlist() } - this._processBlocks(msg.blocks, ledger) - log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString())) - - pull( - pull.values(Array.from(msg.wantlist.values())), - pull.asyncMap((entry, cb) => { - this._processWantlist(ledger, peerId, entry, cb) - }), - pull.onEnd((err) => { - if (err) return cb(err) - this._outbox() - cb() - }) - ) + this._processBlocks(msg.blocks, ledger, (err) => { + if (err) { + log.error(`failed to process blocks: ${err.message}`) + } + + log('wantlist', Array.from(msg.wantlist.values()).map((e) => e.toString())) + + pull( + pull.values(Array.from(msg.wantlist.values())), + pull.asyncMap((entry, cb) => { + this._processWantlist(ledger, peerId, entry, cb) + }), + pull.onEnd((err) => { + if (err) return cb(err) + this._outbox() + cb() + }) + ) + }) } receivedBlock (key) { @@ -173,23 +178,36 @@ module.exports = class Engine { } } - _processBlocks (blocks, ledger) { - for (let block of blocks.values()) { - log('got block %s (%s bytes)', mh.toB58String(block.key), block.data.length) - ledger.receivedBytes(block.data.length) + _processBlocks (blocks, ledger, callback) { + each(blocks.values(), (block, cb) => { + block.key((err, key) => { + if (err) { + return cb(err) + } + log('got block %s (%s bytes)', mh.toB58String(key), block.data.length) + ledger.receivedBytes(block.data.length) - this.receivedBlock(block.key) - } + this.receivedBlock(key) + cb() + }) + }, callback) } // Clear up all accounting things after message was sent - messageSent (peerId, msg) { + messageSent (peerId, msg, callback) { const ledger = this._findOrCreate(peerId) - for (let block of msg.blocks.values()) { + each(msg.blocks.values(), (block, cb) => { ledger.sentBytes(block.data.length) - ledger.wantlist.remove(block.key) - this.peerRequestQueue.remove(block.key, peerId) - } + block.key((err, key) => { + if (err) { + return cb(err) + } + + ledger.wantlist.remove(key) + this.peerRequestQueue.remove(key, peerId) + cb() + }) + }, callback) } numBytesSentTo (peerId) { diff --git a/src/index.js b/src/index.js index 32df2149..86eb0a07 100644 --- a/src/index.js +++ b/src/index.js @@ -10,6 +10,7 @@ const mh = require('multihashes') const pull = require('pull-stream') const paramap = require('pull-paramap') const defer = require('pull-defer/source') +const Block = require('ipfs-block') const cs = require('./constants') const WantManager = require('./wantmanager') @@ -59,7 +60,7 @@ module.exports = class Bitwap { pull( pull.values(iblocks), - pull.map((block) => block.key), + pull.asyncMap((block, cb) => block.key(cb)), pull.filter((key) => this.wm.wl.contains(key)), pull.collect((err, keys) => { if (err) { @@ -78,7 +79,6 @@ module.exports = class Bitwap { } _handleReceivedBlock (peerId, block, cb) { - log('handling block', block) series([ (cb) => this._updateReceiveCounters(block, (err) => { if (err) { @@ -87,33 +87,44 @@ module.exports = class Bitwap { return cb() } - log('got block from %s', peerId.toB58String(), block.data.toString()) + log('got block from %s', peerId.toB58String(), block.data.length) cb() }), - (cb) => this.put(block, (err) => { + (cb) => block.key((err, key) => { if (err) { - log.error('receiveMessage put error: %s', err.message) + return cb(err) } - cb() + this.put({data: block.data, key: key}, (err) => { + if (err) { + log.error('receiveMessage put error: %s', err.message) + } + cb() + }) }) ], cb) } _updateReceiveCounters (block, cb) { this.blocksRecvd ++ - this.blockstore.has(block.key, (err, has) => { + block.key((err, key) => { if (err) { - log('blockstore.has error: %s', err.message) return cb(err) } - if (has) { - this.dupBlocksRecvd ++ - this.dupDataRecvd += block.data.length - return cb(new Error('Already have block')) - } + this.blockstore.has(key, (err, has) => { + if (err) { + log('blockstore.has error: %s', err.message) + return cb(err) + } + + if (has) { + this.dupBlocksRecvd ++ + this.dupDataRecvd += block.data.length + return cb(new Error('Already have block')) + } - cb() + cb() + }) }) } @@ -122,6 +133,7 @@ module.exports = class Bitwap { retry({times, interval: 400}, (done) => { pull( pull.values([block]), + pull.asyncMap(blockToStore), this.blockstore.putStream(), pull.onEnd(done) ) @@ -150,6 +162,7 @@ module.exports = class Bitwap { } getStream (keys) { + log('getStream', keys.length) if (!Array.isArray(keys)) { return this._getStreamSingle(keys) } @@ -167,6 +180,7 @@ module.exports = class Bitwap { } _getStreamSingle (key) { + log('getStreamSingle', mh.toB58String(key)) const unwantListeners = {} const blockListeners = {} const unwantEvent = (key) => `unwant:${key}` @@ -197,7 +211,7 @@ module.exports = class Bitwap { } blockListeners[keyS] = (block) => { - this.wm.cancelWants([block.key]) + this.wm.cancelWants([key]) cleanupListener(key) d.resolve(pull.values([block])) } @@ -211,6 +225,7 @@ module.exports = class Bitwap { return d.resolve(pull.error(err)) } if (exists) { + log('already have block', mh.toB58String(key)) return d.resolve(this.blockstore.getStream(key)) } @@ -243,24 +258,32 @@ module.exports = class Bitwap { putStream () { return pull( - pull.asyncMap((block, cb) => { - this.blockstore.has(block.key, (err, exists) => { - if (err) return cb(err) - cb(null, [block, exists]) + pull.asyncMap((blockAndKey, cb) => { + this.blockstore.has(blockAndKey.key, (err, exists) => { + if (err) { + return cb(err) + } + cb(null, [new Block(blockAndKey.data), exists]) }) }), pull.filter((val) => !val[1]), pull.map((val) => { const block = val[0] - + log('putting block') return pull( pull.values([block]), + pull.asyncMap(blockToStore), this.blockstore.putStream(), - pull.through((meta) => { - const key = block.key - log('put block: %s', mh.toB58String(key)) - this.notifications.emit(`block:${mh.toB58String(key)}`, block) - this.engine.receivedBlock(key) + pull.asyncMap((meta, cb) => { + block.key((err, key) => { + if (err) { + return cb(err) + } + log('put block: %s', mh.toB58String(key)) + this.notifications.emit(`block:${mh.toB58String(key)}`, block) + this.engine.receivedBlock(key) + cb(null, meta) + }) }) ) }), @@ -269,9 +292,9 @@ module.exports = class Bitwap { } // announces the existance of a block to this service - put (block, cb) { + put (blockAndKey, cb) { pull( - pull.values([block]), + pull.values([blockAndKey]), this.putStream(), pull.onEnd(cb) ) @@ -304,3 +327,13 @@ module.exports = class Bitwap { this.engine.stop() } } + +// Helper method, to add a cid to a block before storing it in the ipfs-repo/blockstore +function blockToStore (b, cb) { + b.key((err, key) => { + if (err) { + return cb(err) + } + cb(null, {data: b.data, key: key}) + }) +} diff --git a/src/message/index.js b/src/message/index.js index 2a1f8382..be4682e7 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -1,14 +1,13 @@ 'use strict' const protobuf = require('protocol-buffers') -const fs = require('fs') const Block = require('ipfs-block') -const path = require('path') const isEqualWith = require('lodash.isequalwith') const mh = require('multihashes') const assert = require('assert') +const map = require('async/map') -const pbm = protobuf(fs.readFileSync(path.join(__dirname, 'message.proto'))) +const pbm = protobuf(require('./message.proto')) const Entry = require('./entry') class BitswapMessage { @@ -35,8 +34,15 @@ class BitswapMessage { } } - addBlock (block) { - this.blocks.set(mh.toB58String(block.key), block) + addBlock (block, cb) { + block.key((err, key) => { + if (err) { + return cb(err) + } + + this.blocks.set(mh.toB58String(key), block) + cb() + }) } cancel (key) { @@ -90,16 +96,20 @@ class BitswapMessage { } } -BitswapMessage.fromProto = (raw) => { +BitswapMessage.fromProto = (raw, callback) => { const dec = pbm.Message.decode(raw) const m = new BitswapMessage(dec.wantlist.full) dec.wantlist.entries.forEach((e) => { m.addEntry(e.block, e.priority, e.cancel) }) - dec.blocks.forEach((b) => m.addBlock(new Block(b))) - return m + map(dec.blocks, (b, cb) => m.addBlock(new Block(b), cb), (err) => { + if (err) { + return callback(err) + } + callback(null, m) + }) } BitswapMessage.Entry = Entry diff --git a/src/message/message.proto b/src/message/message.proto.js similarity index 88% rename from src/message/message.proto rename to src/message/message.proto.js index da11427b..6038b42a 100644 --- a/src/message/message.proto +++ b/src/message/message.proto.js @@ -1,4 +1,6 @@ -package bitswap.message.pb; +'use strict' + +module.exports = `package bitswap.message.pb; message Message { @@ -16,4 +18,4 @@ message Message { optional Wantlist wantlist = 1; repeated bytes blocks = 2; -} \ No newline at end of file +}` diff --git a/src/network/index.js b/src/network/index.js index da974c9f..2b1adac7 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -3,10 +3,13 @@ const debug = require('debug') const lp = require('pull-length-prefixed') const pull = require('pull-stream') +const pushable = require('pull-pushable') +const setImmediate = require('async/setImmediate') const Message = require('../message') const cs = require('../constants') const log = debug('bitswap:network') +log.error = debug('bitswap:network:error') const PROTOCOL_IDENTIFIER = '/ipfs/bitswap/1.0.0' @@ -15,6 +18,7 @@ module.exports = class Network { this.libp2p = libp2p this.peerBook = peerBook this.bitswap = bitswap + this.conns = new Map() // increase event listener max this.libp2p.swarm.setMaxListeners(cs.maxListeners) @@ -29,7 +33,6 @@ module.exports = class Network { this.libp2p.handle(PROTOCOL_IDENTIFIER, this._onConnection) this.libp2p.swarm.on('peer-mux-established', this._onPeerMux) - this.libp2p.swarm.on('peer-mux-closed', this._onPeerMuxClosed) // All existing connections are like new ones for us @@ -46,26 +49,24 @@ module.exports = class Network { this.libp2p.swarm.removeListener('peer-mux-closed', this._onPeerMuxClosed) } - _onConnection (conn) { - log('incomming new bitswap connection') + _onConnection (protocol, conn) { + log('incomming new bitswap connection: %s', protocol) pull( conn, lp.decode(), - pull.through((data) => { - let msg - try { - msg = Message.fromProto(data) - } catch (err) { - return this.bitswap._receiveError(err) - } + pull.asyncMap((data, cb) => Message.fromProto(data, cb)), + pull.asyncMap((msg, cb) => { conn.getPeerInfo((err, peerInfo) => { if (err) { - return this.bitswap._receiveError(err) + return cb(err) } + log('data from', peerInfo.id.toB58String()) this.bitswap._receiveMessage(peerInfo.id, msg) + cb() }) }), pull.onEnd((err) => { + log('ending connection') if (err) { return this.bitswap._receiveError(err) } @@ -97,8 +98,8 @@ module.exports = class Network { // Send the given msg (instance of Message) to the given peer sendMessage (peerId, msg, cb) { - log('sendMessage to %s', peerId.toB58String()) - log('msg', msg) + const stringId = peerId.toB58String() + log('sendMessage to %s', stringId) let peerInfo try { peerInfo = this.peerBook.getByMultihash(peerId.toBytes()) @@ -106,16 +107,37 @@ module.exports = class Network { return cb(err) } + if (this.conns.has(stringId)) { + log('connection exists') + this.conns.get(stringId).push(msg.toProto()) + return cb() + } + + log('dialByPeerInfo') this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { log('dialed %s', peerInfo.id.toB58String(), err) if (err) { return cb(err) } + + const msgQueue = pushable() + msgQueue.push(msg.toProto()) + + this.conns.set(stringId, msgQueue) + pull( - pull.values([msg.toProto()]), + msgQueue, lp.encode(), - conn + conn, + pull.onEnd((err) => { + if (err) { + log.error(err) + } + msgQueue.end() + this.conns.delete(stringId) + }) ) + cb() }) } diff --git a/src/wantmanager/index.js b/src/wantmanager/index.js index ebce56fe..c9cc34bd 100644 --- a/src/wantmanager/index.js +++ b/src/wantmanager/index.js @@ -108,6 +108,7 @@ module.exports = class Wantmanager { // cancel wanting all of the given keys cancelWants (keys) { + log('keys', keys) log('cancel wants: ', keys.map((k) => mh.toB58String(k))) this._addEntries(keys, true) } @@ -128,24 +129,23 @@ module.exports = class Wantmanager { } run () { - // TODO: is this needed? if so enable it - // // resend entirew wantlist every so often - // const es = [] - // for (let e of this.wl.entries()) { - // es.push(new Message.Entry(e.key, e.priority)) - // } - - // this.peers.forEach((p) => { - // p.addEntries(es, true) - // }) - // timer.start() - // } - // } + this.timer = setInterval(() => { + // resend entirew wantlist every so often + const fullwantlist = new Message(true) + for (let entry of this.wl.entries()) { + fullwantlist.addEntry(entry[1].key, entry[1].priority) + } + + this.peers.forEach((p) => { + p.addMessage(fullwantlist) + }) + }, 10 * 1000) } stop () { for (let mq of this.peers.values()) { this.disconnected(mq.p) } + clearInterval(this.timer) } } diff --git a/src/wantmanager/msg-queue.js b/src/wantmanager/msg-queue.js index aa61954d..30259425 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/wantmanager/msg-queue.js @@ -1,8 +1,7 @@ 'use strict' +const queue = require('async/queue') const debug = require('debug') -const pull = require('pull-stream') -const pushable = require('pull-pushable') const Message = require('../message') @@ -15,10 +14,14 @@ module.exports = class MsgQueue { this.network = network this.refcnt = 1 - this.queue = pushable() + this.queue = queue(this.doWork.bind(this), 1) + this.queue.pause() } addMessage (msg) { + if (msg.empty) { + return + } log('addMessage: %s', this.p.toB58String(), msg) this.queue.push(msg) } @@ -43,12 +46,13 @@ module.exports = class MsgQueue { this.network.connectTo(this.p, (err) => { if (err) { log.error('cant connect to peer %s: %s', this.p.toB58String(), err.message) - return cb() + return cb(err) } log('sending message', wlm) this.network.sendMessage(this.p, wlm, (err) => { if (err) { log.error('send error: %s', err.message) + return cb(err) } cb() }) @@ -57,20 +61,11 @@ module.exports = class MsgQueue { run () { log('starting queue') - - pull( - this.queue, - pull.asyncMap(this.doWork.bind(this)), - pull.onEnd((err) => { - if (err) { - log.error('error processing message queue', err) - } - this.queue = pushable() - }) - ) + this.queue.resume() } stop () { - this.queue.end() + log('killing queue') + this.queue.kill() } } diff --git a/test/decision/engine-test.js b/test/decision/engine-test.js index c47c8811..0154e0e1 100644 --- a/test/decision/engine-test.js +++ b/test/decision/engine-test.js @@ -1,3 +1,4 @@ +/* eslint max-nested-callbacks: ["error", 8] */ /* eslint-env mocha */ 'use strict' @@ -7,6 +8,7 @@ const _ = require('lodash') const Block = require('ipfs-block') const parallel = require('async/parallel') const series = require('async/series') +const map = require('async/map') const eachSeries = require('async/eachSeries') const pull = require('pull-stream') const paramap = require('pull-paramap') @@ -18,13 +20,19 @@ const mockNetwork = require('../utils').mockNetwork module.exports = (repo) => { function newEngine (id, done) { - repo.create(id, (err, repo) => { - if (err) return done(err) - const engine = new Engine(repo.blockstore, mockNetwork()) + parallel([ + (cb) => repo.create(id, cb), + (cb) => PeerId.create(cb) + ], (err, results) => { + if (err) { + return done(err) + } + const blockstore = results[0].blockstore + const engine = new Engine(blockstore, mockNetwork()) engine.start() done(null, { - peer: PeerId.create({bits: 64}), + peer: results[1], engine }) }) @@ -48,11 +56,18 @@ module.exports = (repo) => { pull( pull.values(_.range(1000)), paramap((i, cb) => { - const m = new Message(false) const content = `this is message ${i}` - m.addBlock(new Block(content)) - sender.engine.messageSent(receiver.peer, m) - receiver.engine.messageReceived(sender.peer, m, cb) + cb(null, new Block(content)) + }), + paramap((block, cb) => { + const m = new Message(false) + m.addBlock(block, (err) => { + if (err) { + return cb(err) + } + sender.engine.messageSent(receiver.peer, m) + receiver.engine.messageReceived(sender.peer, m, cb) + }) }, 100), pull.onEnd((err) => { expect(err).to.not.exist @@ -139,29 +154,40 @@ module.exports = (repo) => { pull( pull.values(alphabet), - pull.map((l) => new Block(l)), + pull.asyncMap((l, cb) => { + const block = new Block(l) + block.key((err, key) => { + if (err) { + return cb(err) + } + cb(null, {data: block.data, key: key}) + }) + }), repo.blockstore.putStream(), pull.onEnd((err) => { expect(err).to.not.exist const partnerWants = (e, keys, p, cb) => { const add = new Message(false) - keys.forEach((letter, i) => { - const block = new Block(letter) - add.addEntry(block.key, Math.pow(2, 32) - 1 - i) - }) + const blocks = keys.map((k) => new Block(k)) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + blocks.forEach((b, i) => { + add.addEntry(keys[i], Math.pow(2, 32) - 1 - i) + }) - e.messageReceived(p, add, cb) + e.messageReceived(p, add, cb) + }) } const partnerCancels = (e, keys, p, cb) => { const cancels = new Message(false) - keys.forEach((k) => { - const block = new Block(k) - cancels.cancel(block.key) + const blocks = keys.map((k) => new Block(k)) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + keys.forEach((k) => cancels.cancel(k)) + e.messageReceived(p, cancels, cb) }) - - e.messageReceived(p, cancels, cb) } eachSeries(_.range(numRounds), (i, cb) => { @@ -186,10 +212,17 @@ module.exports = (repo) => { const e = new Engine(repo.blockstore, network) e.start() - const partner = PeerId.create({bits: 64}) + let partner series([ - (c) => partnerWants(e, set, partner, c), - (c) => partnerCancels(e, cancels, partner, c) + (cb) => PeerId.create((err, id) => { + if (err) { + return cb(err) + } + partner = id + cb() + }), + (cb) => partnerWants(e, set, partner, cb), + (cb) => partnerCancels(e, cancels, partner, cb) ], (err) => { if (err) throw err }) diff --git a/test/decision/ledger.spec.js b/test/decision/ledger.spec.js index 68ec87eb..712ff8c0 100644 --- a/test/decision/ledger.spec.js +++ b/test/decision/ledger.spec.js @@ -7,9 +7,19 @@ const PeerId = require('peer-id') const Ledger = require('../../src/decision/ledger') describe('Ledger', () => { - const p = PeerId.create({bits: 64}) + let p let ledger + before((done) => { + PeerId.create((err, id) => { + if (err) { + return done(err) + } + + p = id + done() + }) + }) beforeEach(() => { ledger = new Ledger(p) }) diff --git a/test/decision/peer-request-queue.spec.js b/test/decision/peer-request-queue.spec.js index 47b6da66..ff0e96c0 100644 --- a/test/decision/peer-request-queue.spec.js +++ b/test/decision/peer-request-queue.spec.js @@ -1,3 +1,4 @@ +/* eslint max-nested-callbacks: ["error", 8] */ /* eslint-env mocha */ 'use strict' @@ -5,91 +6,143 @@ const expect = require('chai').expect const PeerId = require('peer-id') const _ = require('lodash') const Block = require('ipfs-block') - -const hash = (data) => (new Block(data)).key +const map = require('async/map') +const each = require('async/each') +const waterfall = require('async/waterfall') +const parallel = require('async/parallel') + +const hash = (data, cb) => { + const block = new Block(data) + block.key((err, key) => { + if (err) { + return cb(err) + } + + cb(null, key) + }) +} const WantlistEntry = require('../../src/wantlist').Entry const PeerRequestQueue = require('../../src/decision/peer-request-queue') describe('PeerRequestQueue', () => { - it('push and pop', () => { + it('push and pop', (done) => { const prq = new PeerRequestQueue() - const partner = PeerId.create({bits: 64}) - const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('').sort() - const vowels = 'aeiou'.split('').sort() - const consonants = alphabet.filter((a) => !_.includes(vowels, a)).sort() - - alphabet.forEach((a, i) => { - prq.push(new WantlistEntry(hash(a), Math.pow(2, 32) - 1 - i), partner) - }) - - consonants.forEach((c) => { - prq.remove(hash(c), partner) - }) - - const out = [] - alphabet.forEach(() => { - const rec = prq.pop() - if (!rec) return - out.push(rec.entry.key) - }) - - expect(out.length).to.be.eql(vowels.length) - - vowels.forEach((v, i) => { - expect( - out[i].toString('hex') - ).to.be.eql( - hash(v).toString('hex') - ) + PeerId.create({bits: 1024}, (err, partner) => { + if (err) { + return done(err) + } + + const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('').sort() + const vowels = 'aeiou'.split('').sort() + const vowelsIndex = vowels.map((v) => alphabet.indexOf(v)) + const consonants = alphabet + .filter((a) => !_.includes(vowels, a)) + .sort() + .map((c) => alphabet.indexOf(c)) + + map(alphabet, hash, (err, hashes) => { + if (err) { + return done(err) + } + + alphabet.forEach((a, i) => { + prq.push(new WantlistEntry(hashes[i], Math.pow(2, 32) - 1 - i), partner) + }) + + consonants.forEach((c) => { + prq.remove(hashes[c], partner) + }) + + const out = [] + alphabet.forEach(() => { + const rec = prq.pop() + if (!rec) return + out.push(rec.entry.key) + }) + + expect(out.length).to.be.eql(vowels.length) + + vowelsIndex.forEach((v, i) => { + expect( + out[i].toString('hex') + ).to.be.eql( + hashes[v].toString('hex') + ) + }) + done() + }) }) }) - it('peer repeats', () => { + it('peer repeats', (done) => { // This test checks that peers wont starve out other peers const prq = new PeerRequestQueue() - const peers = _.range(4).map(() => PeerId.create({bits: 64})) - - _.range(5).map((i) => { - peers.forEach((peer) => { - prq.push(new WantlistEntry(hash('hello-' + i)), peer) - }) - }) - - let targets = [] - const tasks = [] - - _.range(4).forEach((i) => { - const t = prq.pop() - targets.push(t.target.toHexString()) - tasks.push(t) - }) - - const expected = peers.map((p) => p.toHexString()).sort() - targets = targets.sort() - expect(targets).to.be.eql(expected) - - // Now, if one of the tasks gets finished, the next task off the queue should - // be for the same peer - _.range(3).forEach((blockI) => { - _.range(3).forEach((i) => { - // its okay to mark the same task done multiple times here (JUST FOR TESTING) - tasks[i].done() - const ntask = prq.pop() - expect(ntask.target).to.be.eql(tasks[i].target) - }) - }) + waterfall([ + (cb) => map(_.range(4), (i, cb) => PeerId.create({bits: 1024}, cb), cb), + (peers, cb) => { + each(_.range(5), (i, cb) => { + hash('hello-' + i, (err, digest) => { + if (err) { + return cb(err) + } + peers.forEach((peer) => { + prq.push(new WantlistEntry(digest), peer) + }) + cb() + }) + }, (err) => { + if (err) { + return cb(err) + } + let targets = [] + const tasks = [] + + _.range(4).forEach((i) => { + const t = prq.pop() + targets.push(t.target.toHexString()) + tasks.push(t) + }) + + const expected = peers.map((p) => p.toHexString()).sort() + targets = targets.sort() + + expect(targets).to.be.eql(expected) + + // Now, if one of the tasks gets finished, the next task off the queue should + // be for the same peer + _.range(3).forEach((blockI) => { + _.range(3).forEach((i) => { + // its okay to mark the same task done multiple times here (JUST FOR TESTING) + tasks[i].done() + const ntask = prq.pop() + expect(ntask.target).to.be.eql(tasks[i].target) + }) + }) + cb() + }) + } + ], done) }) - it('push same block multiple times', () => { + it('push same block multiple times', (done) => { const prq = new PeerRequestQueue() - const partner = PeerId.create({bits: 64}) - - prq.push(new WantlistEntry(hash('hello')), partner) - prq.push(new WantlistEntry(hash('hello')), partner) - - expect(prq.pop()).to.exist - expect(prq.pop()).to.not.exist + parallel([ + (cb) => PeerId.create({bits: 1024}, cb), + (cb) => hash('hello', cb) + ], (err, results) => { + if (err) { + return done(err) + } + const partner = results[0] + const digest = results[1] + prq.push(new WantlistEntry(digest), partner) + prq.push(new WantlistEntry(digest), partner) + + expect(prq.pop()).to.exist + expect(prq.pop()).to.not.exist + done() + }) }) }) diff --git a/test/index-test.js b/test/index-test.js index ccb805ab..d23fd113 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -5,6 +5,9 @@ const eachSeries = require('async/eachSeries') const waterfall = require('async/waterfall') const each = require('async/each') +const map = require('async/map') +const parallel = require('async/parallel') +const setImmediate = require('async/setImmediate') const _ = require('lodash') const expect = require('chai').expect const PeerId = require('peer-id') @@ -18,7 +21,7 @@ const Bitswap = require('../src') const utils = require('./utils') -const makeBlock = () => new Block(`hello world ${Math.random()}`) +const makeBlock = (cb) => cb(null, new Block(`hello world ${Math.random()}`)) module.exports = (repo) => { const libp2pMock = { @@ -32,11 +35,23 @@ module.exports = (repo) => { describe('bitswap', () => { let store + let blocks + let ids before((done) => { - repo.create('hello', (err, r) => { - if (err) return done(err) - store = r.blockstore + parallel([ + (cb) => repo.create('hello', cb), + (cb) => map(_.range(12), (i, cb) => makeBlock(cb), cb), + (cb) => map(_.range(2), (i, cb) => PeerId.create(cb), cb) + ], (err, results) => { + if (err) { + return done(err) + } + + store = results[0].blockstore + blocks = results[1] + ids = results[2] + done() }) }) @@ -47,111 +62,150 @@ module.exports = (repo) => { describe('receive message', () => { it('simple block message', (done) => { - const me = PeerId.create({bits: 64}) + const me = ids[0] const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) bs.start() - const other = PeerId.create({bits: 64}) - const b1 = makeBlock() - const b2 = makeBlock() + const other = ids[1] + const b1 = blocks[0] + const b2 = blocks[1] const msg = new Message(false) - msg.addBlock(b1) - msg.addBlock(b2) + each([b1, b2], (b, cb) => msg.addBlock(b, cb), (err) => { + expect(err).to.not.exist - bs._receiveMessage(other, msg, (err) => { - if (err) throw err + bs._receiveMessage(other, msg, (err) => { + if (err) { + throw err + } - expect(bs.blocksRecvd).to.be.eql(2) - expect(bs.dupBlocksRecvd).to.be.eql(0) + expect(bs.blocksRecvd).to.be.eql(2) + expect(bs.dupBlocksRecvd).to.be.eql(0) - pull( - pull.values([b1, b1]), - pull.map((block) => store.getStream(block.key)), - pull.flatten(), - pull.collect((err, blocks) => { - if (err) return done(err) + pull( + pull.values([b1, b2]), + pull.asyncMap((b, cb) => b.key(cb)), + pull.map((key) => store.getStream(key)), + pull.flatten(), + pull.collect((err, blocks) => { + if (err) return done(err) - expect(blocks).to.be.eql([b1, b1]) - done() - }) - ) + expect(blocks[0].data).to.be.eql(b1.data) + expect(blocks[1].data).to.be.eql(b2.data) + done() + }) + ) + }) }) }) it('simple want message', (done) => { - const me = PeerId.create({bits: 64}) + const me = ids[0] const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) bs.start() - const other = PeerId.create({bits: 64}) - const b1 = makeBlock() - const b2 = makeBlock() + const other = ids[1] + const b1 = blocks[2] + const b2 = blocks[3] const msg = new Message(false) - msg.addEntry(b1.key, 1, false) - msg.addEntry(b2.key, 1, false) + parallel([ + (cb) => b1.key(cb), + (cb) => b2.key(cb) + ], (err, keys) => { + expect(err).to.not.exist - bs._receiveMessage(other, msg, (err) => { - if (err) throw err + msg.addEntry(keys[0], 1, false) + msg.addEntry(keys[1], 1, false) - expect(bs.blocksRecvd).to.be.eql(0) - expect(bs.dupBlocksRecvd).to.be.eql(0) + bs._receiveMessage(other, msg, (err) => { + expect(err).to.not.exist + + expect(bs.blocksRecvd).to.be.eql(0) + expect(bs.dupBlocksRecvd).to.be.eql(0) - const wl = bs.wantlistForPeer(other) + const wl = bs.wantlistForPeer(other) - expect(wl.has(mh.toB58String(b1.key))).to.be.eql(true) - expect(wl.has(mh.toB58String(b2.key))).to.be.eql(true) + expect(wl.has(mh.toB58String(keys[0]))).to.be.eql(true) + expect(wl.has(mh.toB58String(keys[1]))).to.be.eql(true) - done() + done() + }) }) }) it('multi peer', (done) => { - const me = PeerId.create({bits: 64}) + const me = ids[0] const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) bs.start() - const others = _.range(5).map(() => PeerId.create({bits: 64})) - const blocks = _.range(10).map((i) => new Block(`hello ${i}`)) - const messages = _.range(5).map((i) => { - const m = new Message(false) - m.addBlock(blocks[i]) - m.addBlock(blocks[5 + i]) - return m - }) - let i = 0 - eachSeries(others, (other, cb) => { - const msg = messages[i] - i++ - bs._receiveMessage(other, msg, (err) => { - if (err) return cb(err) - hasBlocks(msg, store, cb) + parallel([ + (cb) => map(_.range(5), (i, cb) => PeerId.create(cb), cb), + (cb) => cb(null, _.range(10).map((i) => new Block(`hello ${i}`))) + ], (err, results) => { + if (err) { + return done(err) + } + + const others = results[0] + const blocks = results[1] + + map(_.range(5), (i, cb) => { + const m = new Message(false) + each( + [blocks[i], blocks[5 + i]], + (b, cb) => m.addBlock(b, cb), + (err) => { + if (err) { + return cb(err) + } + cb(null, m) + } + ) + }, (err, messages) => { + expect(err).to.not.exist + let i = 0 + eachSeries(others, (other, cb) => { + const msg = messages[i] + i++ + bs._receiveMessage(other, msg, (err) => { + if (err) return cb(err) + hasBlocks(msg, store, cb) + }) + }, done) }) - }, done) + }) }) }) describe('getStream', () => { it('block exists locally', (done) => { - const me = PeerId.create({bits: 64}) - const block = makeBlock() + const me = ids[0] + const block = blocks[4] pull( pull.values([block]), + pull.asyncMap(blockToStore), store.putStream(), pull.onEnd((err) => { - if (err) return done(err) + if (err) { + return done(err) + } const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) pull( - bs.getStream(block.key), + pull.values([block]), + pull.asyncMap((b, cb) => b.key(cb)), + pull.map((key) => bs.getStream(key)), + pull.flatten(), pull.collect((err, res) => { - if (err) return done(err) + if (err) { + return done(err) + } - expect(res).to.be.eql([block]) + expect(res[0].data).to.be.eql(block.data) done() }) ) @@ -160,27 +214,37 @@ module.exports = (repo) => { }) it('blocks exist locally', (done) => { - const me = PeerId.create({bits: 64}) - const b1 = makeBlock() - const b2 = makeBlock() - const b3 = makeBlock() + const me = ids[0] + const b1 = blocks[5] + const b2 = blocks[6] + const b3 = blocks[7] pull( pull.values([b1, b2, b3]), + pull.asyncMap(blockToStore), store.putStream(), pull.onEnd((err) => { - if (err) return done(err) + expect(err).to.not.exist const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) pull( - bs.getStream([b1.key, b2.key, b3.key]), - pull.collect((err, res) => { - if (err) return done(err) - - expect(res).to.be.eql([b1, b2, b3]) - done() + pull.values([b1, b2, b3]), + pull.asyncMap((b, cb) => b.key(cb)), + pull.collect((err, keys) => { + expect(err).to.not.exist + pull( + bs.getStream(keys), + pull.collect((err, res) => { + expect(err).to.not.exist + + expect(res[0].data).to.be.eql(b1.data) + expect(res[1].data).to.be.eql(b2.data) + expect(res[2].data).to.be.eql(b3.data) + done() + }) + ) }) ) }) @@ -191,7 +255,7 @@ module.exports = (repo) => { // test fails because now the network is not properly mocked // what are these net.stores and mockNet.bitswaps? it.skip('block is retrived from peer', (done) => { - const block = makeBlock() + const block = blocks[8] let mockNet waterfall([ @@ -204,23 +268,28 @@ module.exports = (repo) => { mockNet.bitswaps[0]._onPeerConnected(mockNet.ids[1]) mockNet.bitswaps[1]._onPeerConnected(mockNet.ids[0]) pull( - mockNet.bitswaps[0].getStream(block.key), + pull.values([block]), + pull.asyncMap((b, cb) => b.key(cb)), + pull.map((key) => mockNet.bitswaps[0].getStream(key)), + pull.flatten(), pull.collect((err, res) => { - if (err) return cb(err) + if (err) { + return cb(err) + } cb(null, res[0]) }) ) }, (res, cb) => { - expect(res).to.be.eql(res) + expect(res).to.be.eql(block) cb() } ], done) }) it('block is added locally afterwards', (done) => { - const me = PeerId.create({bits: 64}) - const block = makeBlock() + const me = ids[0] + const block = blocks[9] const book = new PeerBook() const bs = new Bitswap(me, libp2pMock, store, book) const net = utils.mockNetwork() @@ -229,24 +298,30 @@ module.exports = (repo) => { bs.engine.network = net bs.start() - pull( - bs.getStream(block.key), - pull.collect((err, res) => { - if (err) throw err - expect(res).to.be.eql([block]) - done() - }) - ) + block.key((err, key) => { + expect(err).to.not.exist + pull( + bs.getStream(key), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res[0].data).to.be.eql(block.data) + done() + }) + ) - setTimeout(() => { - bs.put(block, () => {}) - }, 200) + setTimeout(() => { + bs.put({ + data: block.data, + key: key + }, () => {}) + }, 200) + }) }) it('block is sent after local add', (done) => { - const me = PeerId.create({bits: 64}) - const other = PeerId.create({bits: 64}) - const block = makeBlock() + const me = ids[0] + const other = ids[1] + const block = blocks[10] let bs1 let bs2 @@ -301,17 +376,24 @@ module.exports = (repo) => { bs2.start() bs1._onPeerConnected(other) bs2._onPeerConnected(me) - pull( - bs1.getStream(block.key), - pull.collect((err, res) => { - if (err) return cb(err) - cb(null, res[0]) - }) - ) - setTimeout(() => { - bs2.put(block) - }, 1000) + block.key((err, key) => { + expect(err).to.not.exist + pull( + bs1.getStream(key), + pull.collect((err, res) => { + expect(err).to.not.exist + cb(null, res[0]) + }) + ) + + setTimeout(() => { + bs2.put({ + data: block.data, + key: key + }) + }, 1000) + }) }, (res, cb) => { expect(res).to.be.eql(res) @@ -323,7 +405,7 @@ module.exports = (repo) => { describe('stat', () => { it('has initial stats', () => { - const me = PeerId.create({bits: 64}) + const me = ids[0] const bs = new Bitswap(me, libp2pMock, {}, new PeerBook()) const stats = bs.stat() @@ -337,10 +419,10 @@ module.exports = (repo) => { describe('unwant', () => { it('removes blocks that are wanted multiple times', (done) => { - const me = PeerId.create({bits: 64}) + const me = ids[0] const bs = new Bitswap(me, libp2pMock, store, new PeerBook()) bs.start() - const b = makeBlock() + const b = blocks[11] let i = 0 const finish = () => { @@ -349,25 +431,27 @@ module.exports = (repo) => { done() } } + b.key((err, key) => { + expect(err).to.not.exist + pull( + bs.getStream(key), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.empty + finish() + }) + ) + pull( + bs.getStream(key), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.be.empty + finish() + }) + ) - pull( - bs.getStream(b.key), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.be.empty - finish() - }) - ) - pull( - bs.getStream(b.key), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.be.empty - finish() - }) - ) - - setTimeout(() => bs.unwant(b.key), 10) + setTimeout(() => bs.unwant(key), 10) + }) }) }) }) @@ -375,10 +459,25 @@ module.exports = (repo) => { function hasBlocks (msg, store, cb) { each(Array.from(msg.blocks.values()), (b, next) => { - if (!b.cancel) { - store.has(b.key, next) - } else { - next() - } + b.key((err, key) => { + if (err) { + return next(err) + } + if (!b.cancel) { + store.has(key, next) + } else { + next() + } + }) }, cb) } + +function blockToStore (b, cb) { + b.key((err, key) => { + if (err) { + return cb(err) + } + + cb(null, {data: b.data, key: key}) + }) +} diff --git a/test/message.spec.js b/test/message.spec.js index 4d66cb9d..4cbb9ac7 100644 --- a/test/message.spec.js +++ b/test/message.spec.js @@ -2,63 +2,83 @@ 'use strict' const expect = require('chai').expect -const fs = require('fs') const Block = require('ipfs-block') const protobuf = require('protocol-buffers') -const path = require('path') const mh = require('multihashes') -const pbm = protobuf(fs.readFileSync(path.join(__dirname, '../src/message/message.proto'))) +const series = require('async/series') +const map = require('async/map') +const pbm = protobuf(require('../src/message/message.proto')) const BitswapMessage = require('../src/message') describe('BitswapMessage', () => { - it('go interop', () => { + let blocks + let keys + + before((done) => { + const data = [ + 'foo', + 'hello', + 'world' + ] + blocks = data.map((d) => new Block(d)) + map(blocks, (b, cb) => b.key(cb), (err, res) => { + if (err) { + return done(err) + } + keys = res + done() + }) + }) + + it('go interop', (done) => { const goEncoded = new Buffer('CioKKAoiEiAs8k26X7CjDiboOyrFueKeGxYeXB+nQl5zBDNik4uYJBAKGAA=', 'base64') const m = new BitswapMessage(false) m.addEntry(mh.fromB58String('QmRN6wdp1S2A5EtjW9A3M1vKSBuQQGcgvuhoMUoEz4iiT5'), 10) - expect( - BitswapMessage.fromProto(goEncoded) - ).to.be.eql( - m - ) + BitswapMessage.fromProto(goEncoded, (err, res) => { + expect(err).to.not.exist + expect(res).to.be.eql(m) - expect( - m.toProto() - ).to.be.eql( - goEncoded - ) + expect( + m.toProto() + ).to.be.eql( + goEncoded + ) + done() + }) }) it('append wanted', () => { - const str = 'foo' - const block = new Block(str) + const key = keys[1] const m = new BitswapMessage(true) - m.addEntry(block.key, 1) + m.addEntry(key, 1) expect( pbm.Message.decode(m.toProto()).wantlist.entries[0] ).to.be.eql({ - block: block.key, + block: key, priority: 1, cancel: false }) }) - it('encodes blocks', () => { - const block = new Block('hello') + it('encodes blocks', (done) => { + const block = blocks[1] const m = new BitswapMessage(true) - m.addBlock(block) - - expect( - pbm.Message.decode(m.toProto()).blocks - ).to.be.eql([ - block.data - ]) + m.addBlock(block, (err) => { + expect(err).to.not.exist + expect( + pbm.Message.decode(m.toProto()).blocks + ).to.be.eql([ + block.data + ]) + done() + }) }) - it('new message fromProto', () => { + it('new message fromProto', (done) => { const raw = pbm.Message.encode({ wantlist: { entries: [{ @@ -70,42 +90,52 @@ describe('BitswapMessage', () => { blocks: ['hello', 'world'] }) - const protoMessage = BitswapMessage.fromProto(raw) - - expect( - protoMessage.full - ).to.be.eql( - true - ) - expect( - Array.from(protoMessage.wantlist) - ).to.be.eql([ - [mh.toB58String(new Buffer('hello')), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] - ]) - - const b1 = new Block('hello') - const b2 = new Block('world') - expect( - Array.from(protoMessage.blocks) - ).to.be.eql([ - [mh.toB58String(b1.key), b1], - [mh.toB58String(b2.key), b2] - ]) + BitswapMessage.fromProto(raw, (err, protoMessage) => { + expect(err).to.not.exist + expect( + protoMessage.full + ).to.be.eql( + true + ) + expect( + Array.from(protoMessage.wantlist) + ).to.be.eql([ + [mh.toB58String(new Buffer('hello')), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] + ]) + + const b1 = blocks[1] + const b2 = blocks[2] + const k1 = keys[1] + const k2 = keys[2] + + expect( + Array.from(protoMessage.blocks).map((b) => [b[0], b[1].data]) + ).to.be.eql([ + [mh.toB58String(k1), b1.data], + [mh.toB58String(k2), b2.data] + ]) + + done() + }) }) - it('duplicates', () => { - const b = new Block('foo') + it('duplicates', (done) => { + const b = blocks[0] + const key = keys[0] const m = new BitswapMessage(true) - m.addEntry(b.key, 1) - m.addEntry(b.key, 1) + m.addEntry(key, 1) + m.addEntry(key, 1) expect(m.wantlist.size).to.be.eql(1) - - m.addBlock(b) - m.addBlock(b) - - expect(m.blocks.size).to.be.eql(1) + series([ + (cb) => m.addBlock(b, cb), + (cb) => m.addBlock(b, cb) + ], (err) => { + expect(err).to.not.exist + expect(m.blocks.size).to.be.eql(1) + done() + }) }) it('empty', () => { @@ -129,30 +159,42 @@ describe('BitswapMessage', () => { }) describe('.equals', () => { - it('true, same message', () => { - const b = new Block('foo') + it('true, same message', (done) => { + const b = blocks[0] + const key = keys[0] const m1 = new BitswapMessage(true) const m2 = new BitswapMessage(true) - m1.addEntry(b.key, 1) - m1.addBlock(b) - m2.addEntry(b.key, 1) - m2.addBlock(b) - - expect(m1.equals(m2)).to.be.eql(true) + m1.addEntry(key, 1) + m2.addEntry(key, 1) + + series([ + (cb) => m1.addBlock(b, cb), + (cb) => m2.addBlock(b, cb) + ], (err) => { + expect(err).to.not.exist + expect(m1.equals(m2)).to.be.eql(true) + done() + }) }) - it('false, different entries', () => { - const b = new Block('foo') + it('false, different entries', (done) => { + const b = blocks[0] + const key = keys[0] const m1 = new BitswapMessage(true) const m2 = new BitswapMessage(true) - m1.addEntry(b.key, 1) - m1.addBlock(b) - m2.addEntry(b.key, 2) - m2.addBlock(b) - - expect(m1.equals(m2)).to.be.eql(false) + m1.addEntry(key, 1) + m2.addEntry(key, 2) + + series([ + (cb) => m1.addBlock(b, cb), + (cb) => m2.addBlock(b, cb) + ], (err) => { + expect(err).to.not.exist + expect(m1.equals(m2)).to.be.eql(false) + done() + }) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index 8c8725dd..ecd13591 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -1,16 +1,17 @@ -/* eslint-env mocha */ /* eslint max-nested-callbacks: ["error", 8] */ +/* eslint-env mocha */ 'use strict' const expect = require('chai').expect -const utils = require('../utils') const series = require('async/series') const parallel = require('async/parallel') +const map = require('async/map') const each = require('async/each') const _ = require('lodash') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer const pull = require('pull-stream') +const utils = require('../utils') describe('gen Bitswap network', function () { // CI is very slow @@ -21,32 +22,49 @@ describe('gen Bitswap network', function () { expect(err).to.not.exist const node = nodes[0] - const blocks = _.range(100).map((k) => { - const b = Buffer.alloc(1024) - b.fill(k) - return new Block(b) - }) + let blocks series([ + (cb) => map(_.range(100), (k, cb) => { + const b = Buffer.alloc(1024) + b.fill(k) + cb(null, new Block(b)) + }, (err, _blocks) => { + if (err) { + return cb(err) + } + blocks = _blocks + cb() + }), (cb) => { pull( pull.values(blocks), + pull.asyncMap((b, cb) => { + b.key((err, key) => { + if (err) { + return cb(err) + } + + cb(null, {data: b.data, key: key}) + }) + }), node.bitswap.putStream(), pull.onEnd(cb) ) }, (cb) => { each(_.range(100), (i, cb) => { - pull( - node.bitswap.getStream( - blocks.map((b) => b.key) - ), - pull.collect((err, res) => { - if (err) return cb(err) - expect(res).to.have.length(blocks.length) - cb() - }) - ) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + pull( + node.bitswap.getStream(keys), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.have.length(blocks.length) + cb() + }) + ) + }) }, cb) } ], (err) => { @@ -79,45 +97,55 @@ describe('gen Bitswap network', function () { const round = (j, cb) => { const blockFactor = 10 - const blocks = _.range(n * blockFactor).map((k) => { - const buf = Buffer.alloc(1024) - buf.fill(k) - buf[0] = j - return new Block(buf) - }) + map(_.range(n * blockFactor), (k, cb) => { + const b = Buffer.alloc(1024) + b.fill(k) + cb(null, new Block(b)) + }, (err, blocks) => { + if (err) { + return cb(err) + } - const d = (new Date()).getTime() + const d = (new Date()).getTime() - parallel(_.map(nodeArr, (node, i) => (callback) => { - node.bitswap.start() - parallel([ - (finish) => { - pull( + parallel(_.map(nodeArr, (node, i) => (callback) => { + node.bitswap.start() + parallel([ + (finish) => pull( pull.values( _.range(blockFactor) ), pull.map((j) => blocks[i * blockFactor + j]), + pull.asyncMap((b, cb) => { + b.key((err, key) => { + if (err) { + return cb(err) + } + cb(null, {data: b.data, key: key}) + }) + }), node.bitswap.putStream(), pull.onEnd(finish) - ) - }, - (finish) => { - pull( - node.bitswap.getStream( - blocks.map((b) => b.key) - ), - pull.collect((err, res) => { - if (err) return finish(err) - expect(res).to.have.length(blocks.length) - finish() + ), + (finish) => { + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + pull( + node.bitswap.getStream(keys), + pull.collect((err, res) => { + expect(err).to.not.exist + expect(res).to.have.length(blocks.length) + finish() + }) + ) }) - ) - } - ], callback) - }), (err) => { - if (err) return cb(err) - console.log(' time -- %s', (new Date()).getTime() - d) - cb() + } + ], callback) + }), (err) => { + expect(err).to.not.exist + console.log(' time -- %s', (new Date()).getTime() - d) + cb() + }) }) } @@ -133,7 +161,7 @@ describe('gen Bitswap network', function () { }), (err2) => { done(err || err2) }) - }, 2000) + }, 3000) } ) }) diff --git a/test/network/network.node.js b/test/network/network.node.js index 7621b3b1..317952ce 100644 --- a/test/network/network.node.js +++ b/test/network/network.node.js @@ -1,5 +1,4 @@ /* eslint-env mocha */ - 'use strict' const libp2p = require('libp2p-ipfs') @@ -10,13 +9,13 @@ const PeerBook = require('peer-book') const Block = require('ipfs-block') const lp = require('pull-length-prefixed') const pull = require('pull-stream') +const parallel = require('async/parallel') +const series = require('async/series') const Network = require('../../src/network') const Message = require('../../src/message') -describe('network', function () { - this.timeout(15 * 1000) - +describe('network', () => { let libp2pNodeA let libp2pNodeB let peerInfoA @@ -25,31 +24,42 @@ describe('network', function () { let peerBookB let networkA let networkB + let blocks before((done) => { let counter = 0 - peerInfoA = new PeerInfo() - peerInfoB = new PeerInfo() + parallel([ + (cb) => PeerInfo.create(cb), + (cb) => PeerInfo.create(cb) + ], (err, results) => { + if (err) { + return done(err) + } - peerInfoA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String())) - peerInfoB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10500/ipfs/' + peerInfoB.id.toB58String())) + peerInfoA = results[0] + peerInfoB = results[1] + blocks = ['hello', 'world'].map((b) => new Block(b)) - peerBookA = new PeerBook() - peerBookB = new PeerBook() + peerInfoA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10100/ipfs/' + peerInfoA.id.toB58String())) + peerInfoB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/10500/ipfs/' + peerInfoB.id.toB58String())) - peerBookA.put(peerInfoB) - peerBookB.put(peerInfoA) + peerBookA = new PeerBook() + peerBookB = new PeerBook() - libp2pNodeA = new libp2p.Node(peerInfoA, peerBookA) - libp2pNodeA.start(started) - libp2pNodeB = new libp2p.Node(peerInfoB, peerBookB) - libp2pNodeB.start(started) + peerBookA.put(peerInfoB) + peerBookB.put(peerInfoA) - function started () { - if (++counter === 2) { - done() + libp2pNodeA = new libp2p.Node(peerInfoA, peerBookA) + libp2pNodeA.start(started) + libp2pNodeB = new libp2p.Node(peerInfoB, peerBookB) + libp2pNodeB.start(started) + + function started () { + if (++counter === 2) { + done() + } } - } + }) }) after((done) => { @@ -133,48 +143,61 @@ describe('network', function () { it('_receiveMessage success', (done) => { const msg = new Message(true) - const b = new Block('hello') - msg.addEntry(b.key, 0, false) - msg.addBlock(b) - msg.addBlock(new Block('world')) - - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.deep.equal(msgReceived) - bitswapMockB._receiveMessage = () => {} - bitswapMockB._receiveError = () => {} - done() - } - - bitswapMockB._receiveError = (err) => { - expect(err).to.not.exist - } + const b = blocks[0] - libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { + b.key((err, key) => { expect(err).to.not.exist - - pull( - pull.values([msg.toProto()]), - lp.encode(), - conn - ) + msg.addEntry(key, 0, false) + + series([ + (cb) => msg.addBlock(b, cb), + (cb) => msg.addBlock(blocks[1], cb) + ], (err) => { + expect(err).to.not.exist + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.deep.equal(msgReceived) + bitswapMockB._receiveMessage = () => {} + bitswapMockB._receiveError = () => {} + done() + } + + bitswapMockB._receiveError = (err) => { + expect(err).to.not.exist + } + + libp2pNodeA.dialByPeerInfo(peerInfoB, '/ipfs/bitswap/1.0.0', (err, conn) => { + expect(err).to.not.exist + + pull( + pull.values([msg.toProto()]), + lp.encode(), + conn + ) + }) + }) }) }) it('sendMessage', (done) => { const msg = new Message(true) - const b = new Block('hello') - msg.addEntry(b.key, 0, false) - msg.addBlock(b) - msg.addBlock(new Block('world')) - - bitswapMockB._receiveMessage = (peerId, msgReceived) => { - expect(msg).to.deep.equal(msgReceived) - bitswapMockB._receiveMessage = () => {} - done() - } - - networkA.sendMessage(peerInfoB.id, msg, (err) => { + blocks[0].key((err, key) => { expect(err).to.not.exist + msg.addEntry(key, 0, false) + series([ + (cb) => msg.addBlock(blocks[0], cb), + (cb) => msg.addBlock(blocks[1], cb) + ], (err) => { + expect(err).to.not.exist + bitswapMockB._receiveMessage = (peerId, msgReceived) => { + expect(msg).to.deep.equal(msgReceived) + bitswapMockB._receiveMessage = () => {} + done() + } + + networkA.sendMessage(peerInfoB.id, msg, (err) => { + expect(err).to.not.exist + }) + }) }) }) }) diff --git a/test/utils.js b/test/utils.js index 6ffee229..2199b296 100644 --- a/test/utils.js +++ b/test/utils.js @@ -3,6 +3,7 @@ const each = require('async/each') const eachSeries = require('async/eachSeries') const map = require('async/map') +const parallel = require('async/parallel') const _ = require('lodash') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -12,7 +13,7 @@ const Bitswap = require('../src') const libp2p = require('libp2p-ipfs') const os = require('os') const Repo = require('ipfs-repo') -const Store = require('interface-pull-blob-store/lib/reference') +const Store = require('interface-pull-blob-store') exports.mockNetwork = (calls, done) => { done = done || (() => {}) @@ -47,13 +48,16 @@ exports.mockNetwork = (calls, done) => { } exports.createMockNet = (repo, count, cb) => { - map(_.range(count), (i, cb) => repo.create(`repo-${i}`, (err, res) => { - if (err) return cb(err) - cb(null, res.blockstore) - }), (err, stores) => { - if (err) return cb(err) + parallel([ + (cb) => map(_.range(count), (i, cb) => repo.create(`repo-${i}`), cb), + (cb) => map(_.range(count), (i, cb) => PeerId.create(cb), cb) + ], (err, results) => { + if (err) { + return cb(err) + } + const stores = results[0].map((r) => r.blockstore) + const ids = results[1] - const ids = _.range(count).map((i) => PeerId.create({bits: 64})) const hexIds = ids.map((id) => id.toHexString()) const bitswaps = _.range(count).map((i) => new Bitswap(ids[i], {}, stores[i])) const networks = _.range(count).map((i) => { @@ -99,82 +103,87 @@ exports.genBitswapNetwork = (n, callback) => { const basePort = 12000 // create PeerInfo and libp2p.Node for each - _.range(n).forEach((i) => { - const p = new PeerInfo() - const mh1 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i) + - '/ipfs/' + p.id.toB58String()) - p.multiaddr.add(mh1) - - // const mh2 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i + 2000) + '/ws' + - // '/ipfs/' + p.id.toB58String()) - // p.multiaddr.add(mh2) - - const l = new libp2p.Node(p) - netArray.push({peerInfo: p, libp2p: l}) - }) - - // create PeerBook and populate peerBook - netArray.forEach((net, i) => { - const pb = netArray[i].libp2p.peerBook - netArray.forEach((net, j) => { - if (i === j) { - return - } - pb.put(net.peerInfo) - }) - netArray[i].peerBook = pb - }) - - // create the repos - const tmpDir = os.tmpdir() - netArray.forEach((net, i) => { - const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() - net.repo = new Repo(repoPath, { stores: Store }) - }) - - // start every libp2pNode - each(netArray, (net, cb) => { - net.libp2p.start(cb) - }, (err) => { + map(_.range(n), (i, cb) => PeerInfo.create(cb), (err, peers) => { if (err) { - throw err + return callback(err) } - createBitswaps() - }) - // create every BitSwap - function createBitswaps () { - netArray.forEach((net) => { - net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.blockstore, net.peerBook) + peers.forEach((p, i) => { + const mh1 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i) + + '/ipfs/' + p.id.toB58String()) + p.multiaddr.add(mh1) + + // const mh2 = multiaddr('/ip4/127.0.0.1/tcp/' + (basePort + i + 2000) + '/ws' + + // '/ipfs/' + p.id.toB58String()) + // p.multiaddr.add(mh2) + + const l = new libp2p.Node(p) + netArray.push({peerInfo: p, libp2p: l}) }) - establishLinks() - } - // connect all the nodes between each other - function establishLinks () { - eachSeries(netArray, (from, cbI) => { - eachSeries(netArray, (to, cbJ) => { - if (from.peerInfo.id.toB58String() === - to.peerInfo.id.toB58String()) { - return cbJ() - } - from.libp2p.dialByPeerInfo(to.peerInfo, cbJ) - }, (err) => { - if (err) { - throw err + // create PeerBook and populate peerBook + netArray.forEach((net, i) => { + const pb = netArray[i].libp2p.peerBook + netArray.forEach((net, j) => { + if (i === j) { + return } - cbI() + pb.put(net.peerInfo) }) + netArray[i].peerBook = pb + }) + + // create the repos + const tmpDir = os.tmpdir() + netArray.forEach((net, i) => { + const repoPath = tmpDir + '/' + net.peerInfo.id.toB58String() + net.repo = new Repo(repoPath, { stores: Store }) + }) + + // start every libp2pNode + each(netArray, (net, cb) => { + net.libp2p.start(cb) }, (err) => { if (err) { throw err } - finish() + createBitswaps() }) - } - // callback with netArray - function finish () { - callback(null, netArray) - } + // create every BitSwap + function createBitswaps () { + netArray.forEach((net) => { + net.bitswap = new Bitswap(net.peerInfo, net.libp2p, net.repo.blockstore, net.peerBook) + }) + establishLinks() + } + + // connect all the nodes between each other + function establishLinks () { + eachSeries(netArray, (from, cbI) => { + eachSeries(netArray, (to, cbJ) => { + if (from.peerInfo.id.toB58String() === + to.peerInfo.id.toB58String()) { + return cbJ() + } + from.libp2p.dialByPeerInfo(to.peerInfo, cbJ) + }, (err) => { + if (err) { + throw err + } + cbI() + }) + }, (err) => { + if (err) { + throw err + } + finish() + }) + } + + // callback with netArray + function finish () { + callback(null, netArray) + } + }) } diff --git a/test/wantlist.spec.js b/test/wantlist.spec.js index f684b243..3bcf8945 100644 --- a/test/wantlist.spec.js +++ b/test/wantlist.spec.js @@ -4,106 +4,143 @@ const expect = require('chai').expect const Block = require('ipfs-block') const mh = require('multihashes') +const map = require('async/map') const Wantlist = require('../src/wantlist') describe('Wantlist', () => { let wm + let blocks + + before((done) => { + const data = ['hello', 'world'] + blocks = data.map((d) => new Block(d)) + done() + }) + beforeEach(() => { wm = new Wantlist() }) - it('length', () => { - const b1 = new Block('hello') - const b2 = new Block('world') + it('length', (done) => { + const b1 = blocks[0] + const b2 = blocks[1] - wm.add(b1.key, 2) - wm.add(b2.key, 1) + map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + wm.add(keys[0], 2) + wm.add(keys[1], 1) - expect(wm).to.have.length(2) + expect(wm).to.have.length(2) + done() + }) }) describe('remove', () => { - it('removes with a single ref', () => { - const b = new Block('hello') + it('removes with a single ref', (done) => { + const b = blocks[0] - wm.add(b.key, 1) - wm.remove(b.key) + b.key((err, key) => { + expect(err).to.not.exist + wm.add(key, 1) + wm.remove(key) - expect(wm).to.have.length(0) + expect(wm).to.have.length(0) + done() + }) }) - it('removes with multiple refs', () => { - const b1 = new Block('hello') - const b2 = new Block('world') + it('removes with multiple refs', (done) => { + const b1 = blocks[0] + const b2 = blocks[1] - wm.add(b1.key, 1) - wm.add(b2.key, 2) + map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + wm.add(keys[0], 1) + wm.add(keys[1], 2) - expect(wm).to.have.length(2) + expect(wm).to.have.length(2) - wm.remove(b2.key) + wm.remove(keys[1]) - expect(wm).to.have.length(1) + expect(wm).to.have.length(1) - wm.add(b1.key, 2) - wm.remove(b1.key) + wm.add(keys[0], 2) + wm.remove(keys[0]) - expect(wm).to.have.length(1) + expect(wm).to.have.length(1) - wm.remove(b1.key) + wm.remove(keys[0]) - expect(wm).to.have.length(0) + expect(wm).to.have.length(0) + done() + }) }) - it('ignores non existing removes', () => { - const b = new Block('hello') + it('ignores non existing removes', (done) => { + const b = blocks[0] - wm.add(b.key, 1) - wm.remove(b.key) - wm.remove(b.key) + b.key((err, key) => { + expect(err).to.not.exist + wm.add(key, 1) + wm.remove(key) + wm.remove(key) - expect(wm).to.have.length(0) + expect(wm).to.have.length(0) + done() + }) }) }) - it('entries', () => { - const b = new Block('hello') - wm.add(b.key, 2) - - expect( - Array.from(wm.entries()) - ).to.be.eql([ - [mh.toB58String(b.key), new Wantlist.Entry(b.key, 2)] - ]) + it('entries', (done) => { + const b = blocks[0] + b.key((err, key) => { + expect(err).to.not.exist + wm.add(key, 2) + + expect( + Array.from(wm.entries()) + ).to.be.eql([ + [mh.toB58String(key), new Wantlist.Entry(key, 2)] + ]) + done() + }) }) - it('sortedEntries', () => { - const b1 = new Block('hello') - const b2 = new Block('world') - - wm.add(b2.key, 1) - wm.add(b1.key, 1) - - expect( - Array.from(wm.sortedEntries()) - ).to.be.eql([ - [mh.toB58String(b1.key), new Wantlist.Entry(b1.key, 1)], - [mh.toB58String(b2.key), new Wantlist.Entry(b2.key, 1)] - ]) + it('sortedEntries', (done) => { + const b1 = blocks[0] + const b2 = blocks[1] + + map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + wm.add(keys[1], 1) + wm.add(keys[0], 1) + + expect( + Array.from(wm.sortedEntries()) + ).to.be.eql([ + [mh.toB58String(keys[0]), new Wantlist.Entry(keys[0], 1)], + [mh.toB58String(keys[1]), new Wantlist.Entry(keys[1], 1)] + ]) + done() + }) }) - it('contains', () => { - const b1 = new Block('hello') - const b2 = new Block('world') - wm.add(b1.key, 2) - - expect( - wm.contains(b1.key) - ).to.exist - - expect( - wm.contains(b2.key) - ).to.not.exist + it('contains', (done) => { + const b1 = blocks[0] + const b2 = blocks[1] + map([b1, b2], (b, cb) => b.key(cb), (err, keys) => { + expect(err).to.not.exist + wm.add(keys[0], 2) + + expect( + wm.contains(keys[0]) + ).to.exist + + expect( + wm.contains(keys[1]) + ).to.not.exist + done() + }) }) }) diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index cd761479..0f5a52bd 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -3,6 +3,8 @@ const expect = require('chai').expect const PeerId = require('peer-id') +const parallel = require('async/parallel') +const series = require('async/series') const cs = require('../../src/constants') const Message = require('../../src/message') @@ -12,48 +14,60 @@ const mockNetwork = require('../utils').mockNetwork describe('Wantmanager', () => { it('sends wantlist to all connected peers', (done) => { - const peer1 = PeerId.create({bits: 64}) - const peer2 = PeerId.create({bits: 64}) - let wm - const network = mockNetwork(6, (calls) => { - expect(calls.connects).to.have.length(6) - const m1 = new Message(true) - m1.addEntry(new Buffer('hello'), cs.kMaxPriority) - m1.addEntry(new Buffer('world'), cs.kMaxPriority - 1) - - const m2 = new Message(false) - m2.cancel(new Buffer('world')) - - const m3 = new Message(false) - m3.addEntry(new Buffer('foo'), cs.kMaxPriority) - - const msgs = [m1, m1, m2, m2, m3, m3] - - calls.messages.forEach((m, i) => { - expect(m[0]).to.be.eql(calls.connects[i]) - expect(m[1].equals(msgs[i])).to.be.eql(true) - }) + parallel([ + (cb) => PeerId.create(cb), + (cb) => PeerId.create(cb) + ], (err, peers) => { + if (err) { + return done(err) + } - wm = null - done() - }) + const peer1 = peers[0] + const peer2 = peers[1] - wm = new Wantmanager(network) + let wm + const network = mockNetwork(6, (calls) => { + expect(calls.connects).to.have.length(6) + const m1 = new Message(true) + m1.addEntry(new Buffer('hello'), cs.kMaxPriority) + m1.addEntry(new Buffer('world'), cs.kMaxPriority - 1) - wm.run() - wm.wantBlocks([new Buffer('hello'), new Buffer('world')]) + const m2 = new Message(false) + m2.cancel(new Buffer('world')) - wm.connected(peer1) - wm.connected(peer2) + const m3 = new Message(false) + m3.addEntry(new Buffer('foo'), cs.kMaxPriority) - setTimeout(() => { - wm.cancelWants([new Buffer('world')]) - setTimeout(() => { - wm.wantBlocks([new Buffer('foo')]) + const msgs = [m1, m1, m2, m2, m3, m3] + + calls.messages.forEach((m, i) => { + expect(m[0]).to.be.eql(calls.connects[i]) + expect(m[1].equals(msgs[i])).to.be.eql(true) + }) + + wm = null + done() + }) - wm.disconnected(peer1) - wm.disconnected(peer2) - }, 100) - }, 100) + wm = new Wantmanager(network) + + wm.run() + wm.wantBlocks([new Buffer('hello'), new Buffer('world')]) + + wm.connected(peer1) + wm.connected(peer2) + + series([ + (cb) => setTimeout(cb, 100), + (cb) => { + wm.cancelWants([new Buffer('world')]) + cb() + }, + (cb) => setTimeout(cb, 100) + ], (err) => { + expect(err).to.not.exist + wm.wantBlocks([new Buffer('foo')]) + }) + }) }) }) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index 9e41a613..7edace3e 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -9,70 +9,75 @@ const MsgQueue = require('../../src/wantmanager/msg-queue') describe('MsgQueue', () => { it('connects and sends messages', (done) => { - const id = PeerId.create({bits: 64}) - const msg = new Message(true) - msg.addEntry(new Buffer('hello world'), 3) - msg.addEntry(new Buffer('foo bar'), 1) + PeerId.create((err, id) => { + if (err) { + return done(err) + } + + const msg = new Message(true) + msg.addEntry(new Buffer('hello world'), 3) + msg.addEntry(new Buffer('foo bar'), 1) - const messages = [] - const connects = [] - let i = 0 - const finish = () => { - i++ - if (i === 3) { - expect( - connects - ).to.be.eql([ - id, id, id - ]) + const messages = [] + const connects = [] + let i = 0 + const finish = () => { + i++ + if (i === 3) { + expect( + connects + ).to.be.eql([ + id, id, id + ]) - const m1 = new Message(false) - m1.addEntry(new Buffer('hello'), 1) - m1.addEntry(new Buffer('world'), 2) - const m2 = new Message(false) - m2.cancel(new Buffer('foo')) - m2.cancel(new Buffer('bar')) + const m1 = new Message(false) + m1.addEntry(new Buffer('hello'), 1) + m1.addEntry(new Buffer('world'), 2) + const m2 = new Message(false) + m2.cancel(new Buffer('foo')) + m2.cancel(new Buffer('bar')) - expect( - messages - ).to.be.eql([ - [id, m1], - [id, m2], - [id, msg] - ]) + expect( + messages + ).to.be.eql([ + [id, m1], + [id, m2], + [id, msg] + ]) - done() + done() + } } - } - const network = { - connectTo (p, cb) { - connects.push(p) - cb() - }, - sendMessage (p, msg, cb) { - messages.push([p, msg]) - cb() - finish() + const network = { + connectTo (p, cb) { + connects.push(p) + cb() + }, + sendMessage (p, msg, cb) { + messages.push([p, msg]) + cb() + finish() + } } - } - const mq = new MsgQueue(id, network) + const mq = new MsgQueue(id, network) - expect(mq.refcnt).to.be.eql(1) + expect(mq.refcnt).to.be.eql(1) - const batch1 = [ - new Message.Entry(new Buffer('hello'), 1, false), - new Message.Entry(new Buffer('world'), 2, false) - ] + const batch1 = [ + new Message.Entry(new Buffer('hello'), 1, false), + new Message.Entry(new Buffer('world'), 2, false) + ] - const batch2 = [ - new Message.Entry(new Buffer('foo'), 1, true), - new Message.Entry(new Buffer('bar'), 2, true) - ] + const batch2 = [ + new Message.Entry(new Buffer('foo'), 1, true), + new Message.Entry(new Buffer('bar'), 2, true) + ] - mq.run() - mq.addEntries(batch1) - mq.addEntries(batch2) - mq.addMessage(msg) + mq.run() + mq.addEntries(batch1) + mq.addEntries(batch2) + mq.addMessage(msg) + }) }) })