Skip to content

Commit

Permalink
Deterministically de-dupe mdns connections (#112)
Browse files Browse the repository at this point in the history
* feat: handle duplicate connections by sorting {remote,local}.publicKeys

We added a check to always close the same connection from both sides,
using `b4a.compare(publicKey, remotePublicKey)`
additionally we:
* destroy the socket instead of closing to match hypercore
implementation, and take care of handling posible error events
* close mdns connection if its not from a localAddress
* add the remote address and port to PeerInfo (instead of local address
  and port)

* feat: correctly handling checking of privateIp

Basically, only allow local connections on mdns, using `private-ip`

* chore: added package-lock.json

---------

Co-authored-by: Tomás Ciccola <[email protected]>
  • Loading branch information
tomasciccola and Tomás Ciccola authored Jun 6, 2023
1 parent 954e1b1 commit b918ce1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 39 deletions.
121 changes: 82 additions & 39 deletions lib/discovery/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import NoiseSecretStream from '@hyperswarm/secret-stream'
import Hyperswarm from 'hyperswarm'
import { MdnsDiscovery } from 'multicast-service-discovery'
import base32 from 'base32.js'
import b4a from 'b4a'
import isIpPrivate from 'private-ip'

/**
* @typedef {Object} DiscoveryEvents
Expand Down Expand Up @@ -233,64 +235,84 @@ export class Discovery extends TypedEmitter {
* @param {import('net').Socket} socket
* @returns {Promise<void>}
*/
// Don't think this needs to be async?
async _onMdnsConnection(socket) {
if (socket.destroyed) {
throw new Error('Socket destroyed')
console.error('Socket destroyed')
return
}

const socketAddress = /** @type {import('net').AddressInfo} */ (
socket.address()
)
// const socketAddress = /** @type {import('net').AddressInfo} */ (
// socket.address()
// )

if (!socketAddress.address) {
throw new Error('Socket not connected')
if (!socket.remoteAddress) {
console.error('Socket not connected')
return
}
if (!isIpPrivate(socket.remoteAddress)) socket.destroy()

const connection = new NoiseSecretStream(false, socket, {
const isInitiator = false
const connection = new NoiseSecretStream(isInitiator, socket, {
keyPair: this.#identityKeyPair,
})

connection.on('error', (error) => {
console.error('Error on mdns client connection', error)
})

connection.on('connect', () => {
connection.on('connect', async () => {
if(!connection.remotePublicKey || !connection.publicKey) return
const remotePublicKey = encodeHex(connection.remotePublicKey)

if (remotePublicKey === this.identityPublicKey) {
destroyConnection(connection).catch(noop)
return
}

let peer = this.#peers.get(remotePublicKey)

if (peer) {
connection.end()
} else {
peer = new PeerInfo({
topics: [],
identityPublicKey: remotePublicKey,
discoveryType: 'mdns',
host: socketAddress.address,
port: socketAddress.port,
connection,
status: 'connected',
})
const keepNew = !connection.isInitiator || b4a.compare(connection.publicKey, connection.remotePublicKey) > 0
let existing = this.#peers.get(remotePublicKey)

if (existing && keepNew) {
// to handle close event while still negotiating connection
let closed = false
let onClose = () => { closed = true }
connection.on('close', onClose)
connection.on('error', noop)
try{
await destroyConnection(existing.connection)
}catch(e){
console.error('error destroying connection', e)
}
if (closed) return
connection.removeListener('close', onClose)
connection.removeListener('error', noop)
}

this.#peers.set(remotePublicKey, peer)
this.emit('connection', connection, peer)
existing = new PeerInfo({
topics: [],
identityPublicKey: remotePublicKey,
discoveryType: 'mdns',
host: socket.remoteAddress,
port: socket.remotePort,
connection,
status: 'connected',
})

connection.on('close', async () => {
const remotePublicKey = encodeHex(connection.remotePublicKey)
const peer = this.#peers.get(remotePublicKey)
this.#peers.set(remotePublicKey, existing)
this.emit('connection', connection, existing)

if (peer) {
this._updatePeerStatus(peer, 'disconnecting')
peer.destroy()
this._updatePeerStatus(peer, 'disconnected')
this.#peers.delete(remotePublicKey)
}
})
}
connection.on('close', async () => {
const remotePublicKey = encodeHex(connection.remotePublicKey)
const peer = this.#peers.get(remotePublicKey)

if (peer) {
this._updatePeerStatus(peer, 'disconnecting')
peer.destroy()
this._updatePeerStatus(peer, 'disconnected')
this.#peers.delete(remotePublicKey)
}
})
})
}

Expand Down Expand Up @@ -1175,10 +1197,18 @@ export class PeerInfo extends TypedEmitter {
* peer.destroy()
* ```
*/
destroy() {
if (this.connection && !this.connection.destroying) {
this.connection.end()
}
async destroy(e) {
return new Promise((resolve) => {
this.connection.on('close', () => resolve())
this.connection.on('error', noop)
this.connection.destroy(e)
})
/* Is this check necessary? this.connection.destroying
* doesn't seem to exist...
*/
// if (this.connection && !this.connection.destroying) {
// this.connection.end()
// }
}
}

Expand Down Expand Up @@ -1453,3 +1483,16 @@ export function encodeHex(buffer) {
export function decodeHex(str) {
return Buffer.from(str, 'hex')
}

function noop(){}

/**
* @param {NoiseSecretStream<RawConnectionStream>} socket
* @returns {Promise<void>}
*/
async function destroyConnection (socket) {
socket.on('error', noop)
return new Promise(res => {
socket.on('close', res)
})
}
47 changes: 47 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"multicast-service-discovery": "^4.0.4",
"p-defer": "^4.0.0",
"patch-package": "^7.0.0",
"private-ip": "^3.0.0",
"protobufjs": "^7.2.3",
"protomux": "^3.4.1",
"sodium-universal": "^4.0.0",
Expand Down

0 comments on commit b918ce1

Please sign in to comment.