Skip to content

Commit

Permalink
wip: fixing up vaults agent handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Aug 15, 2023
1 parent ea2928a commit 9c26b16
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 180 deletions.
38 changes: 19 additions & 19 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"@matrixai/resources": "^1.1.5",
"@matrixai/timer": "^1.1.1",
"@matrixai/workers": "^1.3.7",
"@matrixai/quic": "^0.0.13",
"@matrixai/quic": "^0.0.14",
"@peculiar/asn1-pkcs8": "^2.3.0",
"@peculiar/asn1-schema": "^2.3.0",
"@peculiar/asn1-x509": "^2.3.0",
Expand Down
12 changes: 7 additions & 5 deletions src/agent/handlers/clientManifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import type {
VaultsGitPackGetMessage,
VaultsScanMessage,
} from './types';
import { DuplexCaller, ServerCaller, UnaryCaller } from '../../rpc/callers';
import {
DuplexCaller,
RawCaller,
ServerCaller,
UnaryCaller,
} from '../../rpc/callers';

const nodesClaimsGet = new ServerCaller<
AgentRPCRequestParams<ClaimIdMessage>,
Expand All @@ -39,10 +44,7 @@ const notificationsSend = new UnaryCaller<
AgentRPCResponseResult
>();

const vaultsGitInfoGet = new ServerCaller<
AgentRPCRequestParams<VaultsGitInfoGetMessage>,
AgentRPCResponseResult<VaultInfo | GitPackMessage>
>();
const vaultsGitInfoGet = new RawCaller();

const vaultsGitPackGet = new ServerCaller<
AgentRPCRequestParams<VaultsGitPackGetMessage>,
Expand Down
4 changes: 2 additions & 2 deletions src/agent/handlers/serverManifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ const serverManifest = (container: {
nodesCrossSignClaim: new NodesCrossSignClaimHandler(container),
nodesHolePunchMessageSend: new NodesHolePunchMessageSendHandler(container),
notificationsSend: new NotificationsSendHandler(container),
VaultsGitInfoGet: new VaultsGitInfoGetHandler(container),
VaultsGitPackGet: new VaultsGitPackGetHandler(container),
vaultsGitInfoGet: new VaultsGitInfoGetHandler(container),
vaultsGitPackGet: new VaultsGitPackGetHandler(container),
vaultsScan: new VaultsScanHandler(container),
};
};
Expand Down
108 changes: 60 additions & 48 deletions src/agent/handlers/vaultsGitInfoGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,56 @@ import type { ACL } from '../../acl';
import type Logger from '@matrixai/logger';
import type { VaultsGitInfoGetMessage } from './types';
import type { VaultAction } from '../../vaults/types';
import type { JSONRPCRequest } from '@/rpc/types';
import type { ContextTimed } from '@matrixai/contexts';
import type { JSONValue } from '@/types';
import { ReadableStream } from 'stream/web';
import * as agentErrors from '../errors';
import * as vaultsUtils from '../../vaults/utils';
import * as vaultsErrors from '../../vaults/errors';
import { ServerHandler } from '../../rpc/handlers';
import { RawHandler } from '../../rpc/handlers';
import { validateSync } from '../../validation';
import { matchSync } from '../../utils';
import { matchSync, never } from '../../utils';
import * as validationUtils from '../../validation/utils';
import * as nodesUtils from '../../nodes/utils';
import * as agentUtils from '../utils';
import * as utils from '../../utils';

class VaultsGitInfoGetHandler extends ServerHandler<
{
db: DB;
vaultManager: VaultManager;
acl: ACL;
logger: Logger;
},
AgentRPCRequestParams<VaultsGitInfoGetMessage>,
AgentRPCResponseResult<VaultInfo | GitPackMessage>
> {
public async *handle(
input: AgentRPCRequestParams<VaultsGitInfoGetMessage>,
_cancel,
meta,
): AsyncGenerator<VaultInfo | GitPackMessage> {
class VaultsGitInfoGetHandler extends RawHandler<{
db: DB;
vaultManager: VaultManager;
acl: ACL;
logger: Logger;
}> {
public async handle(
input: [JSONRPCRequest, ReadableStream<Uint8Array>],
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): Promise<[JSONValue, ReadableStream<Uint8Array>]> {
const { db, vaultManager, acl } = this.container;
yield* db.withTransactionG(async function* (
tran,
): AsyncGenerator<VaultInfo | GitPackMessage> {
const [headerMessage, inputStream] = input;
const params = headerMessage.params;
if (params == null || !utils.isObject(params)) never();
if (
!('vaultNameOrId' in params) ||
typeof params.vaultNameOrId != 'string'
) {
never();
}
if (!('action' in params) || typeof params.action != 'string') never();
const vaultNameOrId = params.vaultNameOrId;
const actionType = validationUtils.parseVaultAction(params.action);
const data = await db.withTransactionF(async (tran) => {
const vaultIdFromName = await vaultManager.getVaultId(
input.vaultNameOrId,
vaultNameOrId,
tran,
);
const vaultId =
vaultIdFromName ?? vaultsUtils.decodeVaultId(input.vaultNameOrId);
vaultIdFromName ?? vaultsUtils.decodeVaultId(vaultNameOrId);
if (vaultId == null) {
throw new vaultsErrors.ErrorVaultsVaultUndefined();
}
const {
actionType,
}: {
actionType: VaultAction;
} = validateSync(
(keyPath, value) => {
return matchSync(keyPath)(
[['actionType'], () => validationUtils.parseVaultAction(value)],
() => value,
);
},
{
actionType: input.action,
},
);
const vaultName = (await vaultManager.getVaultMeta(vaultId, tran))
?.vaultName;
if (vaultName == null) {
Expand All @@ -85,20 +82,35 @@ class VaultsGitInfoGetHandler extends ServerHandler<
);
}

yield {
vaultName: vaultName,
vaultIdEncoded: vaultsUtils.encodeVaultId(vaultId),
return {
vaultId,
vaultName,
};
for await (const byte of vaultManager.handleInfoRequest(vaultId, tran)) {
if (byte !== null) {
yield {
chunk: byte.toString('binary'),
};
} else {
return;
});

// TODO: Needs to handle cancellation
const stream = new ReadableStream({
start: async (controller) => {
for await (const buffer of vaultManager.handleInfoRequest(
data.vaultId,
)) {
if (buffer != null) {
controller.enqueue(buffer);
} else {
break;
}
}
}
controller.close();
},
});

return [
{
vaultName: data.vaultName,
vaultIdEncoded: vaultsUtils.encodeVaultId(data.vaultId),
},
stream,
];
}
}

Expand Down
7 changes: 5 additions & 2 deletions src/nodes/NodeConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import * as rpcUtils from '../rpc/utils';
import * as keysUtils from '../keys/utils';
import * as nodesUtils from '../nodes/utils';
import { never } from '../utils';
import * as utils from '../utils';

/**
* Encapsulates the unidirectional client-side connection of one node to another.
Expand Down Expand Up @@ -93,9 +94,9 @@ class NodeConnection<M extends ClientManifest> extends EventTarget {
targetHostname,
crypto,
tlsConfig,
manifest,
quicConfig = {},
quicSocket,
manifest,
logger = new Logger(this.name),
}: {
handleStream: (stream: RPCStream<Uint8Array, Uint8Array>) => void;
Expand All @@ -105,9 +106,9 @@ class NodeConnection<M extends ClientManifest> extends EventTarget {
targetHostname?: Hostname;
crypto: ClientCrypto;
tlsConfig: TLSConfig;
manifest: M;
quicConfig?: QuicConfig;
quicSocket?: QUICSocket;
manifest: M;
logger?: Logger;
},
@context ctx: ContextTimed,
Expand Down Expand Up @@ -140,6 +141,8 @@ class NodeConnection<M extends ClientManifest> extends EventTarget {
crypto: {
ops: crypto,
},
reasonToCode: utils.reasonToCode,
codeToReason: utils.codeToReason,
logger: logger.getChild(QUICClient.name),
},
ctx,
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ class RPCServer extends EventTarget {
let handlerResult: [JSONValue | undefined, ReadableStream<Uint8Array>];
const headerWriter = rpcStream.writable.getWriter();
try {
handlerResult = handler(
handlerResult = await handler(
[headerMessage.value, inputStream],
rpcStream.cancel,
rpcStream.meta,
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ abstract class RawHandler<
cancel: (reason?: any) => void,
meta: Record<string, JSONValue> | undefined,
ctx: ContextTimed,
): [JSONValue, ReadableStream<Uint8Array>];
): Promise<[JSONValue, ReadableStream<Uint8Array>]>;
}

abstract class DuplexHandler<
Expand Down
12 changes: 10 additions & 2 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,12 +429,20 @@ function lexiUnpackBuffer(b: Buffer): number {
return lexi.unpack([...b]);
}

// TODO: remove this, quick hack to allow errors to jump the network
const codeMap = new Map<number, any>();
let code = 1;

const reasonToCode = (_type: 'recv' | 'send', _reason?: any): number => {
return 0;
codeMap.set(code, _reason);
const returnCode = code;
code++;
return returnCode;
};

const codeToReason = (type: 'recv' | 'send', code: number): any => {
return Error(`${type} ${code}`);
const asd = codeMap.get(code);
return asd;
};

export {
Expand Down
Loading

0 comments on commit 9c26b16

Please sign in to comment.