Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor(async): Query (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumavis authored and vasco-santos committed May 23, 2019
1 parent e493987 commit ce15ec3
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 295 deletions.
7 changes: 4 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const timeout = require('async/timeout')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const crypto = require('libp2p-crypto')
const promiseToCallback = require('promise-to-callback')

const errcode = require('err-code')

Expand Down Expand Up @@ -376,7 +377,7 @@ class KadDHT extends EventEmitter {

// run our query
timeout((_cb) => {
query.run(rtp, _cb)
promiseToCallback(query.run(rtp))(_cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
Expand Down Expand Up @@ -438,7 +439,7 @@ class KadDHT extends EventEmitter {
}
})

q.run(tablePeers, (err, res) => {
promiseToCallback(q.run(tablePeers))((err, res) => {
if (err) {
return callback(err)
}
Expand Down Expand Up @@ -673,7 +674,7 @@ class KadDHT extends EventEmitter {
})

timeout((_cb) => {
query.run(peers, _cb)
promiseToCallback(query.run(peers))(_cb)
}, options.timeout)((err, res) => {
query.stop()
cb(err, res)
Expand Down
53 changes: 18 additions & 35 deletions src/peer-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const Heap = require('heap')
const distance = require('xor-distance')
const debug = require('debug')
const promisify = require('promisify-es6')

const utils = require('./utils')

Expand All @@ -17,34 +18,22 @@ class PeerQueue {
* Create from a given peer id.
*
* @param {PeerId} id
* @param {function(Error, PeerQueue)} callback
* @returns {void}
* @returns {Promise<PeerQueue>}
*/
static fromPeerId (id, callback) {
utils.convertPeerId(id, (err, key) => {
if (err) {
return callback(err)
}

callback(null, new PeerQueue(key))
})
static async fromPeerId (id) {
const key = await promisify(cb => utils.convertPeerId(id, cb))()
return new PeerQueue(key)
}

/**
* Create from a given buffer.
*
* @param {Buffer} key
* @param {function(Error, PeerQueue)} callback
* @returns {void}
* @param {Buffer} keyBuffer
* @returns {Promise<PeerQueue>}
*/
static fromKey (key, callback) {
utils.convertBuffer(key, (err, key) => {
if (err) {
return callback(err)
}

callback(null, new PeerQueue(key))
})
static async fromKey (keyBuffer) {
const key = await promisify(cb => utils.convertBuffer(keyBuffer, cb))()
return new PeerQueue(key)
}

/**
Expand All @@ -62,24 +51,18 @@ class PeerQueue {
* Add a new PeerId to the queue.
*
* @param {PeerId} id
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise}
*/
enqueue (id, callback) {
async enqueue (id) {
log('enqueue %s', id.toB58String())
utils.convertPeerId(id, (err, key) => {
if (err) {
return callback(err)
}
const key = await promisify(cb => utils.convertPeerId(id, cb))()

const el = {
id: id,
distance: distance(this.from, key)
}
const el = {
id: id,
distance: distance(this.from, key)
}

this.heap.push(el)
callback()
})
this.heap.push(el)
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/private.js
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,9 @@ module.exports = (dht) => ({
const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize)

try {
await promisify(callback => timeout((cb) => query.run(peers, cb), providerTimeout)(callback))()
await promisify(callback => timeout((cb) => {
promiseToCallback(query.run(peers))(cb)
}, providerTimeout)(callback))()
} catch (err) {
if (err.code !== 'ETIMEDOUT' || out.length === 0) {
throw err
Expand Down
11 changes: 5 additions & 6 deletions src/query/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,25 @@ class Query {
* Run this query, start with the given list of peers first.
*
* @param {Array<PeerId>} peers
* @param {function(Error, Object)} callback
* @returns {void}
* @returns {Promise}
*/
run (peers, callback) {
async run (peers) {
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return callback(null, { finalSet: new Set(), paths: [] })
return { finalSet: new Set(), paths: [] }
}

if (peers.length === 0) {
this._log.error('Running query with no peers')
return callback(null, { finalSet: new Set(), paths: [] })
return { finalSet: new Set(), paths: [] }
}

this._run = new Run(this)

this._log(`query running with K=${this.dht.kBucketSize}, A=${this.dht.concurrency}, D=${Math.min(this.dht.disjointPaths, peers.length)}`)
this._run.once('start', this._onStart)
this._run.once('complete', this._onComplete)
this._run.execute(peers, callback)
return this._run.execute(peers)
}

/**
Expand Down
40 changes: 16 additions & 24 deletions src/query/path.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const each = require('async/each')
const timeout = require('async/timeout')
const waterfall = require('async/waterfall')
const promisify = require('promisify-es6')
const PeerQueue = require('../peer-queue')

// TODO: Temporary until parallel dial in Switch have a proper
Expand All @@ -24,6 +23,7 @@ class Path {
constructor (run, queryFunc) {
this.run = run
this.queryFunc = timeout(queryFunc, QUERY_FUNC_TIMEOUT)
this.queryFuncAsync = promisify(this.queryFunc)

/**
* @type {Array<PeerId>}
Expand All @@ -48,45 +48,37 @@ class Path {
/**
* Execute the path.
*
* @param {function(Error)} callback
* @returns {Promise}
*
*/
execute (callback) {
waterfall([
// Create a queue of peers ordered by distance from the key
(cb) => PeerQueue.fromKey(this.run.query.key, cb),
// Add initial peers to the queue
(q, cb) => {
this.peersToQuery = q
each(this.initialPeers, this.addPeerToQuery.bind(this), cb)
},
// Start processing the queue
(cb) => {
this.run.workerQueue(this, cb)
}
], callback)
async execute () {
// Create a queue of peers ordered by distance from the key
const queue = await PeerQueue.fromKey(this.run.query.key)
// Add initial peers to the queue
this.peersToQuery = queue
await Promise.all(this.initialPeers.map(peer => this.addPeerToQuery(peer)))
await this.run.workerQueue(this)
}

/**
* Add a peer to the peers to be queried.
*
* @param {PeerId} peer
* @param {function(Error)} callback
* @returns {void}
* @private
* @returns {Promise<void>}
*/
addPeerToQuery (peer, callback) {
async addPeerToQuery (peer) {
// Don't add self
if (this.run.query.dht._isSelf(peer)) {
return callback()
return
}

// The paths must be disjoint, meaning that no two paths in the Query may
// traverse the same peer
if (this.run.peersSeen.has(peer)) {
return callback()
return
}

this.peersToQuery.enqueue(peer, callback)
await this.peersToQuery.enqueue(peer)
}
}

Expand Down
Loading

0 comments on commit ce15ec3

Please sign in to comment.