Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

feat: compatibility with go-libp2p-mdns #80

Merged
merged 17 commits into from
May 9, 2019
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ docs
**/*.log
test/repo-tests*
**/bundle.js
.nyc_output

# Logs
logs
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
],
"scripts": {
"lint": "aegir lint",
"coverage": "aegir coverage",
"coverage": "nyc --reporter=lcov --reporter=text npm run test:node",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"release": "aegir release -t node --no-build",
Expand All @@ -35,11 +35,11 @@
"homepage": "https://github.com/libp2p/js-libp2p-mdns",
"devDependencies": {
"aegir": "^18.2.2",
"async": "^2.6.2",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1"
},
"dependencies": {
"async": "^2.6.2",
"debug": "^4.1.1",
"libp2p-tcp": "~0.13.0",
"multiaddr": "^6.0.6",
Expand Down
6 changes: 6 additions & 0 deletions src/compat/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'

exports.SERVICE_TAG = '_ipfs-discovery._udp'
exports.SERVICE_TAG_LOCAL = `${exports.SERVICE_TAG}.local`
exports.MULTICAST_IP = '224.0.0.251'
exports.MULTICAST_PORT = 5353
60 changes: 60 additions & 0 deletions src/compat/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict'

// Compatibility with Go libp2p MDNS

const EE = require('events')
const parallel = require('async/parallel')
const Responder = require('./responder')
const Querier = require('./querier')

class GoMulticastDNS extends EE {
constructor (peerInfo) {
super()
this._started = false
this._peerInfo = peerInfo
this._onPeer = this._onPeer.bind(this)
}

start (callback) {
if (this._started) {
return callback(new Error('MulticastDNS service is already started'))
}

this._started = true
this._responder = new Responder(this._peerInfo)
this._querier = new Querier(this._peerInfo.id)

this._querier.on('peer', this._onPeer)

parallel([
cb => this._responder.start(cb),
cb => this._querier.start(cb)
], callback)
}

_onPeer (peerInfo) {
this.emit('peer', peerInfo)
}

stop (callback) {
if (!this._started) {
return callback(new Error('MulticastDNS service is not started'))
}

const responder = this._responder
const querier = this._querier

this._started = false
this._responder = null
this._querier = null

querier.removeListener('peer', this._onPeer)

parallel([
cb => responder.stop(cb),
cb => querier.stop(cb)
], callback)
}
}

module.exports = GoMulticastDNS
176 changes: 176 additions & 0 deletions src/compat/querier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
'use strict'

const assert = require('assert')
const EE = require('events')
const MDNS = require('multicast-dns')
const Multiaddr = require('multiaddr')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const nextTick = require('async/nextTick')
const log = require('debug')('libp2p:mdns:compat:querier')
const { SERVICE_TAG_LOCAL, MULTICAST_IP, MULTICAST_PORT } = require('./constants')

class Querier extends EE {
constructor (peerId, options) {
super()
assert(peerId, 'missing peerId parameter')
options = options || {}
this._peerIdStr = peerId.toB58String()
// Re-query every 60s, in leu of network change detection
options.queryInterval = options.queryInterval || 60000
// Time for which the MDNS server will stay alive waiting for responses
// Must be less than options.queryInterval!
options.queryPeriod = Math.min(
options.queryInterval,
options.queryPeriod == null ? 5000 : options.queryPeriod
)
this._options = options
this._onResponse = this._onResponse.bind(this)
}

start (callback) {
this._handle = periodically(() => {
// Create a querier that queries multicast but gets responses unicast
const mdns = MDNS({ multicast: false, interface: '0.0.0.0', port: 0 })

mdns.on('response', this._onResponse)

mdns.query({
id: nextId(), // id > 0 for unicast response
questions: [{ name: SERVICE_TAG_LOCAL, type: 'PTR', class: 'IN' }]
}, null, {
address: MULTICAST_IP,
port: MULTICAST_PORT
})

return {
stop: callback => {
mdns.removeListener('response', this._onResponse)
mdns.destroy(callback)
}
}
}, {
period: this._options.queryPeriod,
interval: this._options.queryInterval
})

nextTick(() => callback())
}

_onResponse (event, info) {
const answers = event.answers || []
const ptrRecord = answers.find(a => a.type === 'PTR' && a.name === SERVICE_TAG_LOCAL)

// Only deal with responses for our service tag
if (!ptrRecord) return

log('got response', event, info)

const txtRecord = answers.find(a => a.type === 'TXT')
if (!txtRecord) return log('missing TXT record in response')

let peerIdStr
try {
peerIdStr = txtRecord.data[0].toString()
} catch (err) {
return log('failed to extract peer ID from TXT record data', txtRecord, err)
}

if (this._peerIdStr === peerIdStr) {
return log('ignoring reply to myself')
}

let peerId
try {
peerId = PeerId.createFromB58String(peerIdStr)
} catch (err) {
return log('failed to create peer ID from TXT record data', peerIdStr, err)
}

PeerInfo.create(peerId, (err, info) => {
if (err) return log('failed to create peer info from peer ID', peerId, err)

const srvRecord = answers.find(a => a.type === 'SRV')
if (!srvRecord) return log('missing SRV record in response')

log('peer found', peerIdStr)

const { port } = srvRecord.data || {}
const protos = { A: 'ip4', AAAA: 'ip6' }

const multiaddrs = answers
.filter(a => ['A', 'AAAA'].includes(a.type))
.reduce((addrs, a) => {
const maStr = `/${protos[a.type]}/${a.data}/tcp/${port}`
try {
addrs.push(new Multiaddr(maStr))
log(maStr)
} catch (err) {
log(`failed to create multiaddr from ${a.type} record data`, maStr, port, err)
}
return addrs
}, [])

multiaddrs.forEach(addr => info.multiaddrs.add(addr))
this.emit('peer', info)
})
}

stop (callback) {
this._handle.stop(callback)
}
}

module.exports = Querier

/**
* Run `fn` for a certain period of time, and then wait for an interval before
* running it again. `fn` must return an object with a stop function, which is
* called when the period expires.
*
* @param {Function} fn function to run
* @param {Object} [options]
* @param {Object} [options.period] Period in ms to run the function for
* @param {Object} [options.interval] Interval in ms between runs
* @returns {Object} handle that can be used to stop execution
*/
function periodically (fn, options) {
let handle, timeoutId
let stopped = false

const reRun = () => {
handle = fn()
timeoutId = setTimeout(() => {
handle.stop(err => {
if (err) log(err)
if (!stopped) {
timeoutId = setTimeout(reRun, options.interval)
}
})
handle = null
}, options.period)
}

reRun()

return {
stop (callback) {
stopped = true
clearTimeout(timeoutId)
if (handle) {
handle.stop(callback)
} else {
callback()
}
}
}
}

const nextId = (() => {
let id = 0
return () => {
id++
if (id === Number.MAX_SAFE_INTEGER) id = 1
return id
}
})()
97 changes: 97 additions & 0 deletions src/compat/responder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'

const OS = require('os')
const assert = require('assert')
const MDNS = require('multicast-dns')
const log = require('debug')('libp2p:mdns:compat:responder')
const TCP = require('libp2p-tcp')
const nextTick = require('async/nextTick')
const { SERVICE_TAG_LOCAL } = require('./constants')

const tcp = new TCP()

class Responder {
constructor (peerInfo) {
assert(peerInfo, 'missing peerInfo parameter')
this._peerInfo = peerInfo
this._peerIdStr = peerInfo.id.toB58String()
this._onQuery = this._onQuery.bind(this)
}

start (callback) {
this._mdns = MDNS()
this._mdns.on('query', this._onQuery)
nextTick(() => callback())
}

_onQuery (event, info) {
const multiaddrs = tcp.filter(this._peerInfo.multiaddrs.toArray())
// Only announce TCP for now
if (!multiaddrs.length) return

const questions = event.questions || []

// Only respond to queries for our service tag
if (!questions.some(q => q.name === SERVICE_TAG_LOCAL)) return

log('got query', event, info)

const answers = []
const peerServiceTagLocal = `${this._peerIdStr}.${SERVICE_TAG_LOCAL}`

answers.push({
name: SERVICE_TAG_LOCAL,
type: 'PTR',
class: 'IN',
ttl: 120,
data: peerServiceTagLocal
})

// Only announce TCP multiaddrs for now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you have as an action item of merging this PR to create an issue for tracking the announce of other transports?

const port = multiaddrs[0].toString().split('/')[4]

answers.push({
name: peerServiceTagLocal,
type: 'SRV',
class: 'IN',
ttl: 120,
data: {
priority: 10,
weight: 1,
port,
target: OS.hostname()
}
})

answers.push({
name: peerServiceTagLocal,
type: 'TXT',
class: 'IN',
ttl: 120,
data: [Buffer.from(this._peerIdStr)]
})

multiaddrs.forEach((ma) => {
const proto = ma.protoNames()[0]
if (proto === 'ip4' || proto === 'ip6') {
answers.push({
name: OS.hostname(),
type: proto === 'ip4' ? 'A' : 'AAAA',
class: 'IN',
ttl: 120,
data: ma.toString().split('/')[2]
})
}
})

log('responding to query', answers)
this._mdns.respond(answers, info)
}

stop (callback) {
this._mdns.removeListener('query', this._onQuery)
this._mdns.destroy(callback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this._mdns.removeListener('query', this._onQuery)

}
}

module.exports = Responder
Loading