Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: random walk #104

Merged
merged 9 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
Expand Down
8 changes: 4 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ class KadDHT extends EventEmitter {
*/
stop (callback) {
this._running = false
this.randomWalk.stop(() => { // guarantee that random walk is stopped if it was started
this.providers.stop()
this.network.stop(callback)
})
this.randomWalk.stop()
this.providers.stop()
this._queryManager.stop()
this.network.stop(callback)
}

/**
Expand Down Expand Up @@ -581,6 +580,7 @@ class KadDHT extends EventEmitter {
* @param {PeerId} id
* @param {Object} options - findPeer options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {AbortControllerSignal} options.signal - an `AbortController` signal
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
* @param {function(Error, PeerInfo)} callback
* @returns {void}
*/
Expand Down
59 changes: 34 additions & 25 deletions src/random-walk.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
const times = require('async/times')
const crypto = require('libp2p-crypto')
const waterfall = require('async/waterfall')
const timeout = require('async/timeout')
const multihashing = require('multihashing-async')
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')
const AbortController = require('abort-controller')

const errcode = require('err-code')

Expand Down Expand Up @@ -52,23 +52,18 @@ class RandomWalk {
runningHandle._timeoutId = null
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

walk((nextPeriod) => {
// Was walk cancelled while fn was being called?
if (runningHandle._onCancel) {
return runningHandle._onCancel()
}
// Schedule next
runningHandle.runPeriodically(walk, nextPeriod)
})
}, period)
},
cancel: (cb) => {
cancel: () => {
// Not currently running, can callback immediately
if (runningHandle._timeoutId) {
clearTimeout(runningHandle._timeoutId)
return cb()
return
}
// Wait to finish and then call callback
runningHandle._onCancel = cb
this._controller.abort()
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -85,20 +80,20 @@ class RandomWalk {
}

/**
* Stop the random-walk process.
* @param {function(Error)} callback
* Stop the random-walk process. Any active
* queries will be aborted.
*
* @returns {void}
*/
stop (callback) {
stop () {
const runningHandle = this._runningHandle

if (!runningHandle) {
return callback()
return
}

this._runningHandle = null
runningHandle.cancel(callback)
runningHandle.cancel()
}

/**
Expand All @@ -113,39 +108,53 @@ class RandomWalk {
*/
_walk (queries, walkTimeout, callback) {
this.log('start')
this._controller = new AbortController()

times(queries, (i, next) => {
this.log('running query %s', i)

times(queries, (i, cb) => {
// Perform the walk
waterfall([
(cb) => this._randomPeerId(cb),
(id, cb) => timeout((cb) => {
this._query(id, cb)
}, walkTimeout)(cb)
(id, cb) => this._query(id, {
timeout: walkTimeout,
signal: this._controller.signal
}, cb)
], (err) => {
if (err) {
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query finished with error', err)
return callback(err)
return next(err)
}

this.log('done')
callback(null)
this.log('finished query')
next(null)
})
}, (err) => {
if (err) {
this.log.error(err)
}

this.log('finished queries')
callback(err)
})
}

/**
* The query run during a random walk request.
*
* @param {PeerId} id
* @param {object} options
* @param {number} options.timeout
jacobheun marked this conversation as resolved.
Show resolved Hide resolved
* @param {function(Error)} callback
* @returns {void}
*
* @private
*/
_query (id, callback) {
_query (id, options, callback) {
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, (err, peer) => {
if (err.code === 'ERR_NOT_FOUND') {
this._kadDHT.findPeer(id, options, (err, peer) => {
if (err && err.code === 'ERR_NOT_FOUND') {
// expected case, we asked for random stuff after all
return callback()
}
Expand Down
2 changes: 1 addition & 1 deletion test/kad-dht.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ describe('KadDHT', () => {
})

it('random-walk', function (done) {
this.timeout(10 * 1000)
this.timeout(20 * 1000)

const nDHTs = 20
const tdht = new TestDHT()
Expand Down
215 changes: 215 additions & 0 deletions test/random-walk.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect
const sinon = require('sinon')
const RandomWalk = require('../src/random-walk')
const { defaultRandomWalk } = require('../src/constants')
const { AssertionError } = require('assert')

describe('Random Walk', () => {
let mockDHT = {
peerInfo: {
id: {
toB58String: () => 'QmRLoXS3E73psYaUsma1VSbboTa2J8Z9kso1tpiGLk9WQ4'
}
},
findPeer: () => {}
}

afterEach(() => {
sinon.restore()
})

describe('configuration', () => {
it('should use require a dht', () => {
expect(() => new RandomWalk()).to.throw(AssertionError)
})

it('should use defaults', () => {
const randomWalk = new RandomWalk(mockDHT)
expect(randomWalk._options).to.eql(defaultRandomWalk)
})

it('should be able to set options', () => {
const options = {
enabled: false,
queriesPerPeriod: 2,
interval: 300e3,
timeout: 30e3,
delay: 1e3
}

const randomWalk = new RandomWalk(mockDHT, options)
expect(randomWalk._options).to.eql(options)
})
})

describe('walk', () => {
let randomWalk
before(() => {
randomWalk = new RandomWalk(mockDHT)
})

it('should be able to specify the number of queries', (done) => {
const queries = 5
sinon.stub(randomWalk, '_query').callsArgWith(2, null)
randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
})

it('should stop walking if a query errors', (done) => {
const queries = 5
const error = new Error('ERR_BOOM')
const _queryStub = sinon.stub(randomWalk, '_query')
_queryStub.onCall(2).callsArgWith(2, error)
_queryStub.callsArgWith(2, null)

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.eql(error)
// 2 successes and error on the 3rd
expect(randomWalk._query.callCount).to.eql(3)
done()
})
})

it('should ignore timeout errors and keep walking', (done) => {
const queries = 5
const _queryStub = sinon.stub(randomWalk, '_query')
_queryStub.onCall(2).callsArgWith(2, {
code: 'ETIMEDOUT'
})
_queryStub.callsArgWith(2, null)

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
})

it('should pass its timeout to the find peer query', (done) => {
sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })

randomWalk._walk(1, 111, (err) => {
const mockCalls = randomWalk._kadDHT.findPeer.getCalls()
expect(err).to.not.exist()
expect(mockCalls).to.have.length(1)
expect(mockCalls[0].args[1]).to.include({
timeout: 111
})
done()
})
})

it('should error if the random id peer is found', (done) => {
const queries = 5
const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })
findPeerStub.onCall(2).callsArgWith(2, null, {
id: 'QmB'
})

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.exist()
expect(findPeerStub.callCount).to.eql(3)
done()
})
})

it('should error if random id generation errors', (done) => {
const error = new Error('ERR_BOOM')
sinon.stub(randomWalk, '_randomPeerId').callsArgWith(0, error)
randomWalk._walk(1, 1e3, (err) => {
expect(err).to.eql(error)
done()
})
})
})

describe('start', () => {
it('should not start if it is running', () => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: true,
delay: 0,
interval: 100
})
randomWalk._running = true
randomWalk.start()
expect(randomWalk._runningHandle).to.not.exist()
})

it('should not start if it is not enabled', () => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: false
})
randomWalk._running = true
randomWalk.start()
expect(randomWalk._runningHandle).to.not.exist()
})

it('should start if not running and enabled', (done) => {
const options = {
enabled: true,
delay: 0,
timeout: 3e3,
queriesPerPeriod: 1
}
const randomWalk = new RandomWalk(mockDHT, options)
sinon.stub(randomWalk, '_walk').callsFake((queries, timeout) => {
expect(queries).to.eql(options.queriesPerPeriod)
expect(timeout).to.eql(options.timeout)
done()
})
randomWalk.start()
})
})

describe('stop', () => {
it('should not throw if already stopped', () => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: true,
delay: 0,
interval: 100
})

expect(() => randomWalk.stop()).to.not.throw()
})

it('should cancel the timer if the walk is not active', () => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: true,
delay: 100e3,
interval: 100e3
})
randomWalk.start()
expect(randomWalk._runningHandle._timeoutId).to.exist()
randomWalk.stop()
expect(randomWalk._runningHandle).to.not.exist()
})

it('should cancel the walk if already running', (done) => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: true,
delay: 0,
timeout: 100e3,
interval: 100e3
})
sinon.stub(randomWalk._kadDHT, 'findPeer').callsFake((id, options) => {
options.signal.addEventListener('abort', () => {
expect(randomWalk._runningHandle).to.not.exist()
options.signal.removeEventListener('abort')
done()
})
randomWalk.stop()
})

randomWalk.start()
})
})
})