This repository has been archived by the owner on Jul 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
feat: compatibility with go-libp2p-mdns #80
Merged
Merged
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
8280e8d
feat: compatibility with go-libp2p-mdns
0320312
chore: appease linter
62562b8
fix: move async to dependencies
2483aba
fix: typo in comment
vasco-santos 655b4d7
refactor: pr feedback
alanshaw 7f6ac76
fix: respond directly to querier
alanshaw 324bbf7
fix: reemit the peer event from GoMulticastDNS
alanshaw 05ec9f0
refactor: add interval between queries
alanshaw 31369eb
chore: appease linter
alanshaw 3ff7250
fix: existing tests
0b39814
fix: option name
alanshaw 2b01368
fix: use async/nextTick
alanshaw 3aca133
test: add tests for querier and main compat class
15416c3
chore: remove .only
37fbfdd
test: add responder tests
a3683e7
fix: increase timeout for query on interval test
f8f9830
docs: document new compat option
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ docs | |
**/*.log | ||
test/repo-tests* | ||
**/bundle.js | ||
.nyc_output | ||
|
||
# Logs | ||
logs | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
'use strict' | ||
|
||
exports.SERVICE_TAG = '_ipfs-discovery._udp' | ||
exports.SERVICE_TAG_LOCAL = `${exports.SERVICE_TAG}.local` | ||
exports.MULTICAST_IP = '224.0.0.251' | ||
exports.MULTICAST_PORT = 5353 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
'use strict' | ||
|
||
// Compatibility with Go libp2p MDNS | ||
|
||
const EE = require('events') | ||
const parallel = require('async/parallel') | ||
const Responder = require('./responder') | ||
const Querier = require('./querier') | ||
|
||
class GoMulticastDNS extends EE { | ||
constructor (peerInfo) { | ||
super() | ||
this._started = false | ||
this._peerInfo = peerInfo | ||
this._onPeer = this._onPeer.bind(this) | ||
} | ||
|
||
start (callback) { | ||
if (this._started) { | ||
return callback(new Error('MulticastDNS service is already started')) | ||
} | ||
|
||
this._started = true | ||
this._responder = new Responder(this._peerInfo) | ||
this._querier = new Querier(this._peerInfo.id) | ||
|
||
this._querier.on('peer', this._onPeer) | ||
|
||
parallel([ | ||
cb => this._responder.start(cb), | ||
cb => this._querier.start(cb) | ||
], callback) | ||
} | ||
|
||
_onPeer (peerInfo) { | ||
this.emit('peer', peerInfo) | ||
} | ||
|
||
stop (callback) { | ||
if (!this._started) { | ||
return callback(new Error('MulticastDNS service is not started')) | ||
} | ||
|
||
const responder = this._responder | ||
const querier = this._querier | ||
|
||
this._started = false | ||
this._responder = null | ||
this._querier = null | ||
|
||
querier.removeListener('peer', this._onPeer) | ||
|
||
parallel([ | ||
cb => responder.stop(cb), | ||
cb => querier.stop(cb) | ||
], callback) | ||
} | ||
} | ||
|
||
module.exports = GoMulticastDNS |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,176 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const EE = require('events') | ||
const MDNS = require('multicast-dns') | ||
const Multiaddr = require('multiaddr') | ||
const PeerInfo = require('peer-info') | ||
const PeerId = require('peer-id') | ||
const nextTick = require('async/nextTick') | ||
const log = require('debug')('libp2p:mdns:compat:querier') | ||
const { SERVICE_TAG_LOCAL, MULTICAST_IP, MULTICAST_PORT } = require('./constants') | ||
|
||
class Querier extends EE { | ||
constructor (peerId, options) { | ||
super() | ||
assert(peerId, 'missing peerId parameter') | ||
options = options || {} | ||
this._peerIdStr = peerId.toB58String() | ||
// Re-query every 60s, in leu of network change detection | ||
options.queryInterval = options.queryInterval || 60000 | ||
// Time for which the MDNS server will stay alive waiting for responses | ||
// Must be less than options.queryInterval! | ||
options.queryPeriod = Math.min( | ||
options.queryInterval, | ||
options.queryPeriod == null ? 5000 : options.queryPeriod | ||
) | ||
this._options = options | ||
this._onResponse = this._onResponse.bind(this) | ||
} | ||
|
||
start (callback) { | ||
this._handle = periodically(() => { | ||
// Create a querier that queries multicast but gets responses unicast | ||
const mdns = MDNS({ multicast: false, interface: '0.0.0.0', port: 0 }) | ||
|
||
mdns.on('response', this._onResponse) | ||
|
||
mdns.query({ | ||
id: nextId(), // id > 0 for unicast response | ||
questions: [{ name: SERVICE_TAG_LOCAL, type: 'PTR', class: 'IN' }] | ||
}, null, { | ||
address: MULTICAST_IP, | ||
port: MULTICAST_PORT | ||
}) | ||
|
||
return { | ||
stop: callback => { | ||
mdns.removeListener('response', this._onResponse) | ||
mdns.destroy(callback) | ||
} | ||
} | ||
}, { | ||
period: this._options.queryPeriod, | ||
interval: this._options.queryInterval | ||
}) | ||
|
||
nextTick(() => callback()) | ||
} | ||
|
||
_onResponse (event, info) { | ||
const answers = event.answers || [] | ||
const ptrRecord = answers.find(a => a.type === 'PTR' && a.name === SERVICE_TAG_LOCAL) | ||
|
||
// Only deal with responses for our service tag | ||
if (!ptrRecord) return | ||
|
||
log('got response', event, info) | ||
|
||
const txtRecord = answers.find(a => a.type === 'TXT') | ||
if (!txtRecord) return log('missing TXT record in response') | ||
|
||
let peerIdStr | ||
try { | ||
peerIdStr = txtRecord.data[0].toString() | ||
} catch (err) { | ||
return log('failed to extract peer ID from TXT record data', txtRecord, err) | ||
} | ||
|
||
if (this._peerIdStr === peerIdStr) { | ||
return log('ignoring reply to myself') | ||
} | ||
|
||
let peerId | ||
try { | ||
peerId = PeerId.createFromB58String(peerIdStr) | ||
} catch (err) { | ||
return log('failed to create peer ID from TXT record data', peerIdStr, err) | ||
} | ||
|
||
PeerInfo.create(peerId, (err, info) => { | ||
if (err) return log('failed to create peer info from peer ID', peerId, err) | ||
|
||
const srvRecord = answers.find(a => a.type === 'SRV') | ||
if (!srvRecord) return log('missing SRV record in response') | ||
|
||
log('peer found', peerIdStr) | ||
|
||
const { port } = srvRecord.data || {} | ||
const protos = { A: 'ip4', AAAA: 'ip6' } | ||
|
||
const multiaddrs = answers | ||
.filter(a => ['A', 'AAAA'].includes(a.type)) | ||
.reduce((addrs, a) => { | ||
const maStr = `/${protos[a.type]}/${a.data}/tcp/${port}` | ||
try { | ||
addrs.push(new Multiaddr(maStr)) | ||
log(maStr) | ||
} catch (err) { | ||
log(`failed to create multiaddr from ${a.type} record data`, maStr, port, err) | ||
} | ||
return addrs | ||
}, []) | ||
|
||
multiaddrs.forEach(addr => info.multiaddrs.add(addr)) | ||
this.emit('peer', info) | ||
}) | ||
} | ||
|
||
stop (callback) { | ||
this._handle.stop(callback) | ||
} | ||
} | ||
|
||
module.exports = Querier | ||
|
||
/** | ||
* Run `fn` for a certain period of time, and then wait for an interval before | ||
* running it again. `fn` must return an object with a stop function, which is | ||
* called when the period expires. | ||
* | ||
* @param {Function} fn function to run | ||
* @param {Object} [options] | ||
* @param {Object} [options.period] Period in ms to run the function for | ||
* @param {Object} [options.interval] Interval in ms between runs | ||
* @returns {Object} handle that can be used to stop execution | ||
*/ | ||
function periodically (fn, options) { | ||
let handle, timeoutId | ||
let stopped = false | ||
|
||
const reRun = () => { | ||
handle = fn() | ||
timeoutId = setTimeout(() => { | ||
handle.stop(err => { | ||
if (err) log(err) | ||
if (!stopped) { | ||
timeoutId = setTimeout(reRun, options.interval) | ||
} | ||
}) | ||
handle = null | ||
}, options.period) | ||
} | ||
|
||
reRun() | ||
|
||
return { | ||
stop (callback) { | ||
stopped = true | ||
clearTimeout(timeoutId) | ||
if (handle) { | ||
handle.stop(callback) | ||
} else { | ||
callback() | ||
} | ||
} | ||
} | ||
} | ||
|
||
const nextId = (() => { | ||
let id = 0 | ||
return () => { | ||
id++ | ||
if (id === Number.MAX_SAFE_INTEGER) id = 1 | ||
return id | ||
} | ||
})() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
'use strict' | ||
|
||
const OS = require('os') | ||
const assert = require('assert') | ||
const MDNS = require('multicast-dns') | ||
const log = require('debug')('libp2p:mdns:compat:responder') | ||
const TCP = require('libp2p-tcp') | ||
const nextTick = require('async/nextTick') | ||
const { SERVICE_TAG_LOCAL } = require('./constants') | ||
|
||
const tcp = new TCP() | ||
|
||
class Responder { | ||
constructor (peerInfo) { | ||
assert(peerInfo, 'missing peerInfo parameter') | ||
this._peerInfo = peerInfo | ||
this._peerIdStr = peerInfo.id.toB58String() | ||
this._onQuery = this._onQuery.bind(this) | ||
} | ||
|
||
start (callback) { | ||
this._mdns = MDNS() | ||
this._mdns.on('query', this._onQuery) | ||
nextTick(() => callback()) | ||
} | ||
|
||
_onQuery (event, info) { | ||
const multiaddrs = tcp.filter(this._peerInfo.multiaddrs.toArray()) | ||
// Only announce TCP for now | ||
if (!multiaddrs.length) return | ||
|
||
const questions = event.questions || [] | ||
|
||
// Only respond to queries for our service tag | ||
if (!questions.some(q => q.name === SERVICE_TAG_LOCAL)) return | ||
|
||
log('got query', event, info) | ||
|
||
const answers = [] | ||
const peerServiceTagLocal = `${this._peerIdStr}.${SERVICE_TAG_LOCAL}` | ||
|
||
answers.push({ | ||
name: SERVICE_TAG_LOCAL, | ||
type: 'PTR', | ||
class: 'IN', | ||
ttl: 120, | ||
data: peerServiceTagLocal | ||
}) | ||
|
||
// Only announce TCP multiaddrs for now | ||
const port = multiaddrs[0].toString().split('/')[4] | ||
|
||
answers.push({ | ||
name: peerServiceTagLocal, | ||
type: 'SRV', | ||
class: 'IN', | ||
ttl: 120, | ||
data: { | ||
priority: 10, | ||
weight: 1, | ||
port, | ||
target: OS.hostname() | ||
} | ||
}) | ||
|
||
answers.push({ | ||
name: peerServiceTagLocal, | ||
type: 'TXT', | ||
class: 'IN', | ||
ttl: 120, | ||
data: [Buffer.from(this._peerIdStr)] | ||
}) | ||
|
||
multiaddrs.forEach((ma) => { | ||
const proto = ma.protoNames()[0] | ||
if (proto === 'ip4' || proto === 'ip6') { | ||
answers.push({ | ||
name: OS.hostname(), | ||
type: proto === 'ip4' ? 'A' : 'AAAA', | ||
class: 'IN', | ||
ttl: 120, | ||
data: ma.toString().split('/')[2] | ||
}) | ||
} | ||
}) | ||
|
||
log('responding to query', answers) | ||
this._mdns.respond(answers, info) | ||
} | ||
|
||
stop (callback) { | ||
this._mdns.removeListener('query', this._onQuery) | ||
this._mdns.destroy(callback) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this._mdns.removeListener('query', this._onQuery) |
||
} | ||
} | ||
|
||
module.exports = Responder |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have as an action item of merging this PR to create an issue for tracking the announce of other transports?