From 27fe1f72d16ca1c441d47c2ce921b071e867407d Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Wed, 14 Dec 2016 13:16:37 +0100 Subject: [PATCH] feat: perf stuffs - reduce base58 encoding - use utf8 encoding instead of hex - extract benchmarks from tests - group multiple blocks into single messages --- benchmarks/index.js | 108 ++++++++++ benchmarks/put-get.js | 89 +++++++++ package.json | 9 +- src/decision/engine.js | 242 +++++++++++++---------- src/decision/peer-request-queue.js | 182 ----------------- src/index.js | 75 ++----- src/message/entry.js | 8 +- src/message/index.js | 23 ++- src/network/index.js | 29 ++- src/wantlist/entry.js | 21 +- src/wantlist/index.js | 16 +- src/wantmanager/index.js | 67 +++---- src/wantmanager/msg-queue.js | 57 +++--- test/decision/engine-test.js | 20 +- test/decision/peer-request-queue.spec.js | 148 -------------- test/decision/pq.spec.js | 73 ------- test/index-test.js | 5 +- test/message.spec.js | 9 +- test/network/gen-bitswap-network.node.js | 169 ++++++++-------- test/utils.js | 20 +- test/wantlist.spec.js | 7 +- test/wantmanager/index.spec.js | 4 +- test/wantmanager/msg-queue.spec.js | 15 +- 23 files changed, 595 insertions(+), 801 deletions(-) create mode 100644 benchmarks/index.js create mode 100644 benchmarks/put-get.js delete mode 100644 src/decision/peer-request-queue.js delete mode 100644 test/decision/peer-request-queue.spec.js delete mode 100644 test/decision/pq.spec.js diff --git a/benchmarks/index.js b/benchmarks/index.js new file mode 100644 index 00000000..73537139 --- /dev/null +++ b/benchmarks/index.js @@ -0,0 +1,108 @@ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const series = require('async/series') +const parallel = require('async/parallel') +const map = require('async/map') +const mapSeries = require('async/mapSeries') +const each = require('async/each') +const _ = require('lodash') +const Block = require('ipfs-block') +const pull = require('pull-stream') +const assert = require('assert') +const crypto = require('crypto') + +const utils = require('../test/utils') + +const nodes = [2, 5, 10, 20] +const blockFactors = [1, 10, 100] + +console.log('-- start') +mapSeries(nodes, (n, cb) => { + mapSeries(blockFactors, (blockFactor, cb) => { + utils.genBitswapNetwork(n, (err, nodeArr) => { + if (err) { + return cb(err) + } + + round(nodeArr, blockFactor, n, (err) => { + if (err) { + return cb(err) + } + + shutdown(nodeArr, cb) + }) + }) + }, cb) +}, (err) => { + if (err) { + throw err + } + console.log('-- finished') +}) + +function shutdown (nodeArr, cb) { + each(nodeArr, (node, cb) => { + node.bitswap.stop() + node.libp2p.stop(cb) + }, cb) +} + +function round (nodeArr, blockFactor, n, cb) { + const blocks = createBlocks(n, blockFactor) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return cb(err) + } + let d + series([ + // put blockFactor amount of blocks per node + (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { + node.bitswap.start() + + const data = _.map(_.range(blockFactor), (j) => { + const index = i * blockFactor + j + return { + data: blocks[index].data, + key: keys[index] + } + }) + each( + data, + (d, cb) => node.bitswap.put(d, cb), + callback + ) + }), cb), + (cb) => { + d = (new Date()).getTime() + cb() + }, + // fetch all blocks on every node + (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { + pull( + node.bitswap.getStream(keys), + pull.collect((err, res) => { + if (err) { + return callback(err) + } + + assert(res.length === blocks.length) + callback() + }) + ) + }), cb) + ], (err) => { + if (err) { + return cb(err) + } + console.log(' %s nodes - %s blocks/node - %sms', n, blockFactor, (new Date()).getTime() - d) + cb() + }) + }) +} + +function createBlocks (n, blockFactor) { + return _.map(_.range(n * blockFactor), () => { + return new Block(crypto.randomBytes(n * blockFactor)) + }) +} diff --git a/benchmarks/put-get.js b/benchmarks/put-get.js new file mode 100644 index 00000000..333f0353 --- /dev/null +++ b/benchmarks/put-get.js @@ -0,0 +1,89 @@ +'use strict' + +const Benchmark = require('benchmark') +const _ = require('lodash') +const Block = require('ipfs-block') +const assert = require('assert') +const pull = require('pull-stream') +const series = require('async/series') +const crypto = require('crypto') +const utils = require('../test/utils') + +const suite = new Benchmark.Suite('put-get') + +const blockCounts = [1, 10, 1000] +const blockSizes = [10, 1024, 10 * 1024] + +utils.genBitswapNetwork(1, (err, nodes) => { + if (err) { + throw err + } + const node = nodes[0] + const bitswap = node.bitswap + + blockCounts.forEach((n) => blockSizes.forEach((k) => { + suite.add(`put-get ${n} blocks of size ${k}`, (defer) => { + const blocks = createBlocks(n, k) + series([ + (cb) => put(blocks, bitswap, cb), + (cb) => get(blocks, bitswap, cb) + ], (err) => { + if (err) { + throw err + } + defer.resolve() + }) + }, { + defer: true + }) + })) + + suite + .on('cycle', (event) => { + console.log(String(event.target)) + }) + .on('complete', () => { + process.exit() + }) + .run({ + async: true + }) +}) + +function createBlocks (n, k) { + return _.map(_.range(n), () => { + return new Block(crypto.randomBytes(k)) + }) +} + +function put (blocks, bs, callback) { + pull( + pull.values(blocks), + pull.asyncMap((b, cb) => { + b.key((err, key) => { + if (err) { + return cb(err) + } + cb(null, {key: key, data: b.data}) + }) + }), + bs.putStream(), + pull.onEnd(callback) + ) +} + +function get (blocks, bs, callback) { + pull( + pull.values(blocks), + pull.asyncMap((b, cb) => b.key(cb)), + pull.map((k) => bs.getStream(k)), + pull.flatten(), + pull.collect((err, res) => { + if (err) { + return callback(err) + } + assert(res.length === blocks.length) + callback() + }) + ) +} diff --git a/package.json b/package.json index 7533e268..e22c6bd0 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "release": "aegir-release", "release-minor": "aegir-release --type minor", "release-major": "aegir-release --type major", + "bench": "node benchmarks/index", "build": "aegir-build", "coverage": "aegir-coverage", "coverage-publish": "aegir-coverage publish" @@ -36,6 +37,7 @@ "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { "aegir": "9.2.2", + "benchmark": "^2.1.2", "buffer-loader": "0.0.1", "chai": "^3.5.0", "fs-pull-blob-store": "^0.4.1", @@ -59,8 +61,13 @@ "heap": "^0.2.6", "ipfs-block": "^0.5.3", "lodash.debounce": "^4.0.8", + "lodash.find": "^4.6.0", + "lodash.groupby": "^4.6.0", "lodash.isequalwith": "^4.4.0", "lodash.isundefined": "^3.0.1", + "lodash.pullallwith": "^4.7.0", + "lodash.uniqwith": "^4.5.0", + "lodash.values": "^4.3.0", "multihashes": "^0.3.0", "protocol-buffers": "^3.2.1", "pull-defer": "^0.2.2", @@ -77,4 +84,4 @@ "greenkeeperio-bot ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/decision/engine.js b/src/decision/engine.js index 7e7b4b71..f1462351 100644 --- a/src/decision/engine.js +++ b/src/decision/engine.js @@ -1,19 +1,22 @@ 'use strict' const debug = require('debug') -const mh = require('multihashes') const pull = require('pull-stream') -const whilst = require('async/whilst') -const setImmediate = require('async/setImmediate') const each = require('async/each') +const map = require('async/map') +const waterfall = require('async/waterfall') const debounce = require('lodash.debounce') +const uniqWith = require('lodash.uniqwith') +const find = require('lodash.find') +const values = require('lodash.values') +const groupBy = require('lodash.groupby') +const pullAllWith = require('lodash.pullallwith') const log = debug('bitswap:engine') log.error = debug('bitswap:engine:error') const Message = require('../message') const Wantlist = require('../wantlist') -const PeerRequestQueue = require('./peer-request-queue') const Ledger = require('./ledger') module.exports = class Engine { @@ -23,71 +26,82 @@ module.exports = class Engine { // A list of of ledgers by their partner id this.ledgerMap = new Map() - - // A priority queue of requests received from different - // peers. - this.peerRequestQueue = new PeerRequestQueue() - this._running = false - this._outbox = debounce(this._outboxExec.bind(this), 100) + // List of tasks to be processed + this._tasks = [] + + this._outbox = debounce(this._processTasks.bind(this), 100) } - _sendBlock (env, cb) { + _sendBlocks (env, cb) { const msg = new Message(false) - msg.addBlock(env.block, (err) => { + env.blocks.forEach((block) => { + msg.addBlockWithKey(block.block, block.key) + }) + + // console.log('sending %s blocks', msg.blocks.size) + this.network.sendMessage(env.peer, msg, (err) => { if (err) { - return cb(err) + log('sendblock error: %s', err.message) } - - log('Sending block to %s', env.peer.toB58String(), env.block.data.toString()) - - this.network.sendMessage(env.peer, msg, (err) => { - if (err) { - log('sendblock error: %s', err.message) - } - cb(null, 'done') - }) + cb() }) } - _outboxExec () { - let nextTask - log('outbox') - - whilst( - () => { - if (!this._running) { - return - } + _processTasks () { + if (!this._running || !this._tasks.length) { + return + } - nextTask = this.peerRequestQueue.pop() - log('check', this._running && nextTask) - return Boolean(nextTask) - }, - (next) => { - log('got task') + const tasks = this._tasks + this._tasks = [] + const entries = tasks.map((t) => t.entry) + const keys = entries.map((e) => e.key) + const uniqKeys = uniqWith(keys, (a, b) => a.equals(b)) + const groupedTasks = groupBy(tasks, (task) => task.target.toB58String()) + waterfall([ + (cb) => map(uniqKeys, (k, cb) => { pull( - this.blockstore.getStream(nextTask.entry.key), + this.blockstore.getStream(k), pull.collect((err, blocks) => { - const block = blocks[0] - if (err || !block) { - nextTask.done() - return next() + if (err) { + return cb(err) } - - this._sendBlock({ - peer: nextTask.target, - block: block, - sent () { - nextTask.done() - } - }, next) + cb(null, { + key: k, + block: blocks[0] + }) }) ) + }, cb), + (blocks, cb) => each(values(groupedTasks), (tasks, cb) => { + // all tasks have the same target + const peer = tasks[0].target + const blockList = keys.map((k) => { + return find(blocks, (b) => b.key.equals(k)) + }) + + this._sendBlocks({ + peer: peer, + blocks: blockList + }, (err) => { + if (err) { + log.error('failed to send', err) + } + blockList.forEach((block) => { + this.messageSent(peer, block.block, block.key) + }) + cb() + }) + }) + ], (err) => { + this._tasks = [] + if (err) { + log.error(err) } - ) + }) } wantlistForPeer (peerId) { @@ -102,12 +116,31 @@ module.exports = class Engine { return Array.from(this.ledgerMap.values()).map((l) => l.partner) } + receivedBlocks (keys) { + if (!keys.length) { + return + } + // Check all connected peers if they want the block we received + for (let l of this.ledgerMap.values()) { + keys + .map((k) => l.wantlistContains(k)) + .filter(Boolean) + .forEach((e) => { + // this.peerRequestQueue.push(e, l.partner) + this._tasks.push({ + entry: e, + target: l.partner + }) + }) + } + this._outbox() + } + // Handle incoming messages messageReceived (peerId, msg, cb) { const ledger = this._findOrCreate(peerId) if (msg.empty) { - log('received empty message from %s', peerId.toB58String()) return cb() } @@ -121,92 +154,85 @@ module.exports = class Engine { log.error(`failed to process blocks: ${err.message}`) } - const arrayWantlist = Array.from(msg.wantlist.values()) - log('wantlist', arrayWantlist.map((e) => e.toString())) - - if (arrayWantlist.length === 0) { + if (msg.wantlist.size === 0) { return cb() } - pull( - pull.values(arrayWantlist), - pull.asyncMap((entry, cb) => { - this._processWantlist(ledger, peerId, entry, cb) - }), - pull.onEnd(cb) - ) + let cancels = [] + let wants = [] + for (let entry of msg.wantlist.values()) { + if (entry.cancel) { + ledger.cancelWant(entry.key) + cancels.push(entry) + } else { + ledger.wants(entry.key, entry.priority) + wants.push(entry) + } + } + + this._cancelWants(ledger, peerId, cancels) + this._addWants(ledger, peerId, wants, cb) }) } - receivedBlock (key) { - this._processBlock(key) - this._outbox() - } + _cancelWants (ledger, peerId, entries) { + const id = peerId.toB58String() - _processBlock (key) { - // Check all connected peers if they want the block we received - for (let l of this.ledgerMap.values()) { - const entry = l.wantlistContains(key) - if (entry) { - this.peerRequestQueue.push(entry, l.partner) - } - } + pullAllWith(this._tasks, entries, (t, e) => { + const sameTarget = t.target.toB58String() === id + const sameKey = t.entry.key.equals(e.key) + return sameTarget && sameKey + }) } - _processWantlist (ledger, peerId, entry, cb) { - if (entry.cancel) { - log('cancel %s', mh.toB58String(entry.key)) - ledger.cancelWant(entry.key) - this.peerRequestQueue.remove(entry.key, peerId) - setImmediate(() => cb()) - } else { - log('wants %s - %s', mh.toB58String(entry.key), entry.priority) - ledger.wants(entry.key, entry.priority) - + _addWants (ledger, peerId, entries, cb) { + each(entries, (entry, cb) => { // If we already have the block, serve it this.blockstore.has(entry.key, (err, exists) => { if (err) { - log('failed existence check %s', mh.toB58String(entry.key)) + log.error('failed existence check') } else if (exists) { - log('has want %s', mh.toB58String(entry.key)) - this.peerRequestQueue.push(entry.entry, peerId) - this._outbox() + this._tasks.push({ + entry: entry.entry, + target: peerId + }) } cb() }) - } + }, () => { + this._outbox() + cb() + }) } _processBlocks (blocks, ledger, callback) { - each(blocks.values(), (block, cb) => { + map(blocks.values(), (block, cb) => { block.key((err, key) => { if (err) { return cb(err) } - log('got block %s (%s bytes)', mh.toB58String(key), block.data.length) + log('got block (%s bytes)', block.data.length) ledger.receivedBytes(block.data.length) - this.receivedBlock(key) - cb() + cb(null, key) }) - }, callback) + }, (err, keys) => { + if (err) { + return callback(err) + } + + this.receivedBlocks(keys) + callback() + }) } // Clear up all accounting things after message was sent - messageSent (peerId, msg, callback) { + messageSent (peerId, block, key) { const ledger = this._findOrCreate(peerId) - each(msg.blocks.values(), (block, cb) => { - ledger.sentBytes(block.data.length) - block.key((err, key) => { - if (err) { - return cb(err) - } - - ledger.wantlist.remove(key) - this.peerRequestQueue.remove(key, peerId) - cb() - }) - }, callback) + ledger.sentBytes(block ? block.data.length : 0) + if (key) { + ledger.wantlist.remove(key) + } } numBytesSentTo (peerId) { diff --git a/src/decision/peer-request-queue.js b/src/decision/peer-request-queue.js deleted file mode 100644 index 11daee28..00000000 --- a/src/decision/peer-request-queue.js +++ /dev/null @@ -1,182 +0,0 @@ -'use strict' - -const mh = require('multihashes') -const debug = require('debug') -const assert = require('assert') - -const PriorityQueue = require('./pq') - -const log = debug('bitswap:peer-request-queue') - -class PeerRequestTask { - constructor (entry, target, done) { - this.entry = entry - this.target = target - this.created = (new Date()).getTime() - this.done = done - } - - get key () { - return taskKey(this.target, this.entry.key) - } - - get [Symbol.toStringTag] () { - return `PeerRequestTask ` - } -} - -class ActivePartner { - constructor (id) { - this.id = id - - // The number of blocks this peer is currently being sent. - this.active = 0 - - // The number of blocks this peer is currently requesting - this.requests = 0 - - // Queue of tasks belonging to this peer - this.taskQueue = new PriorityQueue(V1) - - this.activeBlocks = new Map() - } - - startTask (key) { - this.activeBlocks.set(mh.toB58String(key), 1) - this.active ++ - } - - taskDone (key) { - const k = mh.toB58String(key) - assert(this.activeBlocks.has(k), 'finishing non existent task') - - this.activeBlocks.delete() - this.active -- - - assert(this.active >= 0, 'more tasks finished than started') - } -} - -module.exports = class PeerRequestQueue { - constructor () { - this.taskMap = new Map() - this.partners = new Map() - this.pQueue = new PriorityQueue(partnerCompare) - } - - // Add a new entry to the queue - push (entry, to) { - log('push, to: %s', to.toB58String()) - let partner = this.partners.get(to.toB58String()) - - if (!partner) { - partner = new ActivePartner(to) - this.pQueue.push(partner) - this.partners.set(to.toB58String(), partner) - } - - if (partner.activeBlocks.has(entry.key)) { - log('has activeBlocks', entry.key) - return - } - - let task = this.taskMap.get(taskKey(to, entry.key)) - - if (task) { - log('updating task', task.toString()) - task.entry.priority = entry.priority - partner.taskQueue.update(task) - return - } - - task = new PeerRequestTask(entry, to, () => { - partner.taskDone(entry.key) - this.pQueue.update(partner) - }) - - partner.taskQueue.push(task) - log('taskMap.set', task.key, task.toString()) - this.taskMap.set(task.key, task) - partner.requests ++ - this.pQueue.update(partner) - } - - // Get the task with the hightest priority from the queue - pop () { - if (this.pQueue.isEmpty()) { - return - } - - let partner = this.pQueue.pop() - let out - while (!partner.taskQueue.isEmpty()) { - out = partner.taskQueue.pop() - this.taskMap.delete(out.key) - if (out.trash) { - out = null - // discarding tasks that have been removed - continue - } - - partner.startTask(out.entry.key) - partner.requests -- - break - } - this.pQueue.push(partner) - return out - } - - // Remove a task from the queue - remove (key, peerId) { - const t = this.taskMap.get(taskKey(peerId, key)) - if (t) { - // remove the task "lazily" - // simply mark it as trash, so it'll be dropped when popped off the - // queue. - t.trash = true - - // having canceled a block, we now account for that in the given partner - const p = this.partners.get(peerId.toB58String()) - p.requests -- - this.pQueue.update(p) - } - - log('taskMap', Array.from(this.taskMap.values()).map((v) => { - return v.toString() - })) - } -} - -function taskKey (peerId, key) { - return `${peerId.toB58String()}:${mh.toB58String(key)}` -} - -function partnerCompare (a, b) { - // having no blocks in their wantlist means lowest priority - // having both of these checks ensures stability of the sort - if (a.requests === 0) return false - if (b.requests === 0) return true - - if (a.active === b.active) { - // sorting by taskQueue.size() aids in cleaning out trash entries faster - // if we sorted instead by requests, one peer could potentially build up - // a huge number of cancelled entries in the queue resulting in a memory leak - return a.taskQueue.size() > b.taskQueue.size() - } - - return a.active < b.active -} -// A basic task comparator that returns tasks in the order created -function FIFO (a, b) { - return a.created < b.created -} - -// For the same target compare based on the wantlist priorities -// Otherwise fallback to oldest task first -function V1 (a, b) { - if (a.target.toBytes() === b.target.toBytes()) { - return a.entry.priority > b.entry.priority - } - - return FIFO(a, b) -} diff --git a/src/index.js b/src/index.js index eb65147d..ad1e9557 100644 --- a/src/index.js +++ b/src/index.js @@ -1,16 +1,13 @@ 'use strict' const series = require('async/series') -const retry = require('async/retry') const debug = require('debug') const log = debug('bitswap') log.error = debug('bitswap:error') const EventEmitter = require('events').EventEmitter -const mh = require('multihashes') const pull = require('pull-stream') const paramap = require('pull-paramap') const defer = require('pull-defer/source') -const Block = require('ipfs-block') const cs = require('./constants') const WantManager = require('./wantmanager') @@ -44,7 +41,6 @@ module.exports = class Bitwap { // handle messages received through the network _receiveMessage (peerId, incoming, cb) { cb = cb || (() => {}) - log('receiving message from %s', peerId.toB58String()) this.engine.messageReceived(peerId, incoming, (err) => { if (err) { log('failed to receive message', incoming) @@ -87,7 +83,6 @@ module.exports = class Bitwap { return cb() } - log('got block from %s', peerId.toB58String(), block.data.length) cb() }), (cb) => block.key((err, key) => { @@ -128,18 +123,6 @@ module.exports = class Bitwap { }) } - _tryPutBlock (block, times, cb) { - log('trying to put block %s', block.data.toString()) - retry({times, interval: 400}, (done) => { - pull( - pull.values([block]), - pull.asyncMap(blockToStore), - this.blockstore.putStream(), - pull.onEnd(done) - ) - }, cb) - } - // handle errors on the receiving channel _receiveError (err) { log.error('ReceiveError: %s', err.message) @@ -181,41 +164,40 @@ module.exports = class Bitwap { _getStreamSingle (key) { const unwantListeners = {} const blockListeners = {} - const unwantEvent = (key) => `unwant:${key}` - const blockEvent = (key) => `block:${key}` - const d = defer() + const keyS = key.toString() - const cleanupListener = (key) => { - const keyS = mh.toB58String(key) + const unwantEvent = () => `unwant:${keyS}` + const blockEvent = () => `block:${keyS}` + const d = defer() + const cleanupListener = () => { if (unwantListeners[keyS]) { - this.notifications.removeListener(unwantEvent(keyS), unwantListeners[keyS]) + this.notifications.removeListener(unwantEvent(), unwantListeners[keyS]) delete unwantListeners[keyS] } if (blockListeners[keyS]) { - this.notifications.removeListener(blockEvent(keyS), blockListeners[keyS]) + this.notifications.removeListener(blockEvent(), blockListeners[keyS]) delete blockListeners[keyS] } } - const addListener = (key) => { - const keyS = mh.toB58String(key) + const addListener = () => { unwantListeners[keyS] = () => { log(`manual unwant: ${keyS}`) - cleanupListener(key) + cleanupListener() this.wm.cancelWants([key]) d.resolve(pull.empty()) } blockListeners[keyS] = (block) => { this.wm.cancelWants([key]) - cleanupListener(key) + cleanupListener() d.resolve(pull.values([block])) } - this.notifications.once(unwantEvent(keyS), unwantListeners[keyS]) - this.notifications.once(blockEvent(keyS), blockListeners[keyS]) + this.notifications.once(unwantEvent(), unwantListeners[keyS]) + this.notifications.once(blockEvent(), blockListeners[keyS]) } this.blockstore.has(key, (err, exists) => { @@ -223,11 +205,11 @@ module.exports = class Bitwap { return d.resolve(pull.error(err)) } if (exists) { - log('already have block', mh.toB58String(key)) + log('already have block') return d.resolve(this.blockstore.getStream(key)) } - addListener(key) + addListener() this.wm.wantBlocks([key]) }) @@ -242,7 +224,7 @@ module.exports = class Bitwap { this.wm.unwantBlocks(keys) keys.forEach((key) => { - this.notifications.emit(`unwant:${mh.toB58String(key)}`) + this.notifications.emit(`unwant:${key.toString()}`) }) } @@ -261,7 +243,7 @@ module.exports = class Bitwap { if (err) { return cb(err) } - cb(null, [new Block(blockAndKey.data), exists]) + cb(null, [blockAndKey, exists]) }) }), pull.filter((val) => !val[1]), @@ -270,18 +252,11 @@ module.exports = class Bitwap { log('putting block') return pull( pull.values([block]), - pull.asyncMap(blockToStore), this.blockstore.putStream(), - pull.asyncMap((meta, cb) => { - block.key((err, key) => { - if (err) { - return cb(err) - } - log('put block: %s', mh.toB58String(key)) - this.notifications.emit(`block:${mh.toB58String(key)}`, block) - this.engine.receivedBlock(key) - cb(null, meta) - }) + pull.through(() => { + log('put block') + this.notifications.emit(`block:${block.key.toString()}`, block) + this.engine.receivedBlocks([block.key]) }) ) }), @@ -325,13 +300,3 @@ module.exports = class Bitwap { this.engine.stop() } } - -// Helper method, to add a cid to a block before storing it in the ipfs-repo/blockstore -function blockToStore (b, cb) { - b.key((err, key) => { - if (err) { - return cb(err) - } - cb(null, {data: b.data, key: key}) - }) -} diff --git a/src/message/entry.js b/src/message/entry.js index 4617c8de..85e592c0 100644 --- a/src/message/entry.js +++ b/src/message/entry.js @@ -1,7 +1,5 @@ 'use strict' -const mh = require('multihashes') - const WantlistEntry = require('../wantlist').Entry module.exports = class BitswapMessageEntry { @@ -27,7 +25,11 @@ module.exports = class BitswapMessageEntry { } get [Symbol.toStringTag] () { - return `BitswapMessageEntry ${mh.toB58String(this.key)} ` + return `BitswapMessageEntry ${this.toB58String()} ` + } + + toB58String () { + return this.entry.toB58String() } equals (other) { diff --git a/src/message/index.js b/src/message/index.js index be4682e7..8f0e9db1 100644 --- a/src/message/index.js +++ b/src/message/index.js @@ -3,8 +3,6 @@ const protobuf = require('protocol-buffers') const Block = require('ipfs-block') const isEqualWith = require('lodash.isequalwith') -const mh = require('multihashes') -const assert = require('assert') const map = require('async/map') const pbm = protobuf(require('./message.proto')) @@ -22,15 +20,13 @@ class BitswapMessage { } addEntry (key, priority, cancel) { - assert(Buffer.isBuffer(key), 'key must be a buffer') - - const e = this.wantlist.get(mh.toB58String(key)) + const e = this.wantlist.get(key.toString()) if (e) { e.priority = priority e.cancel = Boolean(cancel) } else { - this.wantlist.set(mh.toB58String(key), new Entry(key, priority, cancel)) + this.wantlist.set(key.toString(), new Entry(key, priority, cancel)) } } @@ -39,15 +35,22 @@ class BitswapMessage { if (err) { return cb(err) } - - this.blocks.set(mh.toB58String(key), block) + this.blocks.set(key.toString(), block) cb() }) } + addBlockWithKey (block, key) { + this.blocks.set(key.toString(), block) + } + cancel (key) { - this.wantlist.delete(mh.toB58String(key)) - this.addEntry(key, 0, true) + const keyS = key.toString() + if (this.wantlist.has(keyS)) { + this.wantlist.delete(keyS) + } else { + this.wantlist.set(keyS, new Entry(key, 0, true)) + } } toProto () { diff --git a/src/network/index.js b/src/network/index.js index 2b1adac7..07a2b2b4 100644 --- a/src/network/index.js +++ b/src/network/index.js @@ -22,9 +22,12 @@ module.exports = class Network { // increase event listener max this.libp2p.swarm.setMaxListeners(cs.maxListeners) + + this._running = false } start () { + this._running = true // bind event listeners this._onConnection = this._onConnection.bind(this) this._onPeerMux = this._onPeerMux.bind(this) @@ -43,6 +46,7 @@ module.exports = class Network { } stop () { + this._running = false this.libp2p.unhandle(PROTOCOL_IDENTIFIER) this.libp2p.swarm.removeListener('peer-mux-established', this._onPeerMux) @@ -50,6 +54,9 @@ module.exports = class Network { } _onConnection (protocol, conn) { + if (!this._running) { + return + } log('incomming new bitswap connection: %s', protocol) pull( conn, @@ -60,7 +67,7 @@ module.exports = class Network { if (err) { return cb(err) } - log('data from', peerInfo.id.toB58String()) + // log('data from', peerInfo.id.toB58String()) this.bitswap._receiveMessage(peerInfo.id, msg) cb() }) @@ -75,17 +82,27 @@ module.exports = class Network { } _onPeerMux (peerInfo) { + if (!this._running) { + return + } this.bitswap._onPeerConnected(peerInfo.id) } _onPeerMuxClosed (peerInfo) { + if (!this._running) { + return + } this.bitswap._onPeerDisconnected(peerInfo.id) } // Connect to the given peer connectTo (peerId, cb) { - log('connecting to %s', peerId.toB58String()) const done = (err) => setImmediate(() => cb(err)) + + if (!this._running) { + return done(new Error('No running network')) + } + // NOTE: For now, all this does is ensure that we are // connected. Once we have Peer Routing, we will be able // to find the Peer @@ -98,11 +115,15 @@ module.exports = class Network { // Send the given msg (instance of Message) to the given peer sendMessage (peerId, msg, cb) { + if (!this._running) { + return cb(new Error('No running network')) + } + const stringId = peerId.toB58String() log('sendMessage to %s', stringId) let peerInfo try { - peerInfo = this.peerBook.getByMultihash(peerId.toBytes()) + peerInfo = this.peerBook.getByB58String(stringId) } catch (err) { return cb(err) } @@ -115,7 +136,7 @@ module.exports = class Network { log('dialByPeerInfo') this.libp2p.dialByPeerInfo(peerInfo, PROTOCOL_IDENTIFIER, (err, conn) => { - log('dialed %s', peerInfo.id.toB58String(), err) + // log('dialed %s', peerInfo.id.toB58String(), err) if (err) { return cb(err) } diff --git a/src/wantlist/entry.js b/src/wantlist/entry.js index 29a4fd39..1af066b8 100644 --- a/src/wantlist/entry.js +++ b/src/wantlist/entry.js @@ -10,8 +10,17 @@ module.exports = class WantlistEntry { // Keep track of how many requests we have for this key this._refCounter = 1 - this.key = key + this._key = key this.priority = isUndefined(priority) ? 1 : priority + this._keyB58String = '' + } + + get key () { + return this._key + } + + set key (val) { + throw new Error('immutable key') } inc () { @@ -26,8 +35,16 @@ module.exports = class WantlistEntry { return this._refCounter > 0 } + toB58String () { + if (!this._keyB58String) { + this._keyB58String = mh.toB58String(this.key) + } + + return this._keyB58String + } + get [Symbol.toStringTag] () { - return `WantlistEntry ` + return `WantlistEntry ` } equals (other) { diff --git a/src/wantlist/index.js b/src/wantlist/index.js index 24cc9ec1..a6d4bdc1 100644 --- a/src/wantlist/index.js +++ b/src/wantlist/index.js @@ -1,7 +1,5 @@ 'use strict' -const mh = require('multihashes') - const Entry = require('./entry') class Wantlist { @@ -14,18 +12,18 @@ class Wantlist { } add (key, priority) { - const e = this.set.get(mh.toB58String(key)) + const e = this.set.get(key.toString()) if (e) { e.inc() e.priority = priority } else { - this.set.set(mh.toB58String(key), new Entry(key, priority)) + this.set.set(key.toString(), new Entry(key, priority)) } } remove (key) { - const e = this.set.get(mh.toB58String(key)) + const e = this.set.get(key.toString()) if (!e) return @@ -34,12 +32,12 @@ class Wantlist { // only delete when no refs are held if (e.hasRefs()) return - this.set.delete(mh.toB58String(key)) + this.set.delete(key.toString()) } removeForce (key) { - if (this.set.has(key)) { - this.set.delete(key) + if (this.set.has(key.toString())) { + this.set.delete(key.toString()) } } @@ -52,7 +50,7 @@ class Wantlist { } contains (key) { - return this.set.get(mh.toB58String(key)) + return this.set.get(key.toString()) } } diff --git a/src/wantmanager/index.js b/src/wantmanager/index.js index b747f22c..a7220478 100644 --- a/src/wantmanager/index.js +++ b/src/wantmanager/index.js @@ -1,8 +1,6 @@ 'use strict' const debug = require('debug') -const pull = require('pull-stream') -const mh = require('multihashes') const Message = require('../message') const Wantlist = require('../wantlist') @@ -20,41 +18,29 @@ module.exports = class Wantmanager { this.network = network } - _newMsgQueue (peerId) { - return new MsgQueue(peerId, this.network) - } - _addEntries (keys, cancel, force) { - let i = -1 - pull( - pull.values(keys), - pull.map((key) => { - i++ - return new Message.Entry(key, cs.kMaxPriority - i, cancel) - }), - pull.through((e) => { - // add changes to our wantlist - if (e.cancel) { - if (force) { - this.wl.removeForce(e.key) - } else { - this.wl.remove(e.key) - } + const entries = keys.map((key, i) => { + return new Message.Entry(key, cs.kMaxPriority - i, cancel) + }) + + entries.forEach((e) => { + // add changes to our wantlist + if (e.cancel) { + if (force) { + this.wl.removeForce(e.key) } else { - log('adding to wl', mh.toB58String(e.key), e.priority) - this.wl.add(e.key, e.priority) - } - }), - pull.collect((err, entries) => { - if (err) { - throw err - } - // broadcast changes - for (let p of this.peers.values()) { - p.addEntries(entries, false) + this.wl.remove(e.key) } - }) - ) + } else { + log('adding to wl') + this.wl.add(e.key, e.priority) + } + }) + + // broadcast changes + for (let p of this.peers.values()) { + p.addEntries(entries) + } } _startPeerHandler (peerId) { @@ -65,7 +51,7 @@ module.exports = class Wantmanager { return } - mq = this._newMsgQueue(peerId) + mq = new MsgQueue(peerId, this.network) // new peer, give them the full wantlist const fullwantlist = new Message(true) @@ -76,7 +62,6 @@ module.exports = class Wantmanager { mq.addMessage(fullwantlist) this.peers.set(peerId.toB58String(), mq) - mq.run() return mq } @@ -92,25 +77,23 @@ module.exports = class Wantmanager { return } - mq.stop() this.peers.delete(peerId.toB58String()) } // add all the keys to the wantlist wantBlocks (keys) { - log('want blocks:', keys.map((k) => mh.toB58String(k))) this._addEntries(keys, false) } // remove blocks of all the given keys without respecting refcounts unwantBlocks (keys) { - log('unwant blocks:', keys.map((k) => mh.toB58String(k))) + log('unwant blocks: %s', keys.length) this._addEntries(keys, true, true) } // cancel wanting all of the given keys cancelWants (keys) { - log('cancel wants: ', keys.map((k) => mh.toB58String(k))) + log('cancel wants: %s', keys.length) this._addEntries(keys, true) } @@ -120,12 +103,10 @@ module.exports = class Wantmanager { } connected (peerId) { - log('peer connected: %s', peerId.toB58String()) this._startPeerHandler(peerId) } disconnected (peerId) { - log('peer disconnected: %s', peerId.toB58String()) this._stopPeerHandler(peerId) } @@ -145,7 +126,7 @@ module.exports = class Wantmanager { stop () { for (let mq of this.peers.values()) { - this.disconnected(mq.p) + this.disconnected(mq.id) } clearInterval(this.timer) } diff --git a/src/wantmanager/msg-queue.js b/src/wantmanager/msg-queue.js index 30259425..8a629428 100644 --- a/src/wantmanager/msg-queue.js +++ b/src/wantmanager/msg-queue.js @@ -1,8 +1,7 @@ 'use strict' -const queue = require('async/queue') const debug = require('debug') - +const debounce = require('lodash.debounce') const Message = require('../message') const log = debug('bitswap:wantmanager:queue') @@ -10,62 +9,56 @@ log.error = debug('bitswap:wantmanager:queue:error') module.exports = class MsgQueue { constructor (peerId, network) { - this.p = peerId + this.id = peerId this.network = network this.refcnt = 1 - this.queue = queue(this.doWork.bind(this), 1) - this.queue.pause() + this._entries = [] + this.sendEntries = debounce(this._sendEntries.bind(this), 200) } addMessage (msg) { if (msg.empty) { return } - log('addMessage: %s', this.p.toB58String(), msg) - this.queue.push(msg) + + this.send(msg) } - addEntries (entries, full) { - log('addEntries: %s', entries.length) - const msg = new Message(Boolean(full)) - entries.forEach((entry) => { + addEntries (entries) { + this._entries = this._entries.concat(entries) + this.sendEntries() + } + + _sendEntries () { + if (!this._entries.length) return + + const msg = new Message(false) + this._entries.forEach((entry) => { if (entry.cancel) { msg.cancel(entry.key) } else { msg.addEntry(entry.key, entry.priority) } }) - + this._entries = [] this.addMessage(msg) } - doWork (wlm, cb) { - log('doWork: %s', this.p.toB58String(), wlm) - if (wlm.empty) return cb() - this.network.connectTo(this.p, (err) => { + send (msg) { + this.network.connectTo(this.id, (err) => { if (err) { - log.error('cant connect to peer %s: %s', this.p.toB58String(), err.message) - return cb(err) + log.error('cant connect to peer %s: %s', this.id.toB58String(), err.message) + return } - log('sending message', wlm) - this.network.sendMessage(this.p, wlm, (err) => { + log('sending message') + // console.log('sending msg %s blocks, %s wants', msg.blocks.size, msg.wantlist.size) + this.network.sendMessage(this.id, msg, (err) => { if (err) { log.error('send error: %s', err.message) - return cb(err) + return } - cb() }) }) } - - run () { - log('starting queue') - this.queue.resume() - } - - stop () { - log('killing queue') - this.queue.kill() - } } diff --git a/test/decision/engine-test.js b/test/decision/engine-test.js index 0154e0e1..2d32fc52 100644 --- a/test/decision/engine-test.js +++ b/test/decision/engine-test.js @@ -55,9 +55,9 @@ module.exports = (repo) => { pull( pull.values(_.range(1000)), - paramap((i, cb) => { + pull.map((i) => { const content = `this is message ${i}` - cb(null, new Block(content)) + return new Block(content) }), paramap((block, cb) => { const m = new Message(false) @@ -65,8 +65,13 @@ module.exports = (repo) => { if (err) { return cb(err) } - sender.engine.messageSent(receiver.peer, m) - receiver.engine.messageReceived(sender.peer, m, cb) + block.key((err, key) => { + if (err) { + return cb(err) + } + sender.engine.messageSent(receiver.peer, block, key) + receiver.engine.messageReceived(sender.peer, m, cb) + }) }) }, 100), pull.onEnd((err) => { @@ -113,8 +118,7 @@ module.exports = (repo) => { const seatlle = res[1] const m = new Message(true) - - sanfrancisco.engine.messageSent(seatlle.peer, m) + sanfrancisco.engine.messageSent(seatlle.peer) seatlle.engine.messageReceived(sanfrancisco.peer, m, (err) => { expect(err).to.not.exist @@ -204,9 +208,9 @@ module.exports = (repo) => { return _.flatten(messages.map(messageToString)) } - const network = mockNetwork(keeps.length, (res) => { + const network = mockNetwork(1, (res) => { const msgs = stringifyMessages(res.messages) - expect(msgs).to.be.eql(keeps) + expect(msgs.sort()).to.be.eql(keeps.sort()) innerCb() }) diff --git a/test/decision/peer-request-queue.spec.js b/test/decision/peer-request-queue.spec.js deleted file mode 100644 index ff0e96c0..00000000 --- a/test/decision/peer-request-queue.spec.js +++ /dev/null @@ -1,148 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect -const PeerId = require('peer-id') -const _ = require('lodash') -const Block = require('ipfs-block') -const map = require('async/map') -const each = require('async/each') -const waterfall = require('async/waterfall') -const parallel = require('async/parallel') - -const hash = (data, cb) => { - const block = new Block(data) - block.key((err, key) => { - if (err) { - return cb(err) - } - - cb(null, key) - }) -} - -const WantlistEntry = require('../../src/wantlist').Entry -const PeerRequestQueue = require('../../src/decision/peer-request-queue') - -describe('PeerRequestQueue', () => { - it('push and pop', (done) => { - const prq = new PeerRequestQueue() - PeerId.create({bits: 1024}, (err, partner) => { - if (err) { - return done(err) - } - - const alphabet = 'abcdefghijklmnopqrstuvwxyz'.split('').sort() - const vowels = 'aeiou'.split('').sort() - const vowelsIndex = vowels.map((v) => alphabet.indexOf(v)) - const consonants = alphabet - .filter((a) => !_.includes(vowels, a)) - .sort() - .map((c) => alphabet.indexOf(c)) - - map(alphabet, hash, (err, hashes) => { - if (err) { - return done(err) - } - - alphabet.forEach((a, i) => { - prq.push(new WantlistEntry(hashes[i], Math.pow(2, 32) - 1 - i), partner) - }) - - consonants.forEach((c) => { - prq.remove(hashes[c], partner) - }) - - const out = [] - alphabet.forEach(() => { - const rec = prq.pop() - if (!rec) return - out.push(rec.entry.key) - }) - - expect(out.length).to.be.eql(vowels.length) - - vowelsIndex.forEach((v, i) => { - expect( - out[i].toString('hex') - ).to.be.eql( - hashes[v].toString('hex') - ) - }) - done() - }) - }) - }) - - it('peer repeats', (done) => { - // This test checks that peers wont starve out other peers - const prq = new PeerRequestQueue() - - waterfall([ - (cb) => map(_.range(4), (i, cb) => PeerId.create({bits: 1024}, cb), cb), - (peers, cb) => { - each(_.range(5), (i, cb) => { - hash('hello-' + i, (err, digest) => { - if (err) { - return cb(err) - } - peers.forEach((peer) => { - prq.push(new WantlistEntry(digest), peer) - }) - cb() - }) - }, (err) => { - if (err) { - return cb(err) - } - let targets = [] - const tasks = [] - - _.range(4).forEach((i) => { - const t = prq.pop() - targets.push(t.target.toHexString()) - tasks.push(t) - }) - - const expected = peers.map((p) => p.toHexString()).sort() - targets = targets.sort() - - expect(targets).to.be.eql(expected) - - // Now, if one of the tasks gets finished, the next task off the queue should - // be for the same peer - _.range(3).forEach((blockI) => { - _.range(3).forEach((i) => { - // its okay to mark the same task done multiple times here (JUST FOR TESTING) - tasks[i].done() - const ntask = prq.pop() - expect(ntask.target).to.be.eql(tasks[i].target) - }) - }) - cb() - }) - } - ], done) - }) - - it('push same block multiple times', (done) => { - const prq = new PeerRequestQueue() - parallel([ - (cb) => PeerId.create({bits: 1024}, cb), - (cb) => hash('hello', cb) - ], (err, results) => { - if (err) { - return done(err) - } - const partner = results[0] - const digest = results[1] - prq.push(new WantlistEntry(digest), partner) - prq.push(new WantlistEntry(digest), partner) - - expect(prq.pop()).to.exist - expect(prq.pop()).to.not.exist - done() - }) - }) -}) diff --git a/test/decision/pq.spec.js b/test/decision/pq.spec.js deleted file mode 100644 index f8db3e72..00000000 --- a/test/decision/pq.spec.js +++ /dev/null @@ -1,73 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const expect = require('chai').expect - -const PriorityQueue = require('../../src/decision/pq') - -describe('PriorityQueue', () => { - it('sorts with a less operator', () => { - const pq = new PriorityQueue((a, b) => a > b) - pq.push(1) - pq.push(5) - pq.push(2) - - expect(pq.pop()).to.be.eql(5) - expect(pq.pop()).to.be.eql(2) - expect(pq.pop()).to.be.eql(1) - }) - - it('updates an element', () => { - const a = {index: 1} - const b = {index: 5} - const c = {index: 2} - const pq = new PriorityQueue((a, b) => a.index > b.index) - - pq.push(a) - pq.push(b) - pq.push(c) - a.index = 10 - pq.update(a) - - expect(pq.pop()).to.be.eql(a) - expect(pq.pop()).to.be.eql(b) - expect(pq.pop()).to.be.eql(c) - }) - - it('isEmpty', () => { - const pq = new PriorityQueue((a, b) => a > b) - - expect(pq.isEmpty()).to.be.eql(true) - - pq.push(1) - - expect(pq.isEmpty()).to.be.eql(false) - - pq.pop() - - expect(pq.isEmpty()).to.be.eql(true) - }) - - it('correct pop', () => { - const pq = new PriorityQueue((a, b) => a.priority < b.priority) - const tasks = [ - {key: 'a', priority: 9}, - {key: 'b', priority: 4}, - {key: 'c', priority: 3}, - {key: 'd', priority: 0}, - {key: 'e', priority: 6} - ] - tasks.forEach((t) => pq.push(t)) - const priorities = [] - - while (!pq.isEmpty()) { - priorities.push(pq.pop().priority) - } - - expect( - priorities - ).to.be.eql([ - 0, 3, 4, 6, 9 - ]) - }) -}) diff --git a/test/index-test.js b/test/index-test.js index d23fd113..86142a06 100644 --- a/test/index-test.js +++ b/test/index-test.js @@ -12,7 +12,6 @@ const _ = require('lodash') const expect = require('chai').expect const PeerId = require('peer-id') const Block = require('ipfs-block') -const mh = require('multihashes') const PeerBook = require('peer-book') const pull = require('pull-stream') @@ -126,8 +125,8 @@ module.exports = (repo) => { const wl = bs.wantlistForPeer(other) - expect(wl.has(mh.toB58String(keys[0]))).to.be.eql(true) - expect(wl.has(mh.toB58String(keys[1]))).to.be.eql(true) + expect(wl.has(keys[0].toString())).to.be.eql(true) + expect(wl.has(keys[1].toString())).to.be.eql(true) done() }) diff --git a/test/message.spec.js b/test/message.spec.js index 4cbb9ac7..1dfa2208 100644 --- a/test/message.spec.js +++ b/test/message.spec.js @@ -100,7 +100,7 @@ describe('BitswapMessage', () => { expect( Array.from(protoMessage.wantlist) ).to.be.eql([ - [mh.toB58String(new Buffer('hello')), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] + [(new Buffer('hello')).toString(), new BitswapMessage.Entry(new Buffer('hello'), 0, false)] ]) const b1 = blocks[1] @@ -111,8 +111,8 @@ describe('BitswapMessage', () => { expect( Array.from(protoMessage.blocks).map((b) => [b[0], b[1].data]) ).to.be.eql([ - [mh.toB58String(k1), b1.data], - [mh.toB58String(k2), b2.data] + [k1.toString(), b1.data], + [k2.toString(), b2.data] ]) done() @@ -214,11 +214,8 @@ describe('BitswapMessage', () => { expect(entry.entry).to.have.property('key') expect(entry.entry).to.have.property('priority', 5) - entry.key = new Buffer('world') entry.priority = 2 - expect(entry.entry).to.have.property('key') - expect(entry.entry.key.equals(new Buffer('world'))) expect(entry.entry).to.have.property('priority', 2) }) }) diff --git a/test/network/gen-bitswap-network.node.js b/test/network/gen-bitswap-network.node.js index ecd13591..b0e02d42 100644 --- a/test/network/gen-bitswap-network.node.js +++ b/test/network/gen-bitswap-network.node.js @@ -11,6 +11,7 @@ const _ = require('lodash') const Block = require('ipfs-block') const Buffer = require('safe-buffer').Buffer const pull = require('pull-stream') +const crypto = require('crypto') const utils = require('../utils') describe('gen Bitswap network', function () { @@ -69,103 +70,99 @@ describe('gen Bitswap network', function () { } ], (err) => { expect(err).to.not.exist - setTimeout(() => { - node.bitswap.stop() - node.libp2p.stop(done) - }) + node.bitswap.stop() + node.libp2p.stop(done) }) }) }) - // const counts = [2, 3, 4, 5, 10] - const counts = [2] - describe('distributed blocks', () => { - counts.forEach((n) => { - it(`with ${n} nodes`, (done) => { - utils.genBitswapNetwork(n, (err, nodeArr) => { - expect(err).to.not.exist - nodeArr.forEach((node) => { - expect(node.bitswap).to.exist - expect(node.libp2p).to.exist - expect(Object.keys(node.libp2p.swarm.conns).length).to.equal(0) - expect(Object.keys(node.libp2p.swarm.muxedConns).length).to.equal(n - 1) - expect(node.repo).to.exist - }) - - // -- actual test - - const round = (j, cb) => { - const blockFactor = 10 - map(_.range(n * blockFactor), (k, cb) => { - const b = Buffer.alloc(1024) - b.fill(k) - cb(null, new Block(b)) - }, (err, blocks) => { - if (err) { - return cb(err) - } + it('with 2 nodes', (done) => { + const n = 2 + utils.genBitswapNetwork(n, (err, nodeArr) => { + expect(err).to.not.exist + nodeArr.forEach((node) => { + expect( + Object.keys(node.libp2p.swarm.conns) + ).to.be.empty - const d = (new Date()).getTime() + expect( + Object.keys(node.libp2p.swarm.muxedConns) + ).to.have.length(n - 1) + }) - parallel(_.map(nodeArr, (node, i) => (callback) => { - node.bitswap.start() - parallel([ - (finish) => pull( - pull.values( - _.range(blockFactor) - ), - pull.map((j) => blocks[i * blockFactor + j]), - pull.asyncMap((b, cb) => { - b.key((err, key) => { - if (err) { - return cb(err) - } - cb(null, {data: b.data, key: key}) - }) - }), - node.bitswap.putStream(), - pull.onEnd(finish) - ), - (finish) => { - map(blocks, (b, cb) => b.key(cb), (err, keys) => { - expect(err).to.not.exist - pull( - node.bitswap.getStream(keys), - pull.collect((err, res) => { - expect(err).to.not.exist - expect(res).to.have.length(blocks.length) - finish() - }) - ) - }) - } - ], callback) - }), (err) => { - expect(err).to.not.exist - console.log(' time -- %s', (new Date()).getTime() - d) - cb() - }) - }) + // -- actual test + round(nodeArr, n, (err) => { + if (err) { + return done(err) } - series( - _.range(2).map((i) => (cb) => round(i, cb)), - (err) => { - // setTimeout is used to avoid closing the TCP socket while spdy is - // still sending a ton of signalling data - setTimeout(() => { - parallel(nodeArr.map((node) => (cb) => { - node.bitswap.stop() - node.libp2p.stop(cb) - }), (err2) => { - done(err || err2) - }) - }, 3000) - } - ) + each(nodeArr, (node, cb) => { + node.bitswap.stop() + node.libp2p.stop(cb) + }, done) }) }) }) }) }) + +function round (nodeArr, n, cb) { + const blockFactor = 10 + const blocks = createBlocks(n, blockFactor) + map(blocks, (b, cb) => b.key(cb), (err, keys) => { + if (err) { + return cb(err) + } + let d + series([ + // put blockFactor amount of blocks per node + (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { + node.bitswap.start() + + const data = _.map(_.range(blockFactor), (j) => { + const index = i * blockFactor + j + return { + data: blocks[index].data, + key: keys[index] + } + }) + each( + data, + (d, cb) => node.bitswap.put(d, cb), + callback + ) + }), cb), + (cb) => { + d = (new Date()).getTime() + cb() + }, + // fetch all blocks on every node + (cb) => parallel(_.map(nodeArr, (node, i) => (callback) => { + pull( + node.bitswap.getStream(keys), + pull.collect((err, res) => { + if (err) { + return callback(err) + } + + expect(res).to.have.length(blocks.length) + callback() + }) + ) + }), cb) + ], (err) => { + if (err) { + return cb(err) + } + console.log(' time -- %s', (new Date()).getTime() - d) + cb() + }) + }) +} + +function createBlocks (n, blockFactor) { + return _.map(_.range(n * blockFactor), (k) => { + return new Block(crypto.randomBytes(n * blockFactor)) + }) +} diff --git a/test/utils.js b/test/utils.js index f8156962..207a8105 100644 --- a/test/utils.js +++ b/test/utils.js @@ -166,23 +166,17 @@ exports.genBitswapNetwork = (n, callback) => { to.peerInfo.id.toB58String()) { return cbJ() } + from.libp2p.dialByPeerInfo(to.peerInfo, cbJ) - }, (err) => { - if (err) { - throw err - } - cbI() - }) - }, (err) => { - if (err) { - throw err - } - finish() - }) + }, cbI) + }, finish) } // callback with netArray - function finish () { + function finish (err) { + if (err) { + throw err + } callback(null, netArray) } }) diff --git a/test/wantlist.spec.js b/test/wantlist.spec.js index 3bcf8945..473fb7c4 100644 --- a/test/wantlist.spec.js +++ b/test/wantlist.spec.js @@ -3,7 +3,6 @@ const expect = require('chai').expect const Block = require('ipfs-block') -const mh = require('multihashes') const map = require('async/map') const Wantlist = require('../src/wantlist') @@ -101,7 +100,7 @@ describe('Wantlist', () => { expect( Array.from(wm.entries()) ).to.be.eql([ - [mh.toB58String(key), new Wantlist.Entry(key, 2)] + [key.toString(), new Wantlist.Entry(key, 2)] ]) done() }) @@ -119,8 +118,8 @@ describe('Wantlist', () => { expect( Array.from(wm.sortedEntries()) ).to.be.eql([ - [mh.toB58String(keys[0]), new Wantlist.Entry(keys[0], 1)], - [mh.toB58String(keys[1]), new Wantlist.Entry(keys[1], 1)] + [keys[0].toString(), new Wantlist.Entry(keys[0], 1)], + [keys[1].toString(), new Wantlist.Entry(keys[1], 1)] ]) done() }) diff --git a/test/wantmanager/index.spec.js b/test/wantmanager/index.spec.js index 0f5a52bd..57ac69fd 100644 --- a/test/wantmanager/index.spec.js +++ b/test/wantmanager/index.spec.js @@ -58,12 +58,12 @@ describe('Wantmanager', () => { wm.connected(peer2) series([ - (cb) => setTimeout(cb, 100), + (cb) => setTimeout(cb, 200), (cb) => { wm.cancelWants([new Buffer('world')]) cb() }, - (cb) => setTimeout(cb, 100) + (cb) => setTimeout(cb, 200) ], (err) => { expect(err).to.not.exist wm.wantBlocks([new Buffer('foo')]) diff --git a/test/wantmanager/msg-queue.spec.js b/test/wantmanager/msg-queue.spec.js index 7edace3e..a1596139 100644 --- a/test/wantmanager/msg-queue.spec.js +++ b/test/wantmanager/msg-queue.spec.js @@ -23,26 +23,24 @@ describe('MsgQueue', () => { let i = 0 const finish = () => { i++ - if (i === 3) { + if (i === 2) { expect( connects ).to.be.eql([ - id, id, id + id, id ]) const m1 = new Message(false) m1.addEntry(new Buffer('hello'), 1) m1.addEntry(new Buffer('world'), 2) - const m2 = new Message(false) - m2.cancel(new Buffer('foo')) - m2.cancel(new Buffer('bar')) + m1.cancel(new Buffer('foo')) + m1.cancel(new Buffer('bar')) expect( messages ).to.be.eql([ - [id, m1], - [id, m2], - [id, msg] + [id, msg], + [id, m1] ]) done() @@ -74,7 +72,6 @@ describe('MsgQueue', () => { new Message.Entry(new Buffer('bar'), 2, true) ] - mq.run() mq.addEntries(batch1) mq.addEntries(batch2) mq.addMessage(msg)