From 356748a8f9711734f0fc81d39962ab1add25406e Mon Sep 17 00:00:00 2001 From: Robert Cronin Date: Thu, 22 Oct 2020 17:23:36 +0800 Subject: [PATCH] wip --- proto/Peer.proto | 9 +- proto/js/Peer.d.ts | 116 ++---- proto/js/Peer.js | 210 +++------- src/peers/PeerManager.ts | 4 +- src/peers/nat-traversal/NatTraversal.ts | 375 +++++++++++------- .../micro-transport-protocol/MTPConnection.ts | 31 +- .../micro-transport-protocol/MTPServer.ts | 9 +- src/peers/peer-connection/PeerConnection.ts | 86 ++-- src/peers/peer-connection/PeerServer.ts | 2 - src/peers/peer-dht/PeerDHT.ts | 24 -- 10 files changed, 395 insertions(+), 471 deletions(-) diff --git a/proto/Peer.proto b/proto/Peer.proto index b2c49f793a..ef506ac347 100644 --- a/proto/Peer.proto +++ b/proto/Peer.proto @@ -88,11 +88,10 @@ message HolePunchConnectionMessage { ////////////////////// // Relay Connection // ////////////////////// -message RelayConnectionRequest { - string peer_id = 1; -} -message RelayConnectionResponse { - string relay_address = 1; +message RelayConnectionMessage { + string target_peer_id = 1; + string origin_peer_id = 2; + string relay_address = 3; } ////////////////////// diff --git a/proto/js/Peer.d.ts b/proto/js/Peer.d.ts index 5de21f5ea0..cd309956fd 100644 --- a/proto/js/Peer.d.ts +++ b/proto/js/Peer.d.ts @@ -699,128 +699,78 @@ declare namespace Peer { public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): peerInterface.HolePunchConnectionMessage; } - /** Properties of a RelayConnectionRequest. */ - interface IRelayConnectionRequest { + /** Properties of a RelayConnectionMessage. */ + interface IRelayConnectionMessage { - /** RelayConnectionRequest peerId */ - peerId?: (string|null); - } - - /** Represents a RelayConnectionRequest. */ - class RelayConnectionRequest implements IRelayConnectionRequest { - - /** - * Constructs a new RelayConnectionRequest. - * @param [p] Properties to set - */ - constructor(p?: peerInterface.IRelayConnectionRequest); - - /** RelayConnectionRequest peerId. */ - public peerId: string; - - /** - * Creates a new RelayConnectionRequest instance using the specified properties. - * @param [properties] Properties to set - * @returns RelayConnectionRequest instance - */ - public static create(properties?: peerInterface.IRelayConnectionRequest): peerInterface.RelayConnectionRequest; - - /** - * Encodes the specified RelayConnectionRequest message. Does not implicitly {@link peerInterface.RelayConnectionRequest.verify|verify} messages. - * @param m RelayConnectionRequest message or plain object to encode - * @param [w] Writer to encode to - * @returns Writer - */ - public static encode(m: peerInterface.IRelayConnectionRequest, w?: $protobuf.Writer): $protobuf.Writer; - - /** - * Encodes the specified RelayConnectionRequest message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionRequest.verify|verify} messages. - * @param message RelayConnectionRequest message or plain object to encode - * @param [writer] Writer to encode to - * @returns Writer - */ - public static encodeDelimited(message: peerInterface.IRelayConnectionRequest, writer?: $protobuf.Writer): $protobuf.Writer; - - /** - * Decodes a RelayConnectionRequest message from the specified reader or buffer. - * @param r Reader or buffer to decode from - * @param [l] Message length if known beforehand - * @returns RelayConnectionRequest - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): peerInterface.RelayConnectionRequest; - - /** - * Decodes a RelayConnectionRequest message from the specified reader or buffer, length delimited. - * @param reader Reader or buffer to decode from - * @returns RelayConnectionRequest - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): peerInterface.RelayConnectionRequest; - } + /** RelayConnectionMessage targetPeerId */ + targetPeerId?: (string|null); - /** Properties of a RelayConnectionResponse. */ - interface IRelayConnectionResponse { + /** RelayConnectionMessage originPeerId */ + originPeerId?: (string|null); - /** RelayConnectionResponse relayAddress */ + /** RelayConnectionMessage relayAddress */ relayAddress?: (string|null); } - /** Represents a RelayConnectionResponse. */ - class RelayConnectionResponse implements IRelayConnectionResponse { + /** Represents a RelayConnectionMessage. */ + class RelayConnectionMessage implements IRelayConnectionMessage { /** - * Constructs a new RelayConnectionResponse. + * Constructs a new RelayConnectionMessage. * @param [p] Properties to set */ - constructor(p?: peerInterface.IRelayConnectionResponse); + constructor(p?: peerInterface.IRelayConnectionMessage); + + /** RelayConnectionMessage targetPeerId. */ + public targetPeerId: string; + + /** RelayConnectionMessage originPeerId. */ + public originPeerId: string; - /** RelayConnectionResponse relayAddress. */ + /** RelayConnectionMessage relayAddress. */ public relayAddress: string; /** - * Creates a new RelayConnectionResponse instance using the specified properties. + * Creates a new RelayConnectionMessage instance using the specified properties. * @param [properties] Properties to set - * @returns RelayConnectionResponse instance + * @returns RelayConnectionMessage instance */ - public static create(properties?: peerInterface.IRelayConnectionResponse): peerInterface.RelayConnectionResponse; + public static create(properties?: peerInterface.IRelayConnectionMessage): peerInterface.RelayConnectionMessage; /** - * Encodes the specified RelayConnectionResponse message. Does not implicitly {@link peerInterface.RelayConnectionResponse.verify|verify} messages. - * @param m RelayConnectionResponse message or plain object to encode + * Encodes the specified RelayConnectionMessage message. Does not implicitly {@link peerInterface.RelayConnectionMessage.verify|verify} messages. + * @param m RelayConnectionMessage message or plain object to encode * @param [w] Writer to encode to * @returns Writer */ - public static encode(m: peerInterface.IRelayConnectionResponse, w?: $protobuf.Writer): $protobuf.Writer; + public static encode(m: peerInterface.IRelayConnectionMessage, w?: $protobuf.Writer): $protobuf.Writer; /** - * Encodes the specified RelayConnectionResponse message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionResponse.verify|verify} messages. - * @param message RelayConnectionResponse message or plain object to encode + * Encodes the specified RelayConnectionMessage message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionMessage.verify|verify} messages. + * @param message RelayConnectionMessage message or plain object to encode * @param [writer] Writer to encode to * @returns Writer */ - public static encodeDelimited(message: peerInterface.IRelayConnectionResponse, writer?: $protobuf.Writer): $protobuf.Writer; + public static encodeDelimited(message: peerInterface.IRelayConnectionMessage, writer?: $protobuf.Writer): $protobuf.Writer; /** - * Decodes a RelayConnectionResponse message from the specified reader or buffer. + * Decodes a RelayConnectionMessage message from the specified reader or buffer. * @param r Reader or buffer to decode from * @param [l] Message length if known beforehand - * @returns RelayConnectionResponse + * @returns RelayConnectionMessage * @throws {Error} If the payload is not a reader or valid buffer * @throws {$protobuf.util.ProtocolError} If required fields are missing */ - public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): peerInterface.RelayConnectionResponse; + public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): peerInterface.RelayConnectionMessage; /** - * Decodes a RelayConnectionResponse message from the specified reader or buffer, length delimited. + * Decodes a RelayConnectionMessage message from the specified reader or buffer, length delimited. * @param reader Reader or buffer to decode from - * @returns RelayConnectionResponse + * @returns RelayConnectionMessage * @throws {Error} If the payload is not a reader or valid buffer * @throws {$protobuf.util.ProtocolError} If required fields are missing */ - public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): peerInterface.RelayConnectionResponse; + public static decodeDelimited(reader: ($protobuf.Reader|Uint8Array)): peerInterface.RelayConnectionMessage; } /** CAMessageType enum. */ diff --git a/proto/js/Peer.js b/proto/js/Peer.js index 6c9f24189b..b2b39b55ce 100644 --- a/proto/js/Peer.js +++ b/proto/js/Peer.js @@ -1395,24 +1395,26 @@ $root.peerInterface = (function() { return HolePunchConnectionMessage; })(); - peerInterface.RelayConnectionRequest = (function() { + peerInterface.RelayConnectionMessage = (function() { /** - * Properties of a RelayConnectionRequest. + * Properties of a RelayConnectionMessage. * @memberof peerInterface - * @interface IRelayConnectionRequest - * @property {string|null} [peerId] RelayConnectionRequest peerId + * @interface IRelayConnectionMessage + * @property {string|null} [targetPeerId] RelayConnectionMessage targetPeerId + * @property {string|null} [originPeerId] RelayConnectionMessage originPeerId + * @property {string|null} [relayAddress] RelayConnectionMessage relayAddress */ /** - * Constructs a new RelayConnectionRequest. + * Constructs a new RelayConnectionMessage. * @memberof peerInterface - * @classdesc Represents a RelayConnectionRequest. - * @implements IRelayConnectionRequest + * @classdesc Represents a RelayConnectionMessage. + * @implements IRelayConnectionMessage * @constructor - * @param {peerInterface.IRelayConnectionRequest=} [p] Properties to set + * @param {peerInterface.IRelayConnectionMessage=} [p] Properties to set */ - function RelayConnectionRequest(p) { + function RelayConnectionMessage(p) { if (p) for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) if (p[ks[i]] != null) @@ -1420,196 +1422,100 @@ $root.peerInterface = (function() { } /** - * RelayConnectionRequest peerId. - * @member {string} peerId - * @memberof peerInterface.RelayConnectionRequest + * RelayConnectionMessage targetPeerId. + * @member {string} targetPeerId + * @memberof peerInterface.RelayConnectionMessage * @instance */ - RelayConnectionRequest.prototype.peerId = ""; - - /** - * Creates a new RelayConnectionRequest instance using the specified properties. - * @function create - * @memberof peerInterface.RelayConnectionRequest - * @static - * @param {peerInterface.IRelayConnectionRequest=} [properties] Properties to set - * @returns {peerInterface.RelayConnectionRequest} RelayConnectionRequest instance - */ - RelayConnectionRequest.create = function create(properties) { - return new RelayConnectionRequest(properties); - }; - - /** - * Encodes the specified RelayConnectionRequest message. Does not implicitly {@link peerInterface.RelayConnectionRequest.verify|verify} messages. - * @function encode - * @memberof peerInterface.RelayConnectionRequest - * @static - * @param {peerInterface.IRelayConnectionRequest} m RelayConnectionRequest message or plain object to encode - * @param {$protobuf.Writer} [w] Writer to encode to - * @returns {$protobuf.Writer} Writer - */ - RelayConnectionRequest.encode = function encode(m, w) { - if (!w) - w = $Writer.create(); - if (m.peerId != null && Object.hasOwnProperty.call(m, "peerId")) - w.uint32(10).string(m.peerId); - return w; - }; - - /** - * Encodes the specified RelayConnectionRequest message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionRequest.verify|verify} messages. - * @function encodeDelimited - * @memberof peerInterface.RelayConnectionRequest - * @static - * @param {peerInterface.IRelayConnectionRequest} message RelayConnectionRequest message or plain object to encode - * @param {$protobuf.Writer} [writer] Writer to encode to - * @returns {$protobuf.Writer} Writer - */ - RelayConnectionRequest.encodeDelimited = function encodeDelimited(message, writer) { - return this.encode(message, writer).ldelim(); - }; - - /** - * Decodes a RelayConnectionRequest message from the specified reader or buffer. - * @function decode - * @memberof peerInterface.RelayConnectionRequest - * @static - * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from - * @param {number} [l] Message length if known beforehand - * @returns {peerInterface.RelayConnectionRequest} RelayConnectionRequest - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - RelayConnectionRequest.decode = function decode(r, l) { - if (!(r instanceof $Reader)) - r = $Reader.create(r); - var c = l === undefined ? r.len : r.pos + l, m = new $root.peerInterface.RelayConnectionRequest(); - while (r.pos < c) { - var t = r.uint32(); - switch (t >>> 3) { - case 1: - m.peerId = r.string(); - break; - default: - r.skipType(t & 7); - break; - } - } - return m; - }; - - /** - * Decodes a RelayConnectionRequest message from the specified reader or buffer, length delimited. - * @function decodeDelimited - * @memberof peerInterface.RelayConnectionRequest - * @static - * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from - * @returns {peerInterface.RelayConnectionRequest} RelayConnectionRequest - * @throws {Error} If the payload is not a reader or valid buffer - * @throws {$protobuf.util.ProtocolError} If required fields are missing - */ - RelayConnectionRequest.decodeDelimited = function decodeDelimited(reader) { - if (!(reader instanceof $Reader)) - reader = new $Reader(reader); - return this.decode(reader, reader.uint32()); - }; - - return RelayConnectionRequest; - })(); - - peerInterface.RelayConnectionResponse = (function() { - - /** - * Properties of a RelayConnectionResponse. - * @memberof peerInterface - * @interface IRelayConnectionResponse - * @property {string|null} [relayAddress] RelayConnectionResponse relayAddress - */ + RelayConnectionMessage.prototype.targetPeerId = ""; /** - * Constructs a new RelayConnectionResponse. - * @memberof peerInterface - * @classdesc Represents a RelayConnectionResponse. - * @implements IRelayConnectionResponse - * @constructor - * @param {peerInterface.IRelayConnectionResponse=} [p] Properties to set + * RelayConnectionMessage originPeerId. + * @member {string} originPeerId + * @memberof peerInterface.RelayConnectionMessage + * @instance */ - function RelayConnectionResponse(p) { - if (p) - for (var ks = Object.keys(p), i = 0; i < ks.length; ++i) - if (p[ks[i]] != null) - this[ks[i]] = p[ks[i]]; - } + RelayConnectionMessage.prototype.originPeerId = ""; /** - * RelayConnectionResponse relayAddress. + * RelayConnectionMessage relayAddress. * @member {string} relayAddress - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @instance */ - RelayConnectionResponse.prototype.relayAddress = ""; + RelayConnectionMessage.prototype.relayAddress = ""; /** - * Creates a new RelayConnectionResponse instance using the specified properties. + * Creates a new RelayConnectionMessage instance using the specified properties. * @function create - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @static - * @param {peerInterface.IRelayConnectionResponse=} [properties] Properties to set - * @returns {peerInterface.RelayConnectionResponse} RelayConnectionResponse instance + * @param {peerInterface.IRelayConnectionMessage=} [properties] Properties to set + * @returns {peerInterface.RelayConnectionMessage} RelayConnectionMessage instance */ - RelayConnectionResponse.create = function create(properties) { - return new RelayConnectionResponse(properties); + RelayConnectionMessage.create = function create(properties) { + return new RelayConnectionMessage(properties); }; /** - * Encodes the specified RelayConnectionResponse message. Does not implicitly {@link peerInterface.RelayConnectionResponse.verify|verify} messages. + * Encodes the specified RelayConnectionMessage message. Does not implicitly {@link peerInterface.RelayConnectionMessage.verify|verify} messages. * @function encode - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @static - * @param {peerInterface.IRelayConnectionResponse} m RelayConnectionResponse message or plain object to encode + * @param {peerInterface.IRelayConnectionMessage} m RelayConnectionMessage message or plain object to encode * @param {$protobuf.Writer} [w] Writer to encode to * @returns {$protobuf.Writer} Writer */ - RelayConnectionResponse.encode = function encode(m, w) { + RelayConnectionMessage.encode = function encode(m, w) { if (!w) w = $Writer.create(); + if (m.targetPeerId != null && Object.hasOwnProperty.call(m, "targetPeerId")) + w.uint32(10).string(m.targetPeerId); + if (m.originPeerId != null && Object.hasOwnProperty.call(m, "originPeerId")) + w.uint32(18).string(m.originPeerId); if (m.relayAddress != null && Object.hasOwnProperty.call(m, "relayAddress")) - w.uint32(10).string(m.relayAddress); + w.uint32(26).string(m.relayAddress); return w; }; /** - * Encodes the specified RelayConnectionResponse message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionResponse.verify|verify} messages. + * Encodes the specified RelayConnectionMessage message, length delimited. Does not implicitly {@link peerInterface.RelayConnectionMessage.verify|verify} messages. * @function encodeDelimited - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @static - * @param {peerInterface.IRelayConnectionResponse} message RelayConnectionResponse message or plain object to encode + * @param {peerInterface.IRelayConnectionMessage} message RelayConnectionMessage message or plain object to encode * @param {$protobuf.Writer} [writer] Writer to encode to * @returns {$protobuf.Writer} Writer */ - RelayConnectionResponse.encodeDelimited = function encodeDelimited(message, writer) { + RelayConnectionMessage.encodeDelimited = function encodeDelimited(message, writer) { return this.encode(message, writer).ldelim(); }; /** - * Decodes a RelayConnectionResponse message from the specified reader or buffer. + * Decodes a RelayConnectionMessage message from the specified reader or buffer. * @function decode - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @static * @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from * @param {number} [l] Message length if known beforehand - * @returns {peerInterface.RelayConnectionResponse} RelayConnectionResponse + * @returns {peerInterface.RelayConnectionMessage} RelayConnectionMessage * @throws {Error} If the payload is not a reader or valid buffer * @throws {$protobuf.util.ProtocolError} If required fields are missing */ - RelayConnectionResponse.decode = function decode(r, l) { + RelayConnectionMessage.decode = function decode(r, l) { if (!(r instanceof $Reader)) r = $Reader.create(r); - var c = l === undefined ? r.len : r.pos + l, m = new $root.peerInterface.RelayConnectionResponse(); + var c = l === undefined ? r.len : r.pos + l, m = new $root.peerInterface.RelayConnectionMessage(); while (r.pos < c) { var t = r.uint32(); switch (t >>> 3) { case 1: + m.targetPeerId = r.string(); + break; + case 2: + m.originPeerId = r.string(); + break; + case 3: m.relayAddress = r.string(); break; default: @@ -1621,22 +1527,22 @@ $root.peerInterface = (function() { }; /** - * Decodes a RelayConnectionResponse message from the specified reader or buffer, length delimited. + * Decodes a RelayConnectionMessage message from the specified reader or buffer, length delimited. * @function decodeDelimited - * @memberof peerInterface.RelayConnectionResponse + * @memberof peerInterface.RelayConnectionMessage * @static * @param {$protobuf.Reader|Uint8Array} reader Reader or buffer to decode from - * @returns {peerInterface.RelayConnectionResponse} RelayConnectionResponse + * @returns {peerInterface.RelayConnectionMessage} RelayConnectionMessage * @throws {Error} If the payload is not a reader or valid buffer * @throws {$protobuf.util.ProtocolError} If required fields are missing */ - RelayConnectionResponse.decodeDelimited = function decodeDelimited(reader) { + RelayConnectionMessage.decodeDelimited = function decodeDelimited(reader) { if (!(reader instanceof $Reader)) reader = new $Reader(reader); return this.decode(reader, reader.uint32()); }; - return RelayConnectionResponse; + return RelayConnectionMessage; })(); /** diff --git a/src/peers/PeerManager.ts b/src/peers/PeerManager.ts index 85dc916228..424888e01b 100644 --- a/src/peers/PeerManager.ts +++ b/src/peers/PeerManager.ts @@ -113,11 +113,9 @@ class PeerManager { } }).bind(this) ) - console.log(this.listPeers()); - this.peerDHT.addPeers(this.listPeers()) this.natTraversal = new NatTraversal(this) - this.setNatHandler(this.natTraversal.handleNatRequest.bind(this.natTraversal)) + this.setNatHandler(this.natTraversal.handleNatMessageGRPC.bind(this.natTraversal)) } toggleStealthMode(active: boolean) { diff --git a/src/peers/nat-traversal/NatTraversal.ts b/src/peers/nat-traversal/NatTraversal.ts index f23829a9e9..d270ce3557 100644 --- a/src/peers/nat-traversal/NatTraversal.ts +++ b/src/peers/nat-traversal/NatTraversal.ts @@ -1,6 +1,6 @@ import net from 'net' import dgram from 'dgram' -import { Address } from "../PeerInfo"; +import PeerInfo, { Address } from "../PeerInfo"; import { EventEmitter } from 'events'; import { promiseAll } from "../../utils"; import PeerManager from "../PeerManager"; @@ -16,7 +16,9 @@ class NatTraversal extends EventEmitter { // peerId -> dgram.Socket private pendingHolePunchedSockets: Map = new Map // peerId -> tcp relay server - private tcpHolePunchedConnections: Map = new Map + private outgoingTCPHolePunchedRelayServers: Map = new Map + // peerId -> peer relay servers + private peerTCPHolePunchedRelayServers: Map = new Map // interval with which the server requests new direct hole punched connections // from adjacent peers that are in the store but have not yet been requested yet private intermittentConnectionInterval: NodeJS.Timeout @@ -29,14 +31,13 @@ class NatTraversal extends EventEmitter { this.peerManager = peerManager this.server = new MTPServer( this.connectionHandler.bind(this), - this.handleNATMessage.bind(this) + this.handleNATMessageUDP.bind(this) ) this.server.listenPort(0, () => { const address = this.server.address() console.log(`main MTP server is now listening on address: '${address.toString()}'`); }) - // this is just to make sure every other peer has a back connection to this node // the idea behind this is that if we are a node that is behind a NAT, then if // another node wants to connect via an adjacent node were already connected to, @@ -46,12 +47,7 @@ class NatTraversal extends EventEmitter { this.intermittentConnectionInterval = setInterval(async () => { const promiseList: Promise[] = [] for (const peerId of this.peerManager.listPeers()) { - console.log(`checking for peerId: ${peerId}`); - console.log(!this.holePunchedConnections.has(peerId)); - console.log(Array.from(this.holePunchedConnections.keys())); - - if (!this.holePunchedConnections.has(peerId)) { - console.log('time for connection interval!'); + if (!this.server.incomingConnections.has(peerId)) { const peerInfo = this.peerManager.getPeer(peerId)! const udpAddress = await this.requestUDPAddress(peerInfo.id) promiseList.push(this.sendDirectHolePunchConnectionRequest(udpAddress)) @@ -80,6 +76,70 @@ class NatTraversal extends EventEmitter { return Address.parse(address) } + // This request will timeout after 'timeout' milliseconds (defaults to 10 seconds) + async requestUDPHolePunch(targetPeerId: string, adjacentPeerId: string, timeout: number = 10000): Promise
{ + return new Promise(async (resolve, reject) => { + setTimeout(() => reject(Error('hole punch connection request timed out')), timeout) + try { + if (!this.peerManager.hasPeer(targetPeerId)) { + throw Error(`target peer id does not exist in store: ${targetPeerId}`) + } else if (!this.peerManager.hasPeer(adjacentPeerId)) { + throw Error(`adjacent peer id does not exist in store: ${adjacentPeerId}`) + } + const udpAddress = await this.requestUDPAddress(adjacentPeerId) + await this.sendHolePunchRequest(udpAddress, targetPeerId) + + this.on('hole-punch-connection', (peerId: string, conn: MTPConnection) => { + // need to set up a local relay server between the new connection and the gRPC server! + // this will include 2 socket pipes: + // 1. one from the grpc connection to the local relay server (tcp packets) + // 2. another one from the local relay server to the hole punched server address (udp/mtp packets) + const newServer = net.createServer((tcpConn) => { + tcpConn.on('data', (data) => conn.write(data)) + conn.on('data', (data) => tcpConn.write(data)) + }).listen(0, '127.0.01', () => { + this.outgoingTCPHolePunchedRelayServers.set(peerId, newServer) + resolve(Address.fromAddressInfo(newServer.address())) + }) + }) + } catch (error) { + reject(error) + } + }) + } + + // This request will timeout after 'timeout' milliseconds (defaults to 10 seconds) + async requestUDPRelay(targetPeerId: string, adjacentPeerId: string, timeout: number = 10000): Promise
{ + return new Promise(async (resolve, reject) => { + setTimeout(() => reject(Error('relay connection request timed out')), timeout) + try { + if (!this.peerManager.hasPeer(targetPeerId)) { + throw Error(`target peer id does not exist in store: ${targetPeerId}`) + } else if (!this.peerManager.hasPeer(adjacentPeerId)) { + throw Error(`adjacent peer id does not exist in store: ${adjacentPeerId}`) + } + const udpAddress = await this.requestUDPAddress(adjacentPeerId) + await this.sendHolePunchRequest(udpAddress, targetPeerId) + + this.on('hole-punch-connection', (peerId: string, conn: MTPConnection) => { + // need to set up a local relay server between the new connection and the gRPC server! + // this will include 2 socket pipes: + // 1. one from the grpc connection to the local relay server (tcp packets) + // 2. another one from the local relay server to the hole punched server address (udp/mtp packets) + const newServer = net.createServer((tcpConn) => { + tcpConn.on('data', (data) => conn.write(data)) + conn.on('data', (data) => tcpConn.write(data)) + }).listen(0, '127.0.01', () => { + this.outgoingTCPHolePunchedRelayServers.set(peerId, newServer) + resolve(Address.fromAddressInfo(newServer.address())) + }) + }) + } catch (error) { + reject(error) + } + }) + } + // ===================================================== // // ================ initiation messages ================ // // ===================================================== // @@ -90,12 +150,12 @@ class NatTraversal extends EventEmitter { // the resulting connection will be used in coordinating NAT traversal // requests from other peers via the peer adjacent to this one async sendDirectHolePunchConnectionRequest(udpAddress: Address) { - console.log('sendDirectHolePunchConnectionRequest'); - console.log(udpAddress); - - // get their udp address - console.log('sending back connection request'); - const message = peerInterface.DirectConnectionMessage.encodeDelimited({ peerId: this.peerManager.peerInfo.id }).finish() + const message = peerInterface.PeerInfoMessage.encodeDelimited({ + publicKey: this.peerManager.peerInfo.publicKey, + rootCertificate: this.peerManager.peerInfo.rootCertificate, + peerAddress: this.peerManager.peerInfo.peerAddress?.toString(), + apiAddress: this.peerManager.peerInfo.apiAddress?.toString(), + }).finish() this.sendNATMessage(udpAddress, peerInterface.NatMessageType.DIRECT_CONNECTION, message) } @@ -104,16 +164,31 @@ class NatTraversal extends EventEmitter { // node must also be known before requesting and that is what the udpAddress // parameter is for async sendHolePunchRequest(udpAddress: Address, targetPeerId: string) { - // create socket - const socket = dgram.createSocket('udp4') - socket.bind(0) - // create request - const request = peerInterface.HolePunchConnectionMessage.encodeDelimited({ - originPeerId: this.peerManager.peerInfo.id, - targetPeerId - }).finish() + return new Promise((resolve, reject) => { + try { + // create socket + const socket = dgram.createSocket('udp4') + socket.bind() + socket.on('listening', () => { + // create request + const request = peerInterface.HolePunchConnectionMessage.encodeDelimited({ + originPeerId: this.peerManager.peerInfo.id, + targetPeerId: targetPeerId, + udpAddress: Address.fromAddressInfo(socket.address()).toString() + }).finish() + + this.sendNATMessage(udpAddress, peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION, request, socket) + resolve() + }) - this.sendNATMessage(udpAddress, peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION, request, socket) + socket.on('message', (message: Buffer, rinfo: dgram.RemoteInfo) => { + const address = new Address(rinfo.address, rinfo.port) + this.handleNATMessageUDP(message, address) + }) + } catch (error) { + reject(error) + } + }) } // this is just a convenience function to wrap a message in a NATMessage @@ -130,41 +205,87 @@ class NatTraversal extends EventEmitter { } // ==== Handler Methods ==== // - private async handleNATMessage(message: Buffer, rinfo: dgram.RemoteInfo) { - const address = new Address(rinfo.address, rinfo.port) + connectionHandler(conn: MTPConnection) { + // first check if the connection is for nat message handler + // if it isn't try to send it to the gRPC service relayed via an internal MTP connection + const grpcAddress = this.peerManager.peerInfo.peerAddress! + const grpcConn = net.createConnection({ port: grpcAddress?.port, host: grpcAddress?.host }) + grpcConn.on('data', (data) => conn.write(data)) + conn.on('data', async (data) => { + // first try the nat message handler, might be a hole punch or relay request + try { + return await this.handleNATMessageUDP(data, conn.address()) + } catch (error) { + // don't want to throw so just log + } + // this is now assumed to be a message for grpc so need to pipe it over + grpcConn.write(data) + }) + } + + async handleNatMessageGRPC(request: Uint8Array): Promise { + const { type, subMessage } = peerInterface.NatMessage.decodeDelimited(request); + let response: Uint8Array; + switch (type) { + case peerInterface.NatMessageType.UDP_ADDRESS: + { + response = peerInterface.UDPAddressMessage.encodeDelimited({ + address: this.server.address().toString(), + }).finish(); + } + break; + case peerInterface.NatMessageType.DIRECT_CONNECTION: + throw Error('not implemented') + break; + case peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION: + throw Error('not implemented') + break; + case peerInterface.NatMessageType.RELAY_CONNECTION: + // the relay connection request will come through the grpc channel + response = await this.handleRelayRequest(subMessage) + break; + default: { + throw Error('git message type not supported'); + } + } + // encode a git response + return peerInterface.NatMessage.encodeDelimited({ type, subMessage: response }).finish(); + } + + private async handleNATMessageUDP(message: Buffer, address: Address) { const { type, isResponse, subMessage } = peerInterface.NatMessage.decodeDelimited(message) switch (type) { case peerInterface.NatMessageType.UDP_ADDRESS: throw Error('message type not supported via udp, try grpc connection') case peerInterface.NatMessageType.DIRECT_CONNECTION: - console.log(address); - console.log(rinfo); - - console.log('direct connection isResponse: ', isResponse); await this.handleDirectConnectionRequest(address, isResponse, subMessage) break; case peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION: await this.handleHolePunchRequest(address, isResponse, subMessage) break; case peerInterface.NatMessageType.RELAY_CONNECTION: - + throw Error('message type not supported via udp, try grpc connection') break; - default: break; } } private async handleDirectConnectionRequest(address: Address, isResponse: boolean, request: Uint8Array) { - const { peerId } = peerInterface.DirectConnectionMessage.decodeDelimited(request) - if (isResponse) { - console.log(`direct hole punch request was successful from peer id: ${peerId}`); - } else { - console.log('got back connection request!!'); - // create a punched connection - const conn = MTPConnection.connect(address.port, address.host) - console.log('hole punch successful'); + const { + publicKey, + rootCertificate, + peerAddress, + apiAddress + } = peerInterface.PeerInfoMessage.decodeDelimited(request) + const peerInfo = new PeerInfo(publicKey, rootCertificate, peerAddress, apiAddress) + if (this.peerManager.hasPeer(peerInfo.id)) { + this.peerManager.updatePeer(peerInfo) + } + if (!isResponse) { + // create a punched connection + const conn = MTPConnection.connect(this.peerManager.peerInfo.id, address.port, address.host) // write back response const subMessage = peerInterface.DirectConnectionMessage.encodeDelimited({ peerId: this.peerManager.peerInfo.id }).finish() const response = peerInterface.NatMessage.encodeDelimited({ @@ -173,18 +294,17 @@ class NatTraversal extends EventEmitter { subMessage }).finish() conn.write(response) - this.holePunchedConnections.set(peerId, conn) + this.holePunchedConnections.set(peerInfo.id, conn) } } private async handleHolePunchRequest(address: Address, isResponse: boolean, request: Uint8Array) { - return await new Promise(async (resolve, reject) => { + return await new Promise(async (resolve, reject) => { const { originPeerId, targetPeerId, udpAddress } = peerInterface.HolePunchConnectionMessage.decodeDelimited(request) // TODO: make sure origin peer id is known const parsedAddress = Address.parse(udpAddress) if (isResponse) { // case: hole punch has already been requested and adjacent peer has returned a message - console.log(`I have a udpAddress for the target peer`); if (this.pendingHolePunchedSockets.has(targetPeerId)) { throw Error(`there are no pending hole punching requests for peerId: ${targetPeerId}`) } @@ -195,28 +315,15 @@ class NatTraversal extends EventEmitter { const socket = this.pendingHolePunchedSockets.get(targetPeerId)! // send a message at interval for creating the entry in the translation table // TODO: not sure if its completely necessary to do this multiple times - const sendPacketInterval = setInterval(() => { - // okay to just send peerId of current node - socket.send(this.peerManager.peerInfo.id, parsedAddress.port, parsedAddress.host) - }, 1000) - // listen for response messages from the target peer - socket.on('message', (message, rinfo) => { - // confirm rinfo is the same as udpAddress - if (parsedAddress.host != rinfo.address || parsedAddress.port != rinfo.port) { - throw Error('udp addresses did not match') - } else if (message.toString() != targetPeerId) { - throw Error('received peer id did not match the target peer id') - } else { - clearInterval(sendPacketInterval) - // if our code has reached here, our hole punch has been successful! - // now we can add the connection to hole punched connections and emit an event - const conn = MTPConnection.connect(parsedAddress.port, parsedAddress.host, socket) - this.holePunchedConnections.set(targetPeerId, conn) - this.pendingHolePunchedSockets.delete(targetPeerId) - this.emit('hole-punch-connection', targetPeerId, conn) - resolve() - } - }) + const conn = MTPConnection.connect(this.peerManager.peerInfo.id, parsedAddress.port, parsedAddress.host, socket) + + while (conn.connecting) { + await new Promise((r, _) => setTimeout(() => r(), 1000)) + } + + this.emit('hole-punch-connection', targetPeerId, conn) + + resolve() } else { if (targetPeerId == this.peerManager.peerInfo.id) { // case: some other node is trying to connect to this node via an adjacent node @@ -240,13 +347,37 @@ class NatTraversal extends EventEmitter { // case: this node is the adjacent node and target peer is assumed to be connected to this node // first check if adjacent peer has a hole punched connection for coordination, if not then throw if (this.holePunchedConnections.has(targetPeerId)) { - const conn = this.holePunchedConnections.get(targetPeerId)! - const request = peerInterface.HolePunchConnectionMessage.encodeDelimited({ - originPeerId, - targetPeerId, + // if this node has a connection to target peer, then tell the target peer to initiate a connection with the origin peer! + const targetConn = this.holePunchedConnections.get(targetPeerId)! + const targetSubMessage = peerInterface.HolePunchConnectionMessage.encodeDelimited({ + originPeerId: originPeerId, + targetPeerId: targetPeerId, udpAddress: address?.toString() + }).finish() + const targetRequest = peerInterface.NatMessage.encodeDelimited({ + type: peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION, + subMessage: targetSubMessage + }).finish() + targetConn.write(targetRequest) + + // finally tell the origin peer the target peers udp address + const originSubMessage = peerInterface.HolePunchConnectionMessage.encodeDelimited({ + originPeerId: originPeerId, + targetPeerId: targetPeerId, + udpAddress: targetConn.address().toString() + }).finish() + const originRequest = peerInterface.NatMessage.encodeDelimited({ + type: peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION, + isResponse: true, + subMessage: originSubMessage + }).finish() + this.server.socket.send(originRequest, address.port, address.host, (err) => { + if (err) { + reject(err) + } else { + resolve() + } }) - } else { throw Error('no connection exists to target peer so cannot coordinate hole punching') } @@ -255,79 +386,49 @@ class NatTraversal extends EventEmitter { }) } - // This request will timeout after 'timeout' milliseconds (defaults to 10 seconds) - async requestUDPHolePunch(targetPeerId: string, adjacentPeerId: string, timeout: number = 10000): Promise
{ - return new Promise(async (resolve, reject) => { - setTimeout(() => reject(Error('hole punch connection request timed out')), timeout) + private async handleRelayRequest(request: Uint8Array): Promise { + return await new Promise(async (resolve, reject) => { try { - if (!this.peerManager.hasPeer(targetPeerId)) { - throw Error(`target peer id does not exist in store: ${targetPeerId}`) - } else if (!this.peerManager.hasPeer(adjacentPeerId)) { - throw Error(`adjacent peer id does not exist in store: ${adjacentPeerId}`) - } - const udpAddress = await this.requestUDPAddress(adjacentPeerId) - await this.sendHolePunchRequest(udpAddress, targetPeerId) - - this.server.on('hole-punch-connection', (peerId: string, conn: MTPConnection) => { - // need to set up a local relay server between the new connection and the gRPC server! - // this will include 2 socket pipes: - // 1. one from the grpc connection to the local relay server (tcp packets) - // 2. another one from the local relay server to the hole punched server address (udp/mtp packets) - const newServer = net.createServer((tcpConn) => { - tcpConn.on('data', (data) => conn.write(data)) - conn.on('data', (data) => tcpConn.write(data)) - }).listen(0, '127.0.01', () => { - this.tcpHolePunchedConnections.set(peerId, newServer) - resolve(Address.fromAddressInfo(newServer.address())) + const { originPeerId, targetPeerId } = peerInterface.RelayConnectionMessage.decodeDelimited(request) + // first check if there is already a relay set up for the peer + if (this.peerTCPHolePunchedRelayServers.has(targetPeerId)) { + const addressInfo = this.peerTCPHolePunchedRelayServers.get(targetPeerId)!.address() + const relayAddress = Address.fromAddressInfo(addressInfo).toString() + const response = peerInterface.RelayConnectionMessage.encodeDelimited({ + originPeerId: originPeerId, + targetPeerId: targetPeerId, + relayAddress: relayAddress + }).finish() + resolve(response) + } else { + // otherwise we need to make sure tell target peer to setup a relay + if (!this.holePunchedConnections.has(targetPeerId)) { + throw Error(`no hole punched connection exists to target peer id: ${targetPeerId}`) + } + const udpConnection = this.holePunchedConnections.get(targetPeerId)! + + const newRelayServer = net.createServer((newConn) => { + udpConnection.on('data', (data) => newConn.write(data)) + newConn.on('data', (data) => udpConnection.write(data)) + }).listen(0, '0.0.0.0', () => { + // set the server + this.peerTCPHolePunchedRelayServers.set(targetPeerId, newRelayServer) + // send the address back to the origin peer + const addressInfo = newRelayServer.address() + const relayAddress = Address.fromAddressInfo(addressInfo).toString() + const response = peerInterface.RelayConnectionMessage.encodeDelimited({ + originPeerId: originPeerId, + targetPeerId: targetPeerId, + relayAddress: relayAddress + }).finish() + resolve(response) }) - }) + } } catch (error) { reject(error) } }) } - - connectionHandler(conn: MTPConnection) { - // first check if the connection is for - // if it isn't try to send it to the gRPC service relayed via an internal MTP connection - conn.on('data', (data) => { - // console.log('got some data'); - - // console.log(data.toString()); - - }) - - - } - - async handleNatRequest(request: Uint8Array): Promise { - const { type, subMessage } = peerInterface.NatMessage.decodeDelimited(request); - let response: Uint8Array; - switch (type) { - case peerInterface.NatMessageType.UDP_ADDRESS: - { - response = peerInterface.UDPAddressMessage.encodeDelimited({ - address: this.server.address().toString(), - }).finish(); - } - break; - case peerInterface.NatMessageType.DIRECT_CONNECTION: - throw Error('not yet implemented') - break; - case peerInterface.NatMessageType.HOLE_PUNCH_CONNECTION: - throw Error('not yet implemented') - break; - case peerInterface.NatMessageType.RELAY_CONNECTION: - throw Error('not yet implemented') - break; - default: { - throw Error('git message type not supported'); - } - } - // encode a git response - return peerInterface.NatMessage.encodeDelimited({ type, subMessage: response }).finish(); - } - } export default NatTraversal diff --git a/src/peers/nat-traversal/micro-transport-protocol/MTPConnection.ts b/src/peers/nat-traversal/micro-transport-protocol/MTPConnection.ts index 7f0bb491ee..7398a88334 100644 --- a/src/peers/nat-traversal/micro-transport-protocol/MTPConnection.ts +++ b/src/peers/nat-traversal/micro-transport-protocol/MTPConnection.ts @@ -25,6 +25,8 @@ import { import { peerInterface } from '../../../../proto/js/Peer'; class MTPConnection extends Duplex { + private peerId: string; + private port: number; private host: string; socket: dgram.Socket; @@ -34,7 +36,7 @@ class MTPConnection extends Duplex { closed: boolean; private inflightPackets: number; private alive: boolean; - private connecting: boolean; + connecting: boolean; private recvId: number; public get RecvID(): number { return this.recvId; @@ -44,9 +46,14 @@ class MTPConnection extends Duplex { private seq: number; private ack: number; private synack?: peerInterface.MTPPacket; - constructor(port: number, host: string, socket: dgram.Socket, syn?: peerInterface.MTPPacket) { + + + + constructor(peerId: string, port: number, host: string, socket: dgram.Socket, syn?: peerInterface.MTPPacket) { super(); + this.peerId = peerId + this.remoteAddress = new Address(host, port); if (isNaN(port)) { @@ -71,6 +78,7 @@ class MTPConnection extends Duplex { this.seq = (Math.random() * UINT16) | 0; this.ack = syn.seq; this.synack = MTPConnection.createPacket( + this.peerId, this.recvId, this.sendId, this.seq, @@ -91,15 +99,19 @@ class MTPConnection extends Duplex { socket.on('listening', () => { this.recvId = socket.address().port; // using the port gives us system wide clash protection this.sendId = uint16(this.recvId + 1); - this.sendOutgoing(MTPConnection.createPacket( + + const initialPacket = MTPConnection.createPacket( + this.peerId, this.recvId, this.sendId, this.seq, this.ack, PACKET_SYN, null - )); - }); + ); + + this.sendOutgoing(initialPacket); + }) socket.on('error', (err) => { this.emit('error', err); @@ -123,6 +135,7 @@ class MTPConnection extends Duplex { return this.once('connect', sendFin); } this.sendOutgoing(MTPConnection.createPacket( + this.peerId, this.recvId, this.sendId, this.seq, @@ -167,6 +180,7 @@ class MTPConnection extends Duplex { while (this._writable) { const payload = this.payload(data); this.sendOutgoing(MTPConnection.createPacket( + this.peerId, this.recvId, this.sendId, this.seq, @@ -318,6 +332,7 @@ class MTPConnection extends Duplex { private sendAck() { const packet = MTPConnection.createPacket( + this.peerId, this.recvId, this.sendId, this.seq, @@ -346,6 +361,7 @@ class MTPConnection extends Duplex { // ==== Helper methods ==== // public static createPacket( + peerId: string, recvId: number, sendId: number, seq: number, @@ -355,6 +371,7 @@ class MTPConnection extends Duplex { ): peerInterface.MTPPacket { return new peerInterface.MTPPacket({ id: id, + peerId: peerId, connection: id === PACKET_SYN ? recvId : sendId, seq: seq, ack: ack, @@ -375,8 +392,8 @@ class MTPConnection extends Duplex { } })() - public static connect(port: number, host?: string, socket: dgram.Socket = dgram.createSocket('udp4')) { - const connection = new MTPConnection(port, host || '127.0.0.1', socket); + public static connect(localPeerId: string, port: number, host?: string, socket: dgram.Socket = dgram.createSocket('udp4')) { + const connection = new MTPConnection(localPeerId, port, host || '127.0.0.1', socket); socket.on('message', (message) => { diff --git a/src/peers/nat-traversal/micro-transport-protocol/MTPServer.ts b/src/peers/nat-traversal/micro-transport-protocol/MTPServer.ts index 2e990097ce..5be8a7fde3 100644 --- a/src/peers/nat-traversal/micro-transport-protocol/MTPServer.ts +++ b/src/peers/nat-traversal/micro-transport-protocol/MTPServer.ts @@ -16,11 +16,11 @@ class MTPServer extends EventEmitter { // peerId -> connection incomingConnections: Map; - tertiaryMessageHandler?: (message: Uint8Array, rinfo: dgram.RemoteInfo) => Promise + tertiaryMessageHandler?: (message: Uint8Array, address: Address) => Promise constructor( handleIncomingConnection: (conn: MTPConnection) => void, - tertiaryMessageHandler?: (message: Uint8Array, rinfo: dgram.RemoteInfo) => Promise, + tertiaryMessageHandler?: (message: Uint8Array, address: Address) => Promise, ) { super(); this.tertiaryMessageHandler = tertiaryMessageHandler @@ -96,7 +96,8 @@ class MTPServer extends EventEmitter { // ================================// if (this.tertiaryMessageHandler) { try { - return await this.tertiaryMessageHandler(message, rinfo) + const address = new Address(rinfo.address, rinfo.port) + return await this.tertiaryMessageHandler(message, address) } catch (error) { // if anything went wrong, assume it is a direct connection request and move on } @@ -121,7 +122,7 @@ class MTPServer extends EventEmitter { return; } - const newConnection = new MTPConnection(rinfo.port, rinfo.address, this.socket, packet) + const newConnection = new MTPConnection(peerId, rinfo.port, rinfo.address, this.socket, packet) this.incomingConnections.set(peerId, newConnection) newConnection.on('close', () => { this.incomingConnections.delete(peerId); diff --git a/src/peers/peer-connection/PeerConnection.ts b/src/peers/peer-connection/PeerConnection.ts index e82fc62570..386db13486 100644 --- a/src/peers/peer-connection/PeerConnection.ts +++ b/src/peers/peer-connection/PeerConnection.ts @@ -34,17 +34,10 @@ class PeerConnection { // 1st connection option: peerInfo already in peerStore and peerAddress is connected private async connectDirectly(): Promise { // try to create a direct connection - console.log('this.getPeerInfo()'); - console.log(this.getPeerInfo()); - if (this.getPeerInfo().peerAddress) { - console.log(`connecting directly to: ${this.peerId}`); // direct connection attempt const address = this.getPeerInfo().peerAddress!; - console.log(address); - const peerClient = new PeerClient(address.toString(), this.credentials); - console.log('connected directly!!!!!!!!!!!!!!!!!111'); this.connected = true; return peerClient; } else { @@ -56,36 +49,27 @@ class PeerConnection { private async connectDHT(): Promise { // try to find peer directly from intermediary peers const { targetPeerInfo, adjacentPeerInfo } = await this.peerManager.peerDHT.findPeer(this.getPeerInfo().id) - console.log('connectDHT'); - console.log(targetPeerInfo); - console.log(adjacentPeerInfo); - - // if (targetPeerInfo?.peerAddress) { - // try { - // // case 1: target peer has been found and has a peerAddress - // const address = targetPeerInfo.peerAddress; - // const peerClient = new PeerClient(address.toString(), this.credentials); - // this.connected = true; - // return peerClient; - // } catch (error) { - // // don't want to throw, just try next method - // } - // } - - if (adjacentPeerInfo?.peerAddress) { - // case 2: target peer has an adjacent peer that can be contacted for nat traversal + if (targetPeerInfo?.peerAddress) { try { - const promiseList = [ - this.connectHolePunch(adjacentPeerInfo), - // this.connectRelay(adjacentPeerInfo) - ]; - const client = await promiseAny(promiseList) - console.log('this one finished early!'); - return client + // case 1: target peer has been found and has a peerAddress + const address = targetPeerInfo.peerAddress; + const peerClient = new PeerClient(address.toString(), this.credentials); + this.connected = true; + return peerClient; } catch (error) { - // don't want to throw + // don't want to throw, just try next method } } + + if (adjacentPeerInfo?.peerAddress) { + // case 2: target peer has an adjacent peer that can be contacted for nat traversal + const promiseList = [ + this.connectHolePunch(adjacentPeerInfo), + this.connectRelay(adjacentPeerInfo) + ]; + const client = await promiseAny(promiseList) + return client + } throw Error('could not find peer via dht') } @@ -95,7 +79,6 @@ class PeerConnection { // try to hole punch to peer via relay peer if (adjacentPeerInfo.peerAddress) { // connect to relay and ask it to create a relay - console.log('requesting udp hole punch connection'); const connectedAddress = await this.peerManager.natTraversal.requestUDPHolePunch(this.getPeerInfo().id, adjacentPeerInfo.id, 10000) const peerClient = new PeerClient(connectedAddress.toString(), this.credentials); @@ -106,26 +89,21 @@ class PeerConnection { } } - // // 4th connection option: relay connection facilitated by a peer adjacent (i.e. connected) to the target peer - // // triggered by 2nd option - // private async connectRelay(adjacentPeerInfo: PeerInfo): Promise { - // // try to relay to peer via relay peer - // if (this.getPeerInfo().relayPublicKey) { - // // turn relay - // // connect to relay and ask it to create a relay - // const connectedAddress = await this.peerManager.turnClient.requestPeerConnection( - // this.getPeerInfo().publicKey, - // this.getPeerInfo().relayPublicKey!, - // ); - // const peerClient = new PeerClient(connectedAddress.toString(), this.credentials); - // this.connected = true; - // return peerClient; - // } else if (!this.getPeerInfo().relayPublicKey) { - // throw Error('peer does not have relay public key specified'); - // } else { - // throw Error('peer is already connected'); - // } - // } + // 4th connection option: relay connection facilitated by a peer adjacent (i.e. connected) to the target peer + // triggered by 2nd option + private async connectRelay(adjacentPeerInfo: PeerInfo): Promise { + // try to hole punch to peer via relay peer + if (adjacentPeerInfo.peerAddress) { + // connect to relay and ask it to create a relay + const connectedAddress = await this.peerManager.natTraversal.requestUDPHolePunch(this.getPeerInfo().id, adjacentPeerInfo.id, 10000) + const peerClient = new PeerClient(connectedAddress.toString(), this.credentials); + + this.connected = true; + return peerClient; + } else { + throw Error('peer is already connected'); + } + } async connectFirstChannel() { if (!this.connected) { diff --git a/src/peers/peer-connection/PeerServer.ts b/src/peers/peer-connection/PeerServer.ts index 98add110f5..332934cc26 100644 --- a/src/peers/peer-connection/PeerServer.ts +++ b/src/peers/peer-connection/PeerServer.ts @@ -60,8 +60,6 @@ class PeerServer implements IPeerServer { } async messagePeer(call: grpc.ServerUnaryCall, callback: grpc.sendUnaryData) { - console.log('got a peer message!'); - const peerRequest: PeerMessage = call.request!; const { publicKey, type, subMessage: requestMessage } = peerRequest.toObject(); diff --git a/src/peers/peer-dht/PeerDHT.ts b/src/peers/peer-dht/PeerDHT.ts index 0e998f4f52..71f69f7ea5 100644 --- a/src/peers/peer-dht/PeerDHT.ts +++ b/src/peers/peer-dht/PeerDHT.ts @@ -165,22 +165,11 @@ class PeerDHT { // } // } - console.log('querying the network!'); - console.log(closestPeerIds); - - // Query the network until the peer public key is found for (const closePeerId of closestPeerIds) { - console.log('checking if should continue'); - if (closePeerId == this.getPeerId()) { - console.log('continuing'); - continue } - console.log('after checking if continue'); - - console.log(`querying peerId: ${closePeerId}`); try { const pc = this.connectToPeer(closePeerId) @@ -199,7 +188,6 @@ class PeerDHT { }).finish() // send request const response = await pc.sendPeerRequest(SubServiceType.PEER_DHT, request) - console.log('got a response from peer about dht!'); // decode response const { subMessage: responseSubMessage } = peerInterface.PeerDHTMessage.decodeDelimited(response) @@ -211,8 +199,6 @@ class PeerDHT { } const closestFoundPeerInfoMessageList = closestPeers.map(p => new PeerInfo(p.publicKey!, p.rootCertificate!, p.peerAddress ?? undefined, p.apiAddress ?? undefined)) - console.log('closestPeers'); - console.log(closestPeers.map(p => PeerInfo.publicKeyToId(p.publicKey!))); // Add peers to routing table this.addPeers(closestPeers.map(p => PeerInfo.publicKeyToId(p.publicKey!))) @@ -223,16 +209,12 @@ class PeerDHT { if (this.getPeerId() != peerInfo.id) { this.updatePeerStore(peerInfo) } - console.log(peerInfo.id); - console.log(peerId); - if (peerInfo.id == peerId) { foundPeerInfo = peerInfo } } if (foundPeerInfo) { - console.log('Peer was found!!'); return { adjacentPeerInfo: this.getPeerInfo(closePeerId)!, targetPeerInfo: foundPeerInfo @@ -242,8 +224,6 @@ class PeerDHT { } } catch (error) { // don't want to throw if peer contact failed so just log it - console.log('got an error while querying network!'); - console.log(error) continue } } @@ -280,10 +260,6 @@ class PeerDHT { .map(p => new PeerInfo(p.publicKey!, p.rootCertificate!, p.peerAddress ?? undefined, p.apiAddress ?? undefined)) .filter(p => p.id != this.getPeerId()) - console.log('responding to handleFindNodeMessage'); - console.log(this.kBucket.toArray()); - console.log(this.closestPeers(peerId)); - const response = peerInterface.PeerDHTFindNodeMessage.encodeDelimited({ peerId: peerId, closestPeers: this.toPeerInfoMessageList(this.closestPeers(peerId))