diff --git a/.gitignore b/.gitignore index 8c2731545..d76d6929c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ node_modules coverage .tmp tmp +proto/build diff --git a/lib/core-manager/index.js b/lib/core-manager/index.js index b9bbfb416..a18392548 100644 --- a/lib/core-manager/index.js +++ b/lib/core-manager/index.js @@ -14,8 +14,6 @@ const NAMESPACES = /** @type {const} */ (['auth', 'data', 'blobIndex', 'blob']) /** @typedef {(typeof NAMESPACES)[number]} Namespace */ /** @typedef {import('streamx').Duplex} DuplexStream */ /** @typedef {{ rsm: ReplicationStateMachine, stream: DuplexStream, cores: Set }} ReplicationRecord */ -/** @typedef {DuplexStream & { noiseStream: DuplexStream }} NoiseStream */ -/** @typedef {DuplexStream & { noiseStream: DuplexStream & { userData: any }}} ProtocolStream */ /** * @typedef {Object} Events * @property {import('./core-index.js').CoreIndexEvents['add-core']} add-core diff --git a/lib/rpc/index.js b/lib/rpc/index.js new file mode 100644 index 000000000..828260967 --- /dev/null +++ b/lib/rpc/index.js @@ -0,0 +1,339 @@ +// @ts-check +import { TypedEmitter } from 'tiny-typed-emitter' +import Protomux from 'protomux' +import { openedNoiseSecretStream, keyToId } from '../utils.js' +import cenc from 'compact-encoding' +import { Invite, InviteResponse, InviteResponse_Decision } from './messages.js' + +const PROTOCOL_NAME = 'mapeo/rpc' + +// Protomux message types depend on the order that messages are added to a +// channel (this needs to remain consistent). To avoid breaking changes, the +// types here should not change. +// +// TODO: Add @satisfies to check this matches the imports from './messages.js' +// when we switch to Typescript v5 +const MESSAGE_TYPES = /** @type {const} */ ({ + Invite: 0, + InviteResponse: 1 +}) +const MESSAGES_MAX_ID = Math.max.apply(null, [...Object.values(MESSAGE_TYPES)]) + +/** @typedef {Peer['info']} PeerInfoInternal */ +/** @typedef {Omit & { status: Exclude }} PeerInfo */ +/** @typedef {'connecting' | 'connected' | 'disconnected'} PeerState */ + +/** + * @template ValueType + * @typedef {object} DeferredPromise + * @property {(value?: ValueType | PromiseLike) => void} resolve + * @property {(reason?: unknown) => void} reject + */ + +class Peer { + /** @type {PeerState} */ + #state = 'connecting' + #publicKey + #channel + /** @type {Map>>} */ + pendingInvites = new Map() + + /** + * @param {object} options + * @param {Buffer} options.publicKey + * @param {ReturnType} options.channel + */ + constructor ({ publicKey, channel }) { + this.#publicKey = publicKey + this.#channel = channel + } + get info () { + return { + status: this.#state, + id: keyToId(this.#publicKey) + } + } + /** + * Poor-man's finite state machine. Rather than a `setState` method, only + * allows specific transitions between states. + * + * @param {'connect' | 'disconnect'} type + */ + action (type) { + switch (type) { + case 'connect': + /* c8 ignore next 3 */ + if (this.#state !== 'connecting') { + return // TODO: report error - this should not happen + } + this.#state = 'connected' + break + case 'disconnect': + /* c8 ignore next */ + if (this.#state === 'disconnected') return + this.#state = 'disconnected' + for (const pending of this.pendingInvites.values()) { + for (const { reject } of pending) { + reject(new PeerDisconnectedError()) + } + } + this.pendingInvites.clear() + break + } + } + /** @param {Invite} invite */ + sendInvite (invite) { + this.#assertConnected() + const buf = Buffer.from(Invite.encode(invite).finish()) + const messageType = MESSAGE_TYPES.Invite + this.#channel.messages[messageType].send(buf) + } + /** @param {InviteResponse} response */ + sendInviteResponse (response) { + this.#assertConnected() + const buf = Buffer.from(InviteResponse.encode(response).finish()) + const messageType = MESSAGE_TYPES.InviteResponse + this.#channel.messages[messageType].send(buf) + } + #assertConnected () { + if (this.#state === 'connected' && !this.#channel.closed) return + /* c8 ignore next */ + throw new PeerDisconnectedError() // TODO: report error - this should not happen + } +} + +/** + * @typedef {object} MapeoRPCEvents + * @property {(peers: PeerInfo[]) => void} peers Emitted whenever the connection status of peers changes. An array of peerInfo objects with a peer id and the peer connection status + * @property {(peerId: string, invite: Invite) => void} invite Emitted when an invite is received + */ + +/** @extends {TypedEmitter} */ +export class MapeoRPC extends TypedEmitter { + /** @type {Map} + */ + async invite (peerId, { timeout, ...invite }) { + const peer = this.#peers.get(peerId) + if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId) + /** @type {Promise} */ + return new Promise((origResolve, origReject) => { + const projectId = keyToId(invite.projectKey) + + const pending = peer.pendingInvites.get(projectId) || [] + peer.pendingInvites.set(projectId, pending) + + const deferred = { resolve, reject } + pending.push(deferred) + + const timeoutId = + timeout && + setTimeout(() => { + const index = pending.indexOf(deferred) + if (index > -1) { + pending.splice(index, 1) + } + origReject(new TimeoutError(`No response after ${timeout}ms`)) + }, timeout) + + peer.sendInvite(invite) + + /** @type {typeof origResolve} */ + function resolve (value) { + clearTimeout(timeoutId) + origResolve(value) + } + /** @type {typeof origReject} */ + function reject (reason) { + clearTimeout(timeoutId) + origReject(reason) + } + }) + } + + /** + * Respond to an invite from a peer + * + * @param {string} peerId id of the peer you want to respond to (publicKey of peer as hex string) + * @param {object} options + * @param {InviteResponse['projectKey']} options.projectKey project key of the invite you are responding to + * @param {InviteResponse['decision']} options.decision response to invite, one of "ACCEPT", "REJECT", or "ALREADY" (already on project) + */ + inviteResponse (peerId, options) { + const peer = this.#peers.get(peerId) + if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId) + peer.sendInviteResponse(options) + } + + /** + * Connect to a peer over an existing NoiseSecretStream + * + * @param {NoiseStream | ProtocolStream} stream a NoiseSecretStream from @hyperswarm/secret-stream + */ + connect (stream) { + if (!stream.noiseStream) throw new Error('Invalid stream') + const protomux = + stream.userData && Protomux.isProtomux(stream.userData) + ? stream.userData + : Protomux.from(stream) + + // noiseSecretStream.remotePublicKey can be null before the stream has + // opened, so this helped awaits the open + openedNoiseSecretStream(stream).then(stream => { + if (stream.destroyed) return + const { remotePublicKey } = stream + + // This is written like this because the protomux uses the index within + // the messages array to define the message id over the wire, so this must + // stay consistent to avoid breaking protocol changes. + /** @type {Parameters[0]['messages']} */ + const messages = new Array(MESSAGES_MAX_ID).fill(undefined) + for (const [type, id] of Object.entries(MESSAGE_TYPES)) { + messages[id] = { + encoding: cenc.raw, + onmessage: this.#handleMessage.bind(this, remotePublicKey, type) + } + } + + const channel = protomux.createChannel({ + userData: null, + protocol: PROTOCOL_NAME, + messages, + onopen: this.#openPeer.bind(this, remotePublicKey), + onclose: this.#closePeer.bind(this, remotePublicKey) + }) + channel.open() + + const peerId = keyToId(remotePublicKey) + const existingPeer = this.#peers.get(peerId) + /* c8 ignore next 3 */ + if (existingPeer && existingPeer.info.status !== 'disconnected') { + existingPeer.action('disconnect') // Should not happen, but in case + } + const peer = new Peer({ publicKey: remotePublicKey, channel }) + this.#peers.set(peerId, peer) + // Do not emit peers now - will emit when connected + }) + + return stream + } + + /** @param {Buffer} publicKey */ + #openPeer (publicKey) { + const peerId = keyToId(publicKey) + const peer = this.#peers.get(peerId) + /* c8 ignore next */ + if (!peer) return // TODO: report error - this should not happen + // No-op if no change in state + /* c8 ignore next */ + if (peer.info.status === 'connected') return // TODO: report error - this should not happen + peer.action('connect') + this.#emitPeers() + } + + /** @param {Buffer} publicKey */ + #closePeer (publicKey) { + const peerId = publicKey.toString('hex') + const peer = this.#peers.get(peerId) + /* c8 ignore next */ + if (!peer) return // TODO: report error - this should not happen + // No-op if no change in state + /* c8 ignore next */ + if (peer.info.status === 'disconnected') return + // TODO: Track reasons for closing + peer.action('disconnect') + this.#emitPeers() + } + + get peers () { + return /** @type {PeerInfo[]} */ ( + [...this.#peers.values()] + .map(peer => peer.info) + // A peer is only 'connecting' for a single tick, so to avoid complex + // async code around sending messages we don't expose 'connecting' peers + .filter(peerInfo => peerInfo.status !== 'connecting') + ) + } + + #emitPeers () { + this.emit('peers', this.peers) + } + + /** + * + * @param {Buffer} peerPublicKey + * @param {keyof typeof MESSAGE_TYPES} type + * @param {Buffer} value + */ + #handleMessage (peerPublicKey, type, value) { + const peerId = keyToId(peerPublicKey) + const peer = this.#peers.get(peerId) + /* c8 ignore next */ + if (!peer) return // TODO: report error - this should not happen + switch (type) { + case 'Invite': { + const invite = Invite.decode(value) + this.emit('invite', peerId, invite) + break + } + case 'InviteResponse': { + const response = InviteResponse.decode(value) + const projectId = keyToId(response.projectKey) + const pending = peer.pendingInvites.get(projectId) + /* c8 ignore next 3 */ + if (!pending) { + return // TODO: report error - this should not happen + } + for (const deferredPromise of pending) { + deferredPromise.resolve(response.decision) + } + peer.pendingInvites.set(projectId, []) + break + } + /* c8 ignore next 2 */ + default: + // TODO: report unhandled message error + } + } +} + +export class TimeoutError extends Error { + /** @param {string} [message] */ + constructor (message) { + super(message) + this.name = 'TimeoutError' + } +} + +export class UnknownPeerError extends Error { + /** @param {string} [message] */ + constructor (message) { + super(message) + this.name = 'UnknownPeerError' + } +} + +export class PeerDisconnectedError extends Error { + /** @param {string} [message] */ + constructor (message) { + super(message) + this.name = 'PeerDisconnectedError' + } +} diff --git a/lib/rpc/messages.d.ts b/lib/rpc/messages.d.ts new file mode 100644 index 000000000..3690a48f1 --- /dev/null +++ b/lib/rpc/messages.d.ts @@ -0,0 +1,26 @@ +/// +import _m0 from "protobufjs/minimal.js"; +export interface Invite { + projectKey: Buffer; + encryptionKey?: Buffer | undefined; +} +export interface InviteResponse { + projectKey: Buffer; + decision: InviteResponse_Decision; +} +export declare enum InviteResponse_Decision { + REJECT = "REJECT", + ACCEPT = "ACCEPT", + ALREADY = "ALREADY", + UNRECOGNIZED = "UNRECOGNIZED" +} +export declare function inviteResponse_DecisionFromJSON(object: any): InviteResponse_Decision; +export declare function inviteResponse_DecisionToNumber(object: InviteResponse_Decision): number; +export declare const Invite: { + encode(message: Invite, writer?: _m0.Writer): _m0.Writer; + decode(input: _m0.Reader | Uint8Array, length?: number): Invite; +}; +export declare const InviteResponse: { + encode(message: InviteResponse, writer?: _m0.Writer): _m0.Writer; + decode(input: _m0.Reader | Uint8Array, length?: number): InviteResponse; +}; diff --git a/lib/rpc/messages.js b/lib/rpc/messages.js new file mode 100644 index 000000000..20aabc4f4 --- /dev/null +++ b/lib/rpc/messages.js @@ -0,0 +1,109 @@ +/* eslint-disable */ +import _m0 from "protobufjs/minimal.js"; +export var InviteResponse_Decision; +(function (InviteResponse_Decision) { + InviteResponse_Decision["REJECT"] = "REJECT"; + InviteResponse_Decision["ACCEPT"] = "ACCEPT"; + InviteResponse_Decision["ALREADY"] = "ALREADY"; + InviteResponse_Decision["UNRECOGNIZED"] = "UNRECOGNIZED"; +})(InviteResponse_Decision || (InviteResponse_Decision = {})); +export function inviteResponse_DecisionFromJSON(object) { + switch (object) { + case 0: + case "REJECT": + return InviteResponse_Decision.REJECT; + case 1: + case "ACCEPT": + return InviteResponse_Decision.ACCEPT; + case 2: + case "ALREADY": + return InviteResponse_Decision.ALREADY; + case -1: + case "UNRECOGNIZED": + default: + return InviteResponse_Decision.UNRECOGNIZED; + } +} +export function inviteResponse_DecisionToNumber(object) { + switch (object) { + case InviteResponse_Decision.REJECT: + return 0; + case InviteResponse_Decision.ACCEPT: + return 1; + case InviteResponse_Decision.ALREADY: + return 2; + case InviteResponse_Decision.UNRECOGNIZED: + default: + return -1; + } +} +function createBaseInvite() { + return { projectKey: Buffer.alloc(0) }; +} +export var Invite = { + encode: function (message, writer) { + if (writer === void 0) { writer = _m0.Writer.create(); } + if (message.projectKey.length !== 0) { + writer.uint32(10).bytes(message.projectKey); + } + if (message.encryptionKey !== undefined) { + writer.uint32(18).bytes(message.encryptionKey); + } + return writer; + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + var end = length === undefined ? reader.len : reader.pos + length; + var message = createBaseInvite(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.projectKey = reader.bytes(); + break; + case 2: + message.encryptionKey = reader.bytes(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + } +}; +function createBaseInviteResponse() { + return { projectKey: Buffer.alloc(0), decision: InviteResponse_Decision.REJECT }; +} +export var InviteResponse = { + encode: function (message, writer) { + if (writer === void 0) { writer = _m0.Writer.create(); } + if (message.projectKey.length !== 0) { + writer.uint32(10).bytes(message.projectKey); + } + if (message.decision !== InviteResponse_Decision.REJECT) { + writer.uint32(16).int32(inviteResponse_DecisionToNumber(message.decision)); + } + return writer; + }, + decode: function (input, length) { + var reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + var end = length === undefined ? reader.len : reader.pos + length; + var message = createBaseInviteResponse(); + while (reader.pos < end) { + var tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.projectKey = reader.bytes(); + break; + case 2: + message.decision = inviteResponse_DecisionFromJSON(reader.int32()); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + } +}; diff --git a/lib/types.js b/lib/types.js index 32b70e073..3b9ca9f8b 100644 --- a/lib/types.js +++ b/lib/types.js @@ -148,3 +148,7 @@ * @property {Buffer} key * @property {Block} block */ + +/** @typedef {import('streamx').Duplex} DuplexStream */ +/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ +/** @typedef {NoiseStream & { userData: import('protomux') }} ProtocolStream */ diff --git a/lib/utils.js b/lib/utils.js index 1c9e1a09d..3c0d2a756 100644 --- a/lib/utils.js +++ b/lib/utils.js @@ -44,3 +44,20 @@ export function parseVersion(version) { blockIndex: Number(blockIndex), } } + +/** @typedef {import('@hyperswarm/secret-stream')} NoiseStream */ +/** @typedef {NoiseStream & { destroyed: true }} DestroyedNoiseStream */ +/** @typedef {NoiseStream & { publicKey: Buffer, remotePublicKey: Buffer, handshake: Buffer }} OpenedNoiseStream */ + +/** + * Utility to await a NoiseSecretStream to open, that returns a stream with the + * correct types for publicKey and remotePublicKey (which can be null before + * stream is opened) + * + * @param {NoiseStream} stream + * @returns {Promise} + */ +export async function openedNoiseSecretStream(stream) { + await stream.opened + return /** @type {OpenedNoiseStream | DestroyedNoiseStream} */ (stream) +} diff --git a/package-lock.json b/package-lock.json index a875c189f..7cdd70ea0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -22,14 +22,18 @@ "mapeo-schema": "github:digidem/mapeo-schema#protobufsTypescript", "multi-core-indexer": "github:digidem/multi-core-indexer#no-prepare", "multicast-service-discovery": "^4.0.4", + "p-defer": "^4.0.0", "protobufjs": "^7.1.2", + "protomux": "^3.4.1", "sodium-universal": "^3.1.0", "tiny-typed-emitter": "^2.1.0" }, "devDependencies": { "@hyperswarm/testnet": "^3.1.0", + "@sinonjs/fake-timers": "^10.0.2", "@types/json-schema": "^7.0.11", "@types/node": "^18.11.17", + "@types/sinonjs__fake-timers": "^8.1.2", "@types/streamx": "^2.9.1", "brittle": "^3.1.1", "depcheck": "^1.4.3", @@ -608,6 +612,24 @@ "version": "0.25.13", "license": "MIT" }, + "node_modules/@sinonjs/commons": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-2.0.0.tgz", + "integrity": "sha512-uLa0j859mMrg2slwQYdO/AkrOfmH+X6LTVmNTS9CqexuE2IvVORIkSpJLqePAbEnKJ77aMmCwr1NUZ57120Xcg==", + "dev": true, + "dependencies": { + "type-detect": "4.0.8" + } + }, + "node_modules/@sinonjs/fake-timers": { + "version": "10.0.2", + "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-10.0.2.tgz", + "integrity": "sha512-SwUDyjWnah1AaNl7kxsa7cfLhlTYoiyhDAIgyh+El30YvXs/o7OLXpYH88Zdhyx9JExKrmHDJ+10bwIcY80Jmw==", + "dev": true, + "dependencies": { + "@sinonjs/commons": "^2.0.0" + } + }, "node_modules/@types/better-sqlite3": { "version": "7.6.3", "license": "MIT", @@ -647,6 +669,12 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/sinonjs__fake-timers": { + "version": "8.1.2", + "resolved": "https://registry.npmjs.org/@types/sinonjs__fake-timers/-/sinonjs__fake-timers-8.1.2.tgz", + "integrity": "sha512-9GcLXF0/v3t80caGs5p2rRfkB+a8VBGLJZVih6CNFkx8IZ994wiKKLSRs9nuFwk1HevWs/1mnUmkApGrSGsShA==", + "dev": true + }, "node_modules/@types/streamx": { "version": "2.9.1", "dev": true, @@ -3136,6 +3164,17 @@ "node": ">= 0.8.0" } }, + "node_modules/p-defer": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/p-defer/-/p-defer-4.0.0.tgz", + "integrity": "sha512-Vb3QRvQ0Y5XnF40ZUWW7JfLogicVh/EnA5gBIvKDJoYpeI82+1E3AlB9yOcKFS0AhHrWVnAQO39fbR0G99IVEQ==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-limit": { "version": "3.1.0", "dev": true, @@ -4202,6 +4241,15 @@ "node": ">= 0.8.0" } }, + "node_modules/type-detect": { + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", + "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", + "dev": true, + "engines": { + "node": ">=4" + } + }, "node_modules/type-fest": { "version": "0.20.2", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-0.20.2.tgz", diff --git a/package.json b/package.json index 142cd4482..5dd8a1c9f 100644 --- a/package.json +++ b/package.json @@ -41,8 +41,10 @@ "homepage": "https://github.com/digidem/mapeo-core#readme", "devDependencies": { "@hyperswarm/testnet": "^3.1.0", + "@sinonjs/fake-timers": "^10.0.2", "@types/json-schema": "^7.0.11", "@types/node": "^18.11.17", + "@types/sinonjs__fake-timers": "^8.1.2", "@types/streamx": "^2.9.1", "brittle": "^3.1.1", "depcheck": "^1.4.3", @@ -70,7 +72,9 @@ "mapeo-schema": "github:digidem/mapeo-schema#protobufsTypescript", "multi-core-indexer": "github:digidem/multi-core-indexer#no-prepare", "multicast-service-discovery": "^4.0.4", + "p-defer": "^4.0.0", "protobufjs": "^7.1.2", + "protomux": "^3.4.1", "sodium-universal": "^3.1.0", "tiny-typed-emitter": "^2.1.0" } diff --git a/proto/buf.gen.yaml b/proto/buf.gen.yaml index ddfedab61..0a789a719 100644 --- a/proto/buf.gen.yaml +++ b/proto/buf.gen.yaml @@ -1,7 +1,7 @@ version: v1 plugins: - name: ts - out: ../lib/core-manager + out: ./build strategy: all path: ../node_modules/ts-proto/protoc-gen-ts_proto opt: @@ -14,3 +14,4 @@ plugins: - outputJsonMethods=false - useOptionals=none - outputPartialMethods=false + - stringEnums=true diff --git a/proto/messages.proto b/proto/extensions.proto similarity index 83% rename from proto/messages.proto rename to proto/extensions.proto index e5ff2956e..57eee814a 100644 --- a/proto/messages.proto +++ b/proto/extensions.proto @@ -1,3 +1,5 @@ +syntax = "proto3"; + message ProjectExtension { repeated bytes authCoreKeys = 1; repeated bytes wantCoreKeys = 2; diff --git a/proto/rpc.proto b/proto/rpc.proto new file mode 100644 index 000000000..c4bca6be7 --- /dev/null +++ b/proto/rpc.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Invite { + bytes projectKey = 1; + optional bytes encryptionKey = 2; +} +message InviteResponse { + enum Decision { + REJECT = 0; + ACCEPT = 1; + ALREADY = 2; + } + bytes projectKey = 1; + Decision decision = 2; +} diff --git a/scripts/build-messages.js b/scripts/build-messages.js index 3824f72ce..eedec6366 100755 --- a/scripts/build-messages.js +++ b/scripts/build-messages.js @@ -1,17 +1,41 @@ #!/usr/bin/env node -import { execSync } from "child_process"; +import { execSync } from 'child_process' +import fs from 'fs' import rimraf from 'rimraf' +import path from 'path' const protoURL = new URL('../proto', import.meta.url) const projectRootURL = new URL('..', import.meta.url) -const messagesTSPath = new URL('../lib/core-manager/messages.ts', import.meta.url).pathname +const buildPath = path.join(protoURL.pathname, './build') + +rimraf.sync(buildPath) + +const destinations = { + extensions: path.join(projectRootURL.pathname, './lib/core-manager'), + rpc: path.join(projectRootURL.pathname, './lib/rpc') +} const command1 = 'buf generate .' console.log(command1) execSync(command1, { cwd: protoURL, stdio: 'inherit' }) -const command2 = 'tsc --module es2020 --declaration --allowSyntheticDefaultImports --moduleResolution node ' + messagesTSPath +const command2 = `tsc --module es2020 --declaration --allowSyntheticDefaultImports --moduleResolution node ${buildPath}/*` console.log(command2) execSync(command2, { cwd: projectRootURL, stdio: 'inherit' }) -console.log('rimraf ' + messagesTSPath) -rimraf.sync(messagesTSPath) + +for (const [source, dest] of Object.entries(destinations)) { + const sourcePath = path.join(buildPath, source) + const destPath = path.join(dest, 'messages') + console.log( + `copy ./${path.relative( + projectRootURL.pathname, + sourcePath + )}{.js,.d.ts} to ./${path.relative( + projectRootURL.pathname, + destPath + )}{.js,.d.ts}` + ) + for (const ext of ['.js', '.d.ts']) { + fs.copyFileSync(sourcePath + ext, destPath + ext) + } +} diff --git a/tests/rpc.js b/tests/rpc.js new file mode 100644 index 000000000..2f34ab115 --- /dev/null +++ b/tests/rpc.js @@ -0,0 +1,356 @@ +// @ts-check +import test from 'brittle' +import NoiseSecretStream from '@hyperswarm/secret-stream' +import { + MapeoRPC, + PeerDisconnectedError, + TimeoutError, + UnknownPeerError +} from '../lib/rpc/index.js' +import FakeTimers from '@sinonjs/fake-timers' +import { once } from 'events' +import { Duplex } from 'streamx' + +test('Send invite and accept', async t => { + t.plan(3) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + t.is(peers.length, 1) + const response = await r1.invite(peers[0].id, { projectKey }) + t.is(response, MapeoRPC.InviteResponse.ACCEPT) + }) + + r2.on('invite', (peerId, invite) => { + t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }) + }) + + replicate(r1, r2) +}) + +test('Send invite and reject', async t => { + t.plan(3) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + t.is(peers.length, 1) + const response = await r1.invite(peers[0].id, { projectKey }) + t.is(response, MapeoRPC.InviteResponse.REJECT) + }) + + r2.on('invite', (peerId, invite) => { + t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.REJECT + }) + }) + + replicate(r1, r2) +}) + +test('Invite to unknown peer', async t => { + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + const unknownPeerId = Buffer.allocUnsafe(32).fill(1).toString('hex') + replicate(r1, r2) + + await once(r1, 'peers') + await t.exception(r1.invite(unknownPeerId, { projectKey }), UnknownPeerError) + await t.exception( + () => r2.inviteResponse(unknownPeerId, { + projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }), UnknownPeerError + ) +}) + +test('Send invite and already on project', async t => { + t.plan(3) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + t.is(peers.length, 1) + const response = await r1.invite(peers[0].id, { projectKey }) + t.is(response, MapeoRPC.InviteResponse.ALREADY) + }) + + r2.on('invite', (peerId, invite) => { + t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ALREADY + }) + }) + + replicate(r1, r2) +}) + +test('Send invite with encryption key', async t => { + t.plan(4) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + const encryptionKey = Buffer.allocUnsafe(32).fill(1) + + r1.on('peers', async peers => { + t.is(peers.length, 1) + const response = await r1.invite(peers[0].id, { projectKey, encryptionKey }) + t.is(response, MapeoRPC.InviteResponse.ACCEPT) + }) + + r2.on('invite', (peerId, invite) => { + t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') + t.ok( + invite.encryptionKey?.equals(encryptionKey), + 'invite encryption key correct' + ) + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }) + }) + + replicate(r1, r2) +}) + +test('Disconnected peer shows in state', async t => { + t.plan(6) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + let peerStateUpdates = 0 + + r1.on('peers', async peers => { + t.is(peers.length, 1, 'one peer in state') + if (peers[0].status === 'connected') { + t.pass('peer appeared as connected') + t.is(++peerStateUpdates, 1) + destroy() + } else { + t.pass('peer appeared as disconnected') + t.is(++peerStateUpdates, 2) + } + }) + + const destroy = replicate(r1, r2) +}) + +test('Disconnect results in rejected invite', async t => { + t.plan(2) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + if (peers[0].status === 'connected') { + const invite = r1.invite(peers[0].id, { projectKey }) + await t.exception( + invite, + PeerDisconnectedError, + 'invite rejected with PeerDisconnectedError' + ) + } else { + t.pass('Peer disconnected') + } + }) + + r2.on('invite', () => { + destroy() + }) + + const destroy = replicate(r1, r2) +}) + +test('Invite to multiple peers', async t => { + // This is catches not tracking invites by peer + t.plan(2) + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + const r3 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + if (peers.length < 2) return + t.pass('connected to two peers') + const responses = await Promise.all( + peers.map(peer => r1.invite(peer.id, { projectKey })) + ) + t.alike( + responses.sort(), + [MapeoRPC.InviteResponse.ACCEPT, MapeoRPC.InviteResponse.REJECT], + 'One peer accepted, one rejected' + ) + }) + + r2.on('invite', (peerId, invite) => { + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }) + }) + + r3.on('invite', (peerId, invite) => { + r3.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.REJECT + }) + }) + + replicate(r1, r2) + replicate(r2, r3) + replicate(r3, r1) +}) + +test('Multiple invites to a peer, only one response', async t => { + t.plan(2) + let count = 0 + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.on('peers', async peers => { + const responses = await Promise.all([ + r1.invite(peers[0].id, { projectKey }), + r1.invite(peers[0].id, { projectKey }), + r1.invite(peers[0].id, { projectKey }) + ]) + const expected = Array(3).fill(MapeoRPC.InviteResponse.ACCEPT) + t.alike(responses, expected) + }) + + r2.on('invite', (peerId, invite) => { + if (++count < 3) return + // Only respond to third invite + t.is(count, 3) + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }) + }) + + replicate(r1, r2) +}) + +test('Default: invites do not timeout', async t => { + const clock = FakeTimers.install({ shouldAdvanceTime: true }) + t.teardown(() => clock.uninstall()) + t.plan(1) + + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.once('peers', async peers => { + r1.invite(peers[0].id, { projectKey }).then( + () => t.fail('invite promise should not resolve'), + () => t.fail('invite promise should not reject') + ) + await clock.tickAsync('01:00') // Advance 1 hour + t.pass('Waited 1 hour without invite timing out') + }) + + replicate(r1, r2) +}) + +test('Invite timeout', async t => { + const clock = FakeTimers.install({ shouldAdvanceTime: true }) + t.teardown(() => clock.uninstall()) + t.plan(1) + + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + r1.once('peers', async peers => { + t.exception( + r1.invite(peers[0].id, { projectKey, timeout: 5000 }), + TimeoutError + ) + clock.tick(5001) + }) + + replicate(r1, r2) +}) + +test('Reconnect peer and send invite', async t => { + const r1 = new MapeoRPC() + const r2 = new MapeoRPC() + + const projectKey = Buffer.allocUnsafe(32).fill(0) + + const destroy = replicate(r1, r2) + await once(r1, 'peers') + await destroy() + + t.is(r1.peers.length, 1) + t.is(r1.peers[0].status, 'disconnected') + + r2.on('invite', (peerId, invite) => { + t.ok(invite.projectKey.equals(projectKey), 'invite project key correct') + r2.inviteResponse(peerId, { + projectKey: invite.projectKey, + decision: MapeoRPC.InviteResponse.ACCEPT + }) + }) + + replicate(r1, r2) + const [peers] = await once(r1, 'peers') + t.is(r1.peers.length, 1) + t.is(peers[0].status, 'connected') + const response = await r1.invite(peers[0].id, { projectKey }) + t.is(response, MapeoRPC.InviteResponse.ACCEPT) +}) + +test('invalid stream', t => { + const r1 = new MapeoRPC() + const regularStream = new Duplex() + t.exception(() => r1.connect(regularStream), 'Invalid stream') +}) + +function replicate (rpc1, rpc2) { + const n1 = new NoiseSecretStream(true, undefined, { + // Keep keypairs deterministic for tests, since we use peer.publicKey as an identifier. + keyPair: NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(0)) + }) + const n2 = new NoiseSecretStream(false, undefined, { + keyPair: NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(1)) + }) + n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream) + + rpc1.connect(n1) + rpc2.connect(n2) + + return async function destroy () { + return Promise.all([ + new Promise(res => { + n1.on('close', res) + n1.destroy() + }), + new Promise(res => { + n2.on('close', res) + n2.destroy() + }) + ]) + } +} diff --git a/tsconfig.json b/tsconfig.json index fbc93331e..f44cf8b2b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,6 +4,7 @@ "lib": ["es2020"], "noImplicitAny": true, "strictNullChecks": true, + "strictBindCallApply": true, "allowSyntheticDefaultImports": true, "resolveJsonModule": true, "module": "ES2020", diff --git a/types/compact-encoding.d.ts b/types/compact-encoding.d.ts new file mode 100644 index 000000000..5d9a8553f --- /dev/null +++ b/types/compact-encoding.d.ts @@ -0,0 +1 @@ +declare module 'compact-encoding' diff --git a/types/hyperswarm-secret-stream.d.ts b/types/hyperswarm-secret-stream.d.ts new file mode 100644 index 000000000..192964e51 --- /dev/null +++ b/types/hyperswarm-secret-stream.d.ts @@ -0,0 +1,55 @@ +declare module '@hyperswarm/secret-stream' { + import { Duplex as NodeDuplex } from 'stream' + import { Duplex, DuplexEvents } from 'streamx' + + interface Opts { + autostart?: boolean + // TODO: Use https://github.com/chm-diederichs/noise-handshake/blob/main/noise.js for specific patterns + pattern?: string + remotePublicKey?: Buffer + keyPair?: KeyPair + handshake?: { + tx: Buffer + rx: Buffer + hash: Buffer + publicKey: Buffer + remotePublicKey: Buffer + } + } + + type NoiseStreamEvents = { + connect: () => void + handshake: () => void + } + + class NoiseSecretStream< + RawStream extends NodeDuplex | Duplex = Duplex + > extends Duplex< + any, + any, + any, + any, + true, + true, + DuplexEvents & NoiseStreamEvents + > { + readonly isInitiator: boolean + readonly noiseStream: this + publicKey: Buffer | null + remotePublicKey: Buffer | null + handshakeHash: Buffer | null + rawStream: RawStream + opened: Promise + userData: any + + constructor(isInitiator: boolean, rawStream?: RawStream, opts?: Opts) + + static keyPair(seed?: Buffer): KeyPair + + start(rawStream?: NodeDuplex, opts?: Opts): void + setTimeout(ms?: number): void + setKeepAlive(ms?: number): void + } + + export = NoiseSecretStream +} diff --git a/types/modules.d.ts b/types/modules.d.ts index e0c3ff731..4f3e34433 100644 --- a/types/modules.d.ts +++ b/types/modules.d.ts @@ -763,60 +763,6 @@ declare module '@hyperswarm/testnet' { export = createTestnet } -declare module '@hyperswarm/secret-stream' { - import { Duplex as NodeDuplex } from 'stream' - import { Duplex, DuplexEvents } from 'streamx' - - interface Opts { - autostart?: boolean - // TODO: Use https://github.com/chm-diederichs/noise-handshake/blob/main/noise.js for specific patterns - pattern?: string - remotePublicKey?: Buffer - keyPair?: KeyPair - handshake?: { - tx: Buffer - rx: Buffer - hash: Buffer - publicKey: Buffer - remotePublicKey: Buffer - } - } - - type NoiseStreamEvents = { - connect: () => void - } - - class NoiseSecretStream< - RawStream extends NodeDuplex | Duplex = Duplex - > extends Duplex< - any, - any, - any, - any, - true, - true, - DuplexEvents & NoiseStreamEvents - > { - readonly isInitiator: boolean - readonly noiseStream: this - publicKey: Buffer - remotePublicKey: Buffer - handshakeHash: Buffer - rawStream: RawStream - opened: Promise - userData: any - - constructor(isInitiator: boolean, rawStream?: RawStream, opts?: Opts) - - static keyPair(seed?: Buffer): KeyPair - - start(rawStream?: NodeDuplex, opts?: Opts): void - setTimeout(ms?: number): void - setKeepAlive(ms?: number): void - } - - export = NoiseSecretStream -} declare module 'random-access-storage' { import { TypedEmitter } from 'tiny-typed-emitter' diff --git a/types/protomux.d.ts b/types/protomux.d.ts new file mode 100644 index 000000000..2e68c0646 --- /dev/null +++ b/types/protomux.d.ts @@ -0,0 +1,76 @@ +declare module 'protomux' { + import { Duplex } from 'streamx' + + interface PreEncodingState { + buffer: null + start: number + end: number + } + + interface EncodingState { + buffer: null | Buffer + start: number + end: number + } + + interface Encoding { + preencode(state: PreEncodingState, value: any): void + encode(state: EncodingState, value: any): void + decode(state: EncodingState): any + } + + interface Message { + type: number + send(msg: any): void + onmessage: (message: any) => void + encoding: Encoding + } + + type MessageOptions = Partial> + + interface Channel { + open(handshake?: any): void + userData: any + protocol: string + id: Buffer + messages: Message[] + opened: boolean + closed: boolean + destroyed: boolean + cork(): void + uncork(): void + close(): void + addMessage(opts?: MessageOptions): Message + } + + class Protomux { + constructor(stream: Duplex) + isProtomux: true + stream: Duplex + static from(stream: Duplex): Protomux + static isProtomux(mux: unknown): mux is Protomux + cork(): void + uncork(): void + pair( + opts: { protocol: string; id?: Buffer }, + notify: (id: Buffer) => Promise + ): void + unpair(opts: { protocol: string; id?: Buffer }): void + opened(opts: { protocol: string; id?: Buffer }): boolean + createChannel(opts: { + userData?: any + protocol: string + aliases?: string[] + id?: Buffer + unique?: boolean + handshake?: Encoding + messages: MessageOptions[] + onopen?(handshake?: any): Promise | void + onclose?(): Promise | void + ondestroy?(): Promise | void + }): Channel + destroy(err: Error): void + } + + export = Protomux +}