diff --git a/package-lock.json b/package-lock.json index 43525f8638..b140214e44 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,7 +17,7 @@ "@matrixai/errors": "^1.1.7", "@matrixai/id": "^3.3.6", "@matrixai/logger": "^3.1.0", - "@matrixai/quic": "^0.0.13", + "@matrixai/quic": "^0.0.14", "@matrixai/resources": "^1.1.5", "@matrixai/timer": "^1.1.1", "@matrixai/workers": "^1.3.7", @@ -1784,14 +1784,14 @@ "integrity": "sha512-C4JWpgbNik3V99bfGfDell5cH3JULD67eEq9CeXl4rYgsvanF8hhuY84ZYvndPhimt9qjA9/Z8uExKGoiv1zVw==" }, "node_modules/@matrixai/quic": { - "version": "0.0.13", - "resolved": "https://registry.npmjs.org/@matrixai/quic/-/quic-0.0.13.tgz", - "integrity": "sha512-tvlA0m2fUIchyEZxzkBbvYNXYf21u0gR4Lv2BaYZYmGa1Fr2VH07MCZu9Ka8DpAEOXKEU94yqhNSEKCCJ83LJA==", + "version": "0.0.14", + "resolved": "https://registry.npmjs.org/@matrixai/quic/-/quic-0.0.14.tgz", + "integrity": "sha512-FINNs92u0qKXHRmBbUePODEx67IAzVr0XgmMARLGcyMRC3rAuQN0L0fsXJTxokcUqO1aaIYurGhzY4kVwtHW8Q==", "dependencies": { - "@matrixai/async-cancellable": "^1.1.0", + "@matrixai/async-cancellable": "^1.1.1", "@matrixai/async-init": "^1.8.4", "@matrixai/async-locks": "^4.0.0", - "@matrixai/contexts": "^1.0.0", + "@matrixai/contexts": "^1.1.0", "@matrixai/errors": "^1.1.7", "@matrixai/logger": "^3.1.0", "@matrixai/resources": "^1.1.5", @@ -1799,16 +1799,16 @@ "ip-num": "^1.5.0" }, "optionalDependencies": { - "@matrixai/quic-darwin-arm64": "0.0.13", - "@matrixai/quic-darwin-x64": "0.0.13", - "@matrixai/quic-linux-x64": "0.0.13", - "@matrixai/quic-win32-x64": "0.0.13" + "@matrixai/quic-darwin-arm64": "0.0.14", + "@matrixai/quic-darwin-x64": "0.0.14", + "@matrixai/quic-linux-x64": "0.0.14", + "@matrixai/quic-win32-x64": "0.0.14" } }, "node_modules/@matrixai/quic-darwin-arm64": { - "version": "0.0.13", - "resolved": "https://registry.npmjs.org/@matrixai/quic-darwin-arm64/-/quic-darwin-arm64-0.0.13.tgz", - "integrity": "sha512-EKBfqYr6mMj0k9cE97KiommyFb7eD3u4OWloMFySERcBzg+9HWwonDX5/kyChllxEDorPXneW/CfF8gtZTQ1ug==", + "version": "0.0.14", + "resolved": "https://registry.npmjs.org/@matrixai/quic-darwin-arm64/-/quic-darwin-arm64-0.0.14.tgz", + "integrity": "sha512-cdC6m02aaqKZi0dOulLhTFYNUALqnptsHzgFCwOU2uwiEbBghIkC/gjrPMpj4KmReb55ZZ5z2cap8KrQCcD0Cw==", "cpu": [ "arm64" ], @@ -1818,9 +1818,9 @@ ] }, "node_modules/@matrixai/quic-darwin-x64": { - "version": "0.0.13", - "resolved": "https://registry.npmjs.org/@matrixai/quic-darwin-x64/-/quic-darwin-x64-0.0.13.tgz", - "integrity": "sha512-WTf9gKdAqHkWVk48eWZ4JofctjZBrvUxEfg8HcBDzye1kz1O+0IyJlF4web3ZFYu/lvoGQn6DTaJvozdQS5hTw==", + "version": "0.0.14", + "resolved": "https://registry.npmjs.org/@matrixai/quic-darwin-x64/-/quic-darwin-x64-0.0.14.tgz", + "integrity": "sha512-zvkGeBGFgOKSXnHFcqZyEqmam6ZhvjKXi5oTp5nzYqGw2lf6KorruEmdQoi414fOZwB+uEvTZuH15rCapvZO2w==", "cpu": [ "x64" ], @@ -1830,9 +1830,9 @@ ] }, "node_modules/@matrixai/quic-linux-x64": { - "version": "0.0.13", - "resolved": "https://registry.npmjs.org/@matrixai/quic-linux-x64/-/quic-linux-x64-0.0.13.tgz", - "integrity": "sha512-ExOhO9YjiCNV6OrRMF2+CVQdPANa2zSqlMzCUaLC5whAsll50M08LpoV4J/HnmpTWPcfohr+G28bFWVsnb8/wA==", + "version": "0.0.14", + "resolved": "https://registry.npmjs.org/@matrixai/quic-linux-x64/-/quic-linux-x64-0.0.14.tgz", + "integrity": "sha512-t6kCv7Cg9ef6rVMWyH4/mkcRTsxgusXSO+ivlysCOiTK6R+LasTLelzjnWvUvJXfCgY9cJHn7JsZuQsdf/SB0g==", "cpu": [ "x64" ], diff --git a/package.json b/package.json index 8a043f7ee1..913308e1a7 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ "@matrixai/resources": "^1.1.5", "@matrixai/timer": "^1.1.1", "@matrixai/workers": "^1.3.7", - "@matrixai/quic": "^0.0.13", + "@matrixai/quic": "^0.0.14", "@peculiar/asn1-pkcs8": "^2.3.0", "@peculiar/asn1-schema": "^2.3.0", "@peculiar/asn1-x509": "^2.3.0", diff --git a/src/agent/handlers/clientManifest.ts b/src/agent/handlers/clientManifest.ts index 35b58edb24..b3e4874886 100644 --- a/src/agent/handlers/clientManifest.ts +++ b/src/agent/handlers/clientManifest.ts @@ -12,7 +12,12 @@ import type { VaultsGitPackGetMessage, VaultsScanMessage, } from './types'; -import { DuplexCaller, ServerCaller, UnaryCaller } from '../../rpc/callers'; +import { + DuplexCaller, + RawCaller, + ServerCaller, + UnaryCaller, +} from '../../rpc/callers'; const nodesClaimsGet = new ServerCaller< AgentRPCRequestParams, @@ -39,10 +44,7 @@ const notificationsSend = new UnaryCaller< AgentRPCResponseResult >(); -const vaultsGitInfoGet = new ServerCaller< - AgentRPCRequestParams, - AgentRPCResponseResult ->(); +const vaultsGitInfoGet = new RawCaller(); const vaultsGitPackGet = new ServerCaller< AgentRPCRequestParams, diff --git a/src/agent/handlers/serverManifest.ts b/src/agent/handlers/serverManifest.ts index 8f0f5d76c0..952f66e247 100644 --- a/src/agent/handlers/serverManifest.ts +++ b/src/agent/handlers/serverManifest.ts @@ -41,8 +41,8 @@ const serverManifest = (container: { nodesCrossSignClaim: new NodesCrossSignClaimHandler(container), nodesHolePunchMessageSend: new NodesHolePunchMessageSendHandler(container), notificationsSend: new NotificationsSendHandler(container), - VaultsGitInfoGet: new VaultsGitInfoGetHandler(container), - VaultsGitPackGet: new VaultsGitPackGetHandler(container), + vaultsGitInfoGet: new VaultsGitInfoGetHandler(container), + vaultsGitPackGet: new VaultsGitPackGetHandler(container), vaultsScan: new VaultsScanHandler(container), }; }; diff --git a/src/agent/handlers/vaultsGitInfoGet.ts b/src/agent/handlers/vaultsGitInfoGet.ts index 969bbb8d8f..7b16f5d1d5 100644 --- a/src/agent/handlers/vaultsGitInfoGet.ts +++ b/src/agent/handlers/vaultsGitInfoGet.ts @@ -6,59 +6,56 @@ import type { ACL } from '../../acl'; import type Logger from '@matrixai/logger'; import type { VaultsGitInfoGetMessage } from './types'; import type { VaultAction } from '../../vaults/types'; +import type { JSONRPCRequest } from '@/rpc/types'; +import type { ContextTimed } from '@matrixai/contexts'; +import type { JSONValue } from '@/types'; +import { ReadableStream } from 'stream/web'; import * as agentErrors from '../errors'; import * as vaultsUtils from '../../vaults/utils'; import * as vaultsErrors from '../../vaults/errors'; -import { ServerHandler } from '../../rpc/handlers'; +import { RawHandler } from '../../rpc/handlers'; import { validateSync } from '../../validation'; -import { matchSync } from '../../utils'; +import { matchSync, never } from '../../utils'; import * as validationUtils from '../../validation/utils'; import * as nodesUtils from '../../nodes/utils'; import * as agentUtils from '../utils'; +import * as utils from '../../utils'; -class VaultsGitInfoGetHandler extends ServerHandler< - { - db: DB; - vaultManager: VaultManager; - acl: ACL; - logger: Logger; - }, - AgentRPCRequestParams, - AgentRPCResponseResult -> { - public async *handle( - input: AgentRPCRequestParams, - _cancel, - meta, - ): AsyncGenerator { +class VaultsGitInfoGetHandler extends RawHandler<{ + db: DB; + vaultManager: VaultManager; + acl: ACL; + logger: Logger; +}> { + public async handle( + input: [JSONRPCRequest, ReadableStream], + cancel: (reason?: any) => void, + meta: Record | undefined, + ctx: ContextTimed, + ): Promise<[JSONValue, ReadableStream]> { const { db, vaultManager, acl } = this.container; - yield* db.withTransactionG(async function* ( - tran, - ): AsyncGenerator { + const [headerMessage, inputStream] = input; + const params = headerMessage.params; + if (params == null || !utils.isObject(params)) never(); + if ( + !('vaultNameOrId' in params) || + typeof params.vaultNameOrId != 'string' + ) { + never(); + } + if (!('action' in params) || typeof params.action != 'string') never(); + const vaultNameOrId = params.vaultNameOrId; + const actionType = validationUtils.parseVaultAction(params.action); + const data = await db.withTransactionF(async (tran) => { const vaultIdFromName = await vaultManager.getVaultId( - input.vaultNameOrId, + vaultNameOrId, tran, ); const vaultId = - vaultIdFromName ?? vaultsUtils.decodeVaultId(input.vaultNameOrId); + vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultNameOrId); if (vaultId == null) { throw new vaultsErrors.ErrorVaultsVaultUndefined(); } - const { - actionType, - }: { - actionType: VaultAction; - } = validateSync( - (keyPath, value) => { - return matchSync(keyPath)( - [['actionType'], () => validationUtils.parseVaultAction(value)], - () => value, - ); - }, - { - actionType: input.action, - }, - ); const vaultName = (await vaultManager.getVaultMeta(vaultId, tran)) ?.vaultName; if (vaultName == null) { @@ -85,20 +82,35 @@ class VaultsGitInfoGetHandler extends ServerHandler< ); } - yield { - vaultName: vaultName, - vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId), + return { + vaultId, + vaultName, }; - for await (const byte of vaultManager.handleInfoRequest(vaultId, tran)) { - if (byte !== null) { - yield { - chunk: byte.toString('binary'), - }; - } else { - return; + }); + + // TODO: Needs to handle cancellation + const stream = new ReadableStream({ + start: async (controller) => { + for await (const buffer of vaultManager.handleInfoRequest( + data.vaultId, + )) { + if (buffer != null) { + controller.enqueue(buffer); + } else { + break; + } } - } + controller.close(); + }, }); + + return [ + { + vaultName: data.vaultName, + vaultIdEncoded: vaultsUtils.encodeVaultId(data.vaultId), + }, + stream, + ]; } } diff --git a/src/nodes/NodeConnection.ts b/src/nodes/NodeConnection.ts index 779b3805a6..620d82f5d5 100644 --- a/src/nodes/NodeConnection.ts +++ b/src/nodes/NodeConnection.ts @@ -24,6 +24,7 @@ import * as rpcUtils from '../rpc/utils'; import * as keysUtils from '../keys/utils'; import * as nodesUtils from '../nodes/utils'; import { never } from '../utils'; +import * as utils from '../utils'; /** * Encapsulates the unidirectional client-side connection of one node to another. @@ -93,9 +94,9 @@ class NodeConnection extends EventTarget { targetHostname, crypto, tlsConfig, + manifest, quicConfig = {}, quicSocket, - manifest, logger = new Logger(this.name), }: { handleStream: (stream: RPCStream) => void; @@ -105,9 +106,9 @@ class NodeConnection extends EventTarget { targetHostname?: Hostname; crypto: ClientCrypto; tlsConfig: TLSConfig; + manifest: M; quicConfig?: QuicConfig; quicSocket?: QUICSocket; - manifest: M; logger?: Logger; }, @context ctx: ContextTimed, @@ -140,6 +141,8 @@ class NodeConnection extends EventTarget { crypto: { ops: crypto, }, + reasonToCode: utils.reasonToCode, + codeToReason: utils.codeToReason, logger: logger.getChild(QUICClient.name), }, ctx, diff --git a/src/rpc/RPCServer.ts b/src/rpc/RPCServer.ts index 52aa3ffb75..3de812a0e8 100644 --- a/src/rpc/RPCServer.ts +++ b/src/rpc/RPCServer.ts @@ -568,7 +568,7 @@ class RPCServer extends EventTarget { let handlerResult: [JSONValue | undefined, ReadableStream]; const headerWriter = rpcStream.writable.getWriter(); try { - handlerResult = handler( + handlerResult = await handler( [headerMessage.value, inputStream], rpcStream.cancel, rpcStream.meta, diff --git a/src/rpc/handlers.ts b/src/rpc/handlers.ts index f2b280c86e..a388c75a10 100644 --- a/src/rpc/handlers.ts +++ b/src/rpc/handlers.ts @@ -29,7 +29,7 @@ abstract class RawHandler< cancel: (reason?: any) => void, meta: Record | undefined, ctx: ContextTimed, - ): [JSONValue, ReadableStream]; + ): Promise<[JSONValue, ReadableStream]>; } abstract class DuplexHandler< diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 53b293b5c0..eda95679ae 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -429,12 +429,20 @@ function lexiUnpackBuffer(b: Buffer): number { return lexi.unpack([...b]); } +// TODO: remove this, quick hack to allow errors to jump the network +const codeMap = new Map(); +let code = 1; + const reasonToCode = (_type: 'recv' | 'send', _reason?: any): number => { - return 0; + codeMap.set(code, _reason); + const returnCode = code; + code++; + return returnCode; }; const codeToReason = (type: 'recv' | 'send', code: number): any => { - return Error(`${type} ${code}`); + const asd = codeMap.get(code); + return asd; }; export { diff --git a/src/vaults/VaultInternal.ts b/src/vaults/VaultInternal.ts index 5e8cbb933b..cb44ebdac3 100644 --- a/src/vaults/VaultInternal.ts +++ b/src/vaults/VaultInternal.ts @@ -26,6 +26,8 @@ import { } from '@matrixai/async-init/dist/CreateDestroyStartStop'; import { withF, withG } from '@matrixai/resources'; import { RWLockWriter } from '@matrixai/async-locks'; +import * as utils from '@/utils'; +import * as validationUtils from '@/validation/utils'; import * as vaultsErrors from './errors'; import * as vaultsUtils from './utils'; import { tagLast } from './types'; @@ -756,91 +758,93 @@ class VaultInternal { } protected async request( - _client: RPCClient, - _vaultNameOrId: VaultId | VaultName, - _vaultAction: VaultAction, + client: RPCClient, + vaultNameOrId: VaultId | VaultName, + vaultAction: VaultAction, ): Promise { - throw Error('TMP IMP'); - // Const vaultNameOrId_ = typeof vaultNameOrId === 'string' ? - // vaultNameOrId : - // vaultsUtils.encodeVaultId(vaultNameOrId); - // const response = client.methods.vaultsGitInfoGet({ - // vaultNameOrId: vaultNameOrId_, - // action: vaultAction, - // }); - // let vaultName, remoteVaultId; - // response.stream.on('metadata', async (meta) => { - // // Receive the Id of the remote vault - // vaultName = meta.get('vaultName').pop(); - // if (vaultName) vaultName = vaultName.toString(); - // const vId = meta.get('vaultId').pop(); - // if (vId) remoteVaultId = validationUtils.parseVaultId(vId.toString()); - // }); - // // Collect the response buffers from the GET request - // const infoResponse: Uint8Array[] = []; - // for await (const resp of response) { - // infoResponse.push(resp.getChunk_asU8()); - // } - // const metadata = new grpc.Metadata(); - // metadata.set('vaultAction', vaultAction); - // if (typeof vaultNameOrId === 'string') { - // metadata.set('vaultNameOrId', vaultNameOrId); - // } else { - // // Metadata only accepts the user readable form of the vault Id - // // as the string form has illegal characters - // metadata.set('vaultNameOrId', vaultsUtils.encodeVaultId(vaultNameOrId)); - // } - // return [ - // async function ({ - // url, - // method = 'GET', - // headers = {}, - // body = [Buffer.from('')], - // }: { - // url: string; - // method: string; - // headers: POJO; - // body: Buffer[]; - // }) { - // if (method === 'GET') { - // // Send back the GET request info response - // return { - // url: url, - // method: method, - // body: infoResponse, - // headers: headers, - // statusCode: 200, - // statusMessage: 'OK', - // }; - // } else if (method === 'POST') { - // const responseBuffers: Array = []; - // const stream = client.vaultsGitPackGet(metadata); - // const chunk = new vaultsPB.PackChunk(); - // // Body is usually an async generator but in the cases we are using, - // // only the first value is used - // chunk.setChunk(body[0]); - // // Tell the server what commit we need - // await stream.write(chunk); - // let packResponse = (await stream.read()).value; - // while (packResponse != null) { - // responseBuffers.push(packResponse.getChunk_asU8()); - // packResponse = (await stream.read()).value; - // } - // return { - // url: url, - // method: method, - // body: responseBuffers, - // headers: headers, - // statusCode: 200, - // statusMessage: 'OK', - // }; - // } else { - // never(); - // } - // }, - // vaultName, - // remoteVaultId, - // ]; + const vaultNameOrId_ = + typeof vaultNameOrId === 'string' + ? vaultNameOrId + : vaultsUtils.encodeVaultId(vaultNameOrId); + console.log('a'); + const response = await client.methods.vaultsGitInfoGet({ + vaultNameOrId: vaultNameOrId_, + action: vaultAction, + }); + console.log('a'); + console.log(response.meta); + + const result = response.meta?.result; + if (result == null || !utils.isObject(result)) never(); + if (!('vaultName' in result) || typeof result.vaultName != 'string') { + never(); + } + if ( + !('vaultIdEncoded' in result) || + typeof result.vaultIdEncoded != 'string' + ) { + never(); + } + const vaultName = result.vaultName; + const remoteVaultId = validationUtils.parseVaultId(result.vaultIdEncoded); + + // Collect the response buffers from the GET request + const infoResponse: Uint8Array[] = []; + for await (const chunk of response.readable) { + infoResponse.push(chunk); + } + // TODO: complete + return [ + async function ({ + url, + method = 'GET', + headers = {}, + body = [Buffer.from('')], + }: { + url: string; + method: string; + headers: POJO; + body: Buffer[]; + }) { + if (method === 'GET') { + // Send back the GET request info response + return { + url: url, + method: method, + body: infoResponse, + headers: headers, + statusCode: 200, + statusMessage: 'OK', + }; + } else if (method === 'POST') { + const responseBuffers: Array = []; + const stream = client.methods.vaultsGitPackGet(metadata); + const chunk = new vaultsPB.PackChunk(); + // Body is usually an async generator but in the cases we are using, + // only the first value is used + chunk.setChunk(body[0]); + // Tell the server what commit we need + await stream.write(chunk); + let packResponse = (await stream.read()).value; + while (packResponse != null) { + responseBuffers.push(packResponse.getChunk_asU8()); + packResponse = (await stream.read()).value; + } + return { + url: url, + method: method, + body: responseBuffers, + headers: headers, + statusCode: 200, + statusMessage: 'OK', + }; + } else { + never(); + } + }, + vaultName, + remoteVaultId, + ]; } /** diff --git a/tests/vaults/VaultManager.test.ts b/tests/vaults/VaultManager.test.ts index 7d1fab63be..4ca37466ae 100644 --- a/tests/vaults/VaultManager.test.ts +++ b/tests/vaults/VaultManager.test.ts @@ -470,7 +470,7 @@ describe('VaultManager', () => { } }); // TODO: disabled until feature is addressed in agent migration stage 2 - describe.skip('with remote agents', () => { + describe('with remote agents', () => { let allDataDir: string; let keyRing: KeyRing; let nodeGraph: NodeGraph; @@ -517,12 +517,12 @@ describe('VaultManager', () => { // Adding details to each agent await remoteKeynode1.nodeGraph.setNode(remoteKeynode2Id, { - host: remoteKeynode2.quicServerAgent.host as Host, - port: remoteKeynode2.quicServerAgent.port as Port, + host: remoteKeynode2.quicSocket.host as Host, + port: remoteKeynode2.quicSocket.port as Port, }); await remoteKeynode2.nodeGraph.setNode(remoteKeynode1Id, { - host: remoteKeynode1.quicServerAgent.host as Host, - port: remoteKeynode1.quicServerAgent.port as Port, + host: remoteKeynode1.quicSocket.host as Host, + port: remoteKeynode1.quicSocket.port as Port, }); await remoteKeynode1.gestaltGraph.setNode({ @@ -580,25 +580,23 @@ describe('VaultManager', () => { nodeConnectionManager = new NodeConnectionManager({ keyRing, nodeGraph, - quicClientConfig: { - key: tlsConfig.keyPrivatePem, - cert: tlsConfig.certChainPem, - }, + tlsConfig, crypto, quicSocket, logger, }); await nodeConnectionManager.start({ nodeManager: { setNode: jest.fn() } as unknown as NodeManager, + handleStream: () => {}, }); await taskManager.startProcessing(); await nodeGraph.setNode(remoteKeynode1Id, { - host: remoteKeynode1.quicServerAgent.host as Host, - port: remoteKeynode1.quicServerAgent.port as Port, + host: remoteKeynode1.quicSocket.host as Host, + port: remoteKeynode1.quicSocket.port as Port, }); await nodeGraph.setNode(remoteKeynode2Id, { - host: remoteKeynode2.quicServerAgent.host as Host, - port: remoteKeynode2.quicServerAgent.port as Port, + host: remoteKeynode2.quicSocket.host as Host, + port: remoteKeynode2.quicSocket.port as Port, }); }); afterEach(async () => { @@ -1400,8 +1398,8 @@ describe('VaultManager', () => { // Letting nodeGraph know where the remote agent is await nodeGraph.setNode(targetNodeId, { - host: remoteKeynode1.quicServerAgent.host as Host, - port: remoteKeynode1.quicServerAgent.port as Port, + host: remoteKeynode1.quicSocket.host as Host, + port: remoteKeynode1.quicSocket.port as Port, }); await remoteKeynode1.gestaltGraph.setNode({