Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
refactor: use iterables instead of Promises where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 7, 2019
1 parent 15e939e commit a119f32
Show file tree
Hide file tree
Showing 22 changed files with 987 additions and 687 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"async": "^2.6.2",
"base32.js": "~0.1.0",
"chai-checkmark": "^1.0.1",
"cids": "~0.5.7",
Expand All @@ -58,6 +57,7 @@
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.1",
"pull-stream": "^3.6.9",
"streaming-iterables": "^3.5.0",
"varint": "^5.0.0",
"xor-distance": "^1.0.0"
},
Expand Down
3 changes: 3 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute

// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16

// K is the maximum number of requests to perform before returning failue
exports.K = 20

Expand Down
111 changes: 61 additions & 50 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const MemoryStore = require('interface-datastore').MemoryDatastore
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const crypto = require('libp2p-crypto')
const { collect } = require('streaming-iterables')

const errcode = require('err-code')

Expand Down Expand Up @@ -213,18 +214,19 @@ class KadDHT extends EventEmitter {
* @param {number} nvals
* @param {Object} options - get options
* @param {number} options.timeout - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
* @returns {AsyncIterator<{from: PeerId, val: Buffer}>}
*/
async getMany (key, nvals, options = {}) {
async * getMany (key, nvals, options = {}) {
if (!options.maxTimeout && !options.timeout) {
options.timeout = c.minute // default
} else if (options.maxTimeout && !options.timeout) { // TODO this will be deprecated in a next release
options.timeout = options.maxTimeout
}

this._log('getMany %b (%s)', key, nvals)
let vals = []
let valCount = 0

// First check the local store
let localRec, err
try {
localRec = await this._getLocal(key)
Expand All @@ -237,19 +239,23 @@ class KadDHT extends EventEmitter {
}

if (!err) {
vals.push({
valCount++
yield {
val: localRec.value,
from: this.peerInfo.id
})
}
}

if (nvals <= 1) {
return vals
// Already have enough values
if (valCount >= nvals) {
return
}

const paths = []
// Not enough values yet, let's go out to the swarm
const id = await utils.convertBuffer(key)

// As a starting list of peers, get the closest ALPHA peers to the key that
// we know about from the routing table
const rtp = this.routingTable.closestPeers(id, c.ALPHA)

this._log('peers in rt: %d', rtp.length)
Expand All @@ -260,19 +266,23 @@ class KadDHT extends EventEmitter {
throw errcode(errMsg, 'ERR_NO_PEERS_IN_ROUTING_TABLE')
}

// we have peers, lets do the actual query to them
// We have peers, lets do the actual query to them
const startingValCount = valCount
const query = new Query(this, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)
// This function body runs once per disjoint path.
// Note: For S/Kademlia, We need to get peers from nvals disjoint paths
// (not just nvals different peers)
// eg 20 values from 8 paths = Math.ceiling(20 / 8) = 3 peers per path
const pathSize = utils.pathSize(nvals - startingValCount, numPaths)
let pathVals = 0

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
let valueOrPeers
let valueOrPeers, err
try {
valueOrPeers = await this._getValueOrPeers(peer, key)
} catch (err) {
} catch (e) {
err = e
// If we have an invalid record we just want to continue and fetch a new one.
if (err.code !== 'ERR_INVALID_RECORD') {
throw err
Expand All @@ -282,33 +292,35 @@ class KadDHT extends EventEmitter {

const res = { closerPeers: peers }

// Note: An invalid record still counts as a retrieved record
if ((record && record.value) || (err && err.code === 'ERR_INVALID_RECORD')) {
pathVals.push({
pathVals++
res.value = {
val: record && record.value,
from: peer
})
}
}

// enough is enough
if (pathVals.length >= pathSize) {
// We have enough values for this path so we're done
if (pathVals >= pathSize) {
res.success = true
}

return res
}
})

// run our query
await query.run(rtp, options.timeout)

// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)

if (err && vals.length === 0) {
throw err
for await (const res of query.run(rtp, options.timeout)) {
if ((res || {}).value) {
valCount++
yield res.value
}
if (valCount >= nvals) {
query.stop()
return
}
}

return vals
query.stop()
}

/**
Expand All @@ -335,13 +347,14 @@ class KadDHT extends EventEmitter {
}
})

const res = await q.run(tablePeers)
const results = await collect(q.run(tablePeers))
const peers = (results[0] || {}).peersSeen

if (!res || !res.finalSet) {
if (!(peers || {}).length) {
return []
}

const sorted = await utils.sortClosestPeers(Array.from(res.finalSet), id)
const sorted = await utils.sortClosestPeers(peers, id)
return sorted.slice(0, c.K)
}

Expand Down Expand Up @@ -417,9 +430,10 @@ class KadDHT extends EventEmitter {
async provide (key) {
this._log('provide: %s', key.toBaseEncodedString())

await this.providers.addProvider(key, this.peerInfo.id)

const peers = await this.getClosestPeers(key.buffer)
const [, peers] = await Promise.all([
this.providers.addProvider(key, this.peerInfo.id),
this.getClosestPeers(key.buffer)
])

const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = peers.map((p) => new PeerInfo(p))
Expand All @@ -437,9 +451,9 @@ class KadDHT extends EventEmitter {
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {Promise<Array<PeerInfo>>}
* @returns {AsyncIterator<PeerInfo>}
*/
findProviders (key, options = {}) {
async * findProviders (key, options = {}) {
if (!options.maxTimeout && !options.timeout) {
options.timeout = c.minute // default
} else if (options.maxTimeout && !options.timeout) { // TODO this will be deprecated in a next release
Expand All @@ -449,7 +463,7 @@ class KadDHT extends EventEmitter {
options.maxNumProviders = options.maxNumProviders || c.K

this._log('findProviders %s', key.toBaseEncodedString())
return this._findNProviders(key, options.timeout, options.maxNumProviders)
return yield * this._findNProviders(key, options.timeout, options.maxNumProviders)
}

// ----------- Peer Routing
Expand All @@ -471,12 +485,12 @@ class KadDHT extends EventEmitter {

this._log('findPeer %s', id.toB58String())

const pi = await this.findPeerLocal(id)
const peerId = await this.findPeerLocal(id)

// already got it
if (pi != null) {
if (peerId != null) {
this._log('found local')
return pi
return peerId
}

const key = await utils.convertPeerId(id)
Expand Down Expand Up @@ -507,7 +521,7 @@ class KadDHT extends EventEmitter {
// found it
if (match) {
return {
peer: match,
value: match,
success: true
}
}
Expand All @@ -518,15 +532,12 @@ class KadDHT extends EventEmitter {
}
})

const result = await query.run(peers, options.timeout)
const results = await collect(query.run(peers, options.timeout))

let success = false
result.paths.forEach((res) => {
if (res.success) {
success = true
this.peerBook.put(res.peer)
}
})
const success = Boolean(results.length && results[0].value)
if (success) {
this.peerBook.put(results[0].value)
}

this._log('findPeer %s: %s', id.toB58String(), success)
if (!success) {
Expand Down
33 changes: 0 additions & 33 deletions src/limited-peer-list.js

This file was deleted.

33 changes: 33 additions & 0 deletions src/limited-peer-set.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const PeerSet = require('./peer-set')

/**
* Like PeerSet but with a size restriction.
*/
class LimitedPeerSet extends PeerSet {
/**
* Create a new limited peer set.
*
* @param {number} limit
*/
constructor (limit) {
super()
this.limit = limit
}

/**
* Add a PeerInfo if it fits in the set
*
* @param {PeerInfo} info
* @returns {bool}
*/
add (info) {
if (this.size < this.limit) {
return super.add(info)
}
return false
}
}

module.exports = LimitedPeerSet
57 changes: 57 additions & 0 deletions src/peer-id-set.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict'

/**
* A set of unique peer ids.
*/
class PeerIdSet {
constructor () {
this.peers = new Map()
}

/**
* Add a new id. Returns `true` if it was a new one
*
* @param {PeerId} id
* @returns {bool}
*/
add (id) {
if (!id) {
return false
}
if (!this.has(id)) {
this.peers.set(id.toB58String(), id)
return true
}
return false
}

/**
* Check if this PeerId is already in here.
*
* @param {PeerId} id
* @returns {bool}
*/
has (id) {
return this.peers.has(id && id.toB58String())
}

/**
* Get the set as an array.
*
* @returns {Array<PeerId>}
*/
toArray () {
return [...this.peers.values()]
}

/**
* The size of the set
*
* @type {number}
*/
get size () {
return this.peers.size
}
}

module.exports = PeerIdSet
Loading

0 comments on commit a119f32

Please sign in to comment.