Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Async Refactor: random-walk #134

Merged
merged 8 commits into from
Jun 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"multihashes": "~0.4.14",
"multihashing-async": "~0.5.2",
kumavis marked this conversation as resolved.
Show resolved Hide resolved
"p-queue": "^5.0.0",
"p-times": "^2.1.0",
"peer-id": "~0.12.2",
"peer-info": "~0.15.1",
"priorityqueue": "~0.2.1",
Expand Down
124 changes: 60 additions & 64 deletions src/random-walk.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
'use strict'

const times = require('async/times')
const promisify = require('promisify-es6')
const crypto = require('libp2p-crypto')
const waterfall = require('async/waterfall')
const multihashing = require('multihashing-async')
const multihashing = promisify(require('multihashing-async'))
kumavis marked this conversation as resolved.
Show resolved Hide resolved
const PeerId = require('peer-id')
const assert = require('assert')
const c = require('./constants')
const { logger } = require('./utils')
const AbortController = require('abort-controller')

const errcode = require('err-code')
const times = require('p-times')
const c = require('./constants')
const { logger } = require('./utils')

class RandomWalk {
/**
Expand All @@ -29,6 +28,7 @@ class RandomWalk {
this._options = { ...c.defaultRandomWalk, ...options }
this._kadDHT = dht
this.log = logger(dht.peerInfo.id, 'random-walk')
this._timeoutId = undefined
}

/**
Expand All @@ -45,10 +45,7 @@ class RandomWalk {
// Start doing random walks after `this._options.delay`
this._timeoutId = setTimeout(() => {
// Start runner immediately
this._runPeriodically((done) => {
// Each subsequent walk should run on a `this._options.interval` interval
this._walk(this._options.queriesPerPeriod, this._options.timeout, () => done(this._options.interval))
}, 0)
this._runPeriodically()
}, this._options.delay)
}

Expand All @@ -59,70 +56,72 @@ class RandomWalk {
* @returns {void}
*/
stop () {
clearTimeout(this._timeoutId)
this._timeoutId = null
if (this._timeoutId) {
clearTimeout(this._timeoutId)
this._timeoutId = undefined
}
this._controller && this._controller.abort()
}

/**
* Run function `walk` on every `interval` ms
* @param {function(callback)} walk The function to execute on `interval`
* @param {number} interval The interval to run on in ms
* Run function `randomWalk._walk` on every `options.interval` ms
*
* @private
*/
_runPeriodically (walk, interval) {
this._timeoutId = setTimeout(() => {
walk((nextInterval) => {
// Schedule next
this._runPeriodically(walk, nextInterval)
async _runPeriodically () {
// run until the walk has been stopped
while (this._timeoutId) {
try {
await this._walk(this._options.queriesPerPeriod, this._options.timeout)
} catch (err) {
this._kadDHT._log.error('random-walk:error', err)
}
// Each subsequent walk should run on a `this._options.interval` interval
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be checking this._timeoutId has not been cleared after the walk has completed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point! We can check and not do the timeout if needed.

await new Promise(resolve => {
this._timeoutId = setTimeout(resolve, this._options.interval)
})
}, interval)
}
}

/**
* Do the random walk work.
*
* @param {number} queries
* @param {number} walkTimeout
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise}
*
* @private
*/
_walk (queries, walkTimeout, callback) {
async _walk (queries, walkTimeout) {
this.log('start')
this._controller = new AbortController()

times(queries, (i, next) => {
this.log('running query %d', i)
try {
await times(queries, async (index) => {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
this.log('running query %d', index)
try {
const id = await this._randomPeerId()

// Perform the walk
waterfall([
(cb) => this._randomPeerId(cb),
(id, cb) => {
// Check if we've happened to already abort
if (!this._controller) return cb()
if (!this._controller) return

this._query(id, {
await this._query(id, {
timeout: walkTimeout,
signal: this._controller.signal
}, cb)
}
], (err) => {
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', i, err)
return next(err)
})
} catch (err) {
if (err && err.code !== 'ETIMEDOUT') {
this.log.error('query %d finished with error', index, err)
throw err
}
}

this.log('finished query %d', i)
next(null)
this.log('finished query %d', index)
})
}, (err) => {
} finally {
this._controller = null
this.log('finished queries')
callback(err)
})
}
}

/**
Expand All @@ -139,44 +138,41 @@ class RandomWalk {
* @param {object} options
* @param {number} options.timeout
* @param {AbortControllerSignal} options.signal
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise}
*
* @private
*/
_query (id, options, callback) {
async _query (id, options) {
this.log('query:%s', id.toB58String())

this._kadDHT.findPeer(id, options, (err, peer) => {
let peer
try {
peer = await promisify(cb => this._kadDHT.findPeer(id, options, cb))()
} catch (err) {
if (err && err.code === 'ERR_NOT_FOUND') {
// expected case, we asked for random stuff after all
return callback()
return
}
if (err) {
return callback(err)
}
this.log('query:found', peer)

// wait what, there was something found? Lucky day!
callback(errcode(new Error(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`), 'ERR_FOUND_RANDOM_PEER'))
})
throw err
}

this.log('query:found', peer)

// wait what, there was something found? Lucky day!
throw errcode(`random-walk: ACTUALLY FOUND PEER: ${peer}, ${id.toB58String()}`, 'ERR_FOUND_RANDOM_PEER')
}

/**
* Generate a random peer id for random-walk purposes.
*
* @param {function(Error, PeerId)} callback
* @returns {void}
* @returns {Promise<PeerId>}
*
* @private
*/
_randomPeerId (callback) {
multihashing(crypto.randomBytes(16), 'sha2-256', (err, digest) => {
if (err) {
return callback(err)
}
callback(null, new PeerId(digest))
})
async _randomPeerId () {
const digest = await multihashing(crypto.randomBytes(16), 'sha2-256')
return new PeerId(digest)
}
}

Expand Down
102 changes: 48 additions & 54 deletions test/random-walk.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,81 +54,75 @@ describe('Random Walk', () => {
randomWalk = new RandomWalk(mockDHT)
})

it('should be able to specify the number of queries', (done) => {
it('should be able to specify the number of queries', async () => {
const queries = 5
sinon.stub(randomWalk, '_query').callsArgWith(2, null)
randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
sinon.stub(randomWalk, '_query').resolves(null)
await randomWalk._walk(queries, 1e3)
expect(randomWalk._query.callCount).to.eql(queries)
})

it('should stop walking if a query errors', (done) => {
it('should NOT stop walking if a query errors', async () => {
const queries = 5
const error = new Error('ERR_BOOM')
const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer')
findPeerStub.onCall(2).callsArgWith(2, error)
findPeerStub.callsArgWith(2, { code: 'ERR_NOT_FOUND' })

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.eql(error)
// 2 successes and error on the 3rd
expect(findPeerStub.callCount).to.eql(3)
done()
})
let err
try {
await randomWalk._walk(queries, 1e3)
} catch (_err) {
err = _err
}
expect(err.message).to.include('ERR_BOOM')
expect(findPeerStub.callCount).to.eql(5)
})

it('should ignore timeout errors and keep walking', (done) => {
it('should ignore timeout errors and keep walking', async () => {
const queries = 5
const _queryStub = sinon.stub(randomWalk, '_query')
_queryStub.onCall(2).callsArgWith(2, {
code: 'ETIMEDOUT'
})
_queryStub.callsArgWith(2, null)
_queryStub.onCall(2).rejects({ code: 'ETIMEDOUT' })
_queryStub.resolves(null)

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.not.exist()
expect(randomWalk._query.callCount).to.eql(queries)
done()
})
await randomWalk._walk(queries, 1e3)
expect(randomWalk._query.callCount).to.eql(queries)
})

it('should pass its timeout to the find peer query', (done) => {
it('should pass its timeout to the find peer query', async () => {
sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })

randomWalk._walk(1, 111, (err) => {
const mockCalls = randomWalk._kadDHT.findPeer.getCalls()
expect(err).to.not.exist()
expect(mockCalls).to.have.length(1)
expect(mockCalls[0].args[1]).to.include({
timeout: 111
})
done()
})
await randomWalk._walk(1, 111)
const mockCalls = randomWalk._kadDHT.findPeer.getCalls()
expect(mockCalls).to.have.length(1)
expect(mockCalls[0].args[1]).to.include({ timeout: 111 })
})

it('should error if the random id peer is found', (done) => {
it('should error if the random id peer is found', async () => {
const queries = 5
const findPeerStub = sinon.stub(randomWalk._kadDHT, 'findPeer').callsArgWith(2, { code: 'ERR_NOT_FOUND' })
findPeerStub.onCall(2).callsArgWith(2, null, {
id: 'QmB'
})
findPeerStub.onCall(2).callsArgWith(2, null, { id: 'QmB' })

randomWalk._walk(queries, 1e3, (err) => {
expect(err).to.exist()
expect(findPeerStub.callCount).to.eql(3)
done()
})
let err
try {
await randomWalk._walk(queries, 1e3)
} catch (_err) {
err = _err
}

expect(err).to.exist()
expect(findPeerStub.callCount).to.eql(5)
})

it('should error if random id generation errors', (done) => {
it('should error if random id generation errors', async () => {
const error = new Error('ERR_BOOM')
sinon.stub(randomWalk, '_randomPeerId').callsArgWith(0, error)
randomWalk._walk(1, 1e3, (err) => {
expect(err).to.eql(error)
done()
})
sinon.stub(randomWalk, '_randomPeerId').rejects(error)
let err
try {
await randomWalk._walk(1, 1e3)
} catch (_err) {
err = _err
}
expect(err).to.eql(error)
})
})

Expand All @@ -141,7 +135,7 @@ describe('Random Walk', () => {
})
sinon.spy(randomWalk, '_runPeriodically')

sinon.stub(randomWalk, '_walk').callsFake(() => {
sinon.stub(randomWalk, '_walk').callsFake(async () => {
// Try to start again
randomWalk.start()

Expand Down Expand Up @@ -180,7 +174,7 @@ describe('Random Walk', () => {
queriesPerPeriod: 1
}
const randomWalk = new RandomWalk(mockDHT, options)
sinon.stub(randomWalk, '_walk').callsFake((queries, timeout) => {
sinon.stub(randomWalk, '_walk').callsFake(async (queries, timeout) => {
expect(queries).to.eql(options.queriesPerPeriod)
expect(timeout).to.eql(options.timeout)
done()
Expand All @@ -200,7 +194,7 @@ describe('Random Walk', () => {
const randomWalk = new RandomWalk(mockDHT, options)
sinon.stub(randomWalk._kadDHT, 'findPeer').callsFake((_, opts, callback) => {
expect(opts.timeout).to.eql(options.timeout).mark()
callback(error)
setTimeout(() => callback(error), 100)
})

expect(3).checks(() => {
Expand All @@ -224,7 +218,7 @@ describe('Random Walk', () => {
expect(() => randomWalk.stop()).to.not.throw()
})

it('should cancel the timer if the walk is not active', () => {
it('should not be running if the walk is not active', () => {
const randomWalk = new RandomWalk(mockDHT, {
enabled: true,
delay: 100e3,
Expand All @@ -233,7 +227,7 @@ describe('Random Walk', () => {
randomWalk.start()
expect(randomWalk._timeoutId).to.exist()
randomWalk.stop()
expect(randomWalk._timeoutId).to.not.exist()
expect(randomWalk._timeoutId).to.eql(undefined)
})

it('should cancel the walk if already running', (done) => {
Expand Down