diff --git a/src/RPC/RPCClient.ts b/src/RPC/RPCClient.ts index f0a684a027..91f2c729db 100644 --- a/src/RPC/RPCClient.ts +++ b/src/RPC/RPCClient.ts @@ -3,10 +3,13 @@ import type { JsonRpcRequestMessage, StreamPairCreateCallback, ClientManifest, - MapRawCallers, } from './types'; import type { JSONValue } from 'types'; -import type { ReadableWritablePair, WritableStream } from 'stream/web'; +import type { + ReadableWritablePair, + WritableStream, + ReadableStream, +} from 'stream/web'; import type { JsonRpcRequest, JsonRpcResponse, @@ -78,27 +81,9 @@ class RPCClient { case 'CLIENT': return () => this.clientStreamCaller(method); case 'DUPLEX': - return (f) => this.duplexStreamCaller(method, f); + return () => this.duplexStreamCaller(method); case 'RAW': - default: - return; - } - }, - }, - ); - protected rawMethodsProxy = new Proxy( - {}, - { - get: (_, method) => { - if (typeof method === 'symbol') throw never(); - switch (this.callerTypes[method]) { - case 'DUPLEX': - return () => this.rawDuplexStreamCaller(method); - case 'RAW': - return (params) => this.rawStreamCaller(method, params); - case 'SERVER': - case 'CLIENT': - case 'UNARY': + return (header) => this.rawStreamCaller(method, header); default: return; } @@ -138,17 +123,12 @@ class RPCClient { return this.methodsProxy as MapCallers; } - @ready(new rpcErrors.ErrorRpcDestroyed()) - public get rawMethods(): MapRawCallers { - return this.rawMethodsProxy as MapRawCallers; - } - @ready(new rpcErrors.ErrorRpcDestroyed()) public async unaryCaller( method: string, parameters: I, ): Promise { - const callerInterface = await this.rawDuplexStreamCaller(method); + const callerInterface = await this.duplexStreamCaller(method); const reader = callerInterface.readable.getReader(); const writer = callerInterface.writable.getWriter(); await writer.write(parameters); @@ -165,18 +145,13 @@ class RPCClient { public async serverStreamCaller( method: string, parameters: I, - ): Promise> { - const callerInterface = await this.rawDuplexStreamCaller(method); + ): Promise> { + const callerInterface = await this.duplexStreamCaller(method); const writer = callerInterface.writable.getWriter(); await writer.write(parameters); await writer.close(); - const outputGen = async function* () { - for await (const value of callerInterface.readable) { - yield value; - } - }; - return outputGen(); + return callerInterface.readable; } @ready(new rpcErrors.ErrorRpcDestroyed()) @@ -186,7 +161,7 @@ class RPCClient { output: Promise; writable: WritableStream; }> { - const callerInterface = await this.rawDuplexStreamCaller(method); + const callerInterface = await this.duplexStreamCaller(method); const reader = callerInterface.readable.getReader(); const output = reader.read().then(({ value, done }) => { if (done) { @@ -203,27 +178,6 @@ class RPCClient { @ready(new rpcErrors.ErrorRpcDestroyed()) public async duplexStreamCaller( method: string, - f: (output: AsyncIterable) => AsyncIterable, - ): Promise { - const callerInterface = await this.rawDuplexStreamCaller(method); - const outputGenerator = async function* () { - for await (const value of callerInterface.readable) { - yield value; - } - }; - const writer = callerInterface.writable.getWriter(); - try { - for await (const value of f(outputGenerator())) { - await writer.write(value); - } - } finally { - await writer.close(); - } - } - - @ready(new rpcErrors.ErrorRpcDestroyed()) - public async rawDuplexStreamCaller( - method: string, ): Promise> { const outputMessageTransformStream = clientOutputTransformStream(); const inputMessageTransformStream = clientInputTransformStream(method); @@ -249,14 +203,14 @@ class RPCClient { @ready(new rpcErrors.ErrorRpcDestroyed()) public async rawStreamCaller( method: string, - params: JSONValue, + headerParams: JSONValue, ): Promise> { const streamPair = await this.streamPairCreateCallback(); const tempWriter = streamPair.writable.getWriter(); const header: JsonRpcRequestMessage = { jsonrpc: '2.0', method, - params, + params: headerParams, id: null, }; await tempWriter.write(Buffer.from(JSON.stringify(header))); diff --git a/src/RPC/types.ts b/src/RPC/types.ts index 4e66335278..4d96fcc0c3 100644 --- a/src/RPC/types.ts +++ b/src/RPC/types.ts @@ -156,7 +156,7 @@ type UnaryCallerImplementation< type ServerCallerImplementation< I extends JSONValue = JSONValue, O extends JSONValue = JSONValue, -> = (parameters: I) => Promise>; +> = (parameters: I) => Promise>; type ClientCallerImplementation< I extends JSONValue = JSONValue, @@ -166,17 +166,10 @@ type ClientCallerImplementation< type DuplexCallerImplementation< I extends JSONValue = JSONValue, O extends JSONValue = JSONValue, -> = (f: (output: AsyncIterable) => AsyncIterable) => Promise; - -// Raw callers - -type RawDuplexCallerImplementation< - I extends JSONValue = JSONValue, - O extends JSONValue = JSONValue, > = () => Promise>; type RawCallerImplementation = ( - params: JSONValue, + headerParams: JSONValue, ) => Promise>; type ConvertDuplexCaller = T extends DuplexCaller @@ -203,14 +196,6 @@ type ConvertCaller = T extends DuplexCaller ? ConvertClientCaller : T extends UnaryCaller ? ConvertUnaryCaller - : never; - -type ConvertRawDuplexStreamHandler = T extends DuplexCaller - ? RawDuplexCallerImplementation - : never; - -type ConvertRawCaller = T extends DuplexCaller - ? ConvertRawDuplexStreamHandler : T extends RawCaller ? RawCallerImplementation : never; @@ -224,10 +209,6 @@ type MapCallers = { [K in keyof T]: ConvertCaller; }; -type MapRawCallers = { - [K in keyof T]: ConvertRawCaller; -}; - export type { JsonRpcRequestMessage, JsonRpcRequestNotification, @@ -250,5 +231,4 @@ export type { ClientManifest, HandlerType, MapCallers, - MapRawCallers, }; diff --git a/tests/RPC/RPC.test.ts b/tests/RPC/RPC.test.ts index 28c9704e2e..883b4e4f1f 100644 --- a/tests/RPC/RPC.test.ts +++ b/tests/RPC/RPC.test.ts @@ -63,7 +63,7 @@ describe('RPC', () => { logger, }); - const callerInterface = await rpcClient.rawMethods.testMethod({ + const callerInterface = await rpcClient.methods.testMethod({ hello: 'world', }); const writer = callerInterface.writable.getWriter(); @@ -116,7 +116,7 @@ describe('RPC', () => { logger, }); - const callerInterface = await rpcClient.rawMethods.testMethod(); + const callerInterface = await rpcClient.methods.testMethod(); const writer = callerInterface.writable.getWriter(); const reader = callerInterface.readable.getReader(); for (const value of values) { diff --git a/tests/RPC/RPCClient.test.ts b/tests/RPC/RPCClient.test.ts index 13edb94f29..672f10626d 100644 --- a/tests/RPC/RPCClient.test.ts +++ b/tests/RPC/RPCClient.test.ts @@ -98,12 +98,15 @@ describe(`${RPCClient.name}`, () => { streamPairCreateCallback: async () => streamPair, logger, }); - await rpcClient.duplexStreamCaller( - methodName, - async function* (output) { - yield* output; - }, - ); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName); + const writable = callerInterface.writable.getWriter(); + for await (const value of callerInterface.readable) { + await writable.write(value); + } + await writable.close(); const expectedMessages: Array = messages.map((v) => { const request: JsonRpcRequestMessage = { @@ -252,14 +255,16 @@ describe(`${RPCClient.name}`, () => { streamPairCreateCallback: async () => streamPair, logger, }); - const callProm = rpcClient.duplexStreamCaller( - methodName, - async function* (output) { - for await (const _ of output) { - // No touch, just consume - } - }, - ); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName); + await callerInterface.writable.close(); + const callProm = (async () => { + for await (const _ of callerInterface.readable) { + // Only consume + } + })(); await expect(callProm).rejects.toThrow(rpcErrors.ErrorRpcRemoteError); await outputResult; await rpcClient.destroy(); @@ -287,14 +292,16 @@ describe(`${RPCClient.name}`, () => { streamPairCreateCallback: async () => streamPair, logger, }); - const callProm = rpcClient.duplexStreamCaller( - methodName, - async function* (output) { - for await (const _ of output) { - // No touch, just consume - } - }, - ); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName); + await callerInterface.writable.close(); + const callProm = (async () => { + for await (const _ of callerInterface.readable) { + // Only consume + } + })(); await expect(callProm).rejects.toThrow(rpcErrors.ErrorRpcRemoteError); await outputResult; await rpcClient.destroy(); @@ -325,50 +332,21 @@ describe(`${RPCClient.name}`, () => { streamPairCreateCallback: async () => streamPair, logger, }); - const callProm = rpcClient.duplexStreamCaller( - methodName, - async function* (output) { - for await (const _ of output) { - // No touch, just consume - } - }, - ); + const callerInterface = await rpcClient.duplexStreamCaller< + JSONValue, + JSONValue + >(methodName); + await callerInterface.writable.close(); + const callProm = (async () => { + for await (const _ of callerInterface.readable) { + // Only consume + } + })(); await expect(callProm).rejects.toThrow(rpcErrors.ErrorRpcRemoteError); await outputResult; await rpcClient.destroy(); }, ); - testProp( - 'rawDuplexStreamCaller', - [fc.array(rpcTestUtils.jsonRpcResponseResultArb(), { minLength: 1 })], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = - rpcTestUtils.streamToArray(); - const streamPair: ReadableWritablePair = { - readable: inputStream, - writable: outputStream, - }; - const rpcClient = await RPCClient.createRPCClient({ - manifest: {}, - streamPairCreateCallback: async () => streamPair, - logger, - }); - let count = 0; - const callerInterface = await rpcClient.rawDuplexStreamCaller(methodName); - const writer = callerInterface.writable.getWriter(); - for await (const val of callerInterface.readable) { - count += 1; - await writer.write(val); - } - await writer.close(); - const result = await outputResult; - // We're just checking that it's consuming the messages as expected - expect(result.length).toEqual(messages.length); - expect(count).toEqual(messages.length); - await rpcClient.destroy(); - }, - ); testProp( 'generic duplex caller with forward Middleware', [specificMessageArb], @@ -399,7 +377,7 @@ describe(`${RPCClient.name}`, () => { logger, }); - const callerInterface = await rpcClient.rawDuplexStreamCaller< + const callerInterface = await rpcClient.duplexStreamCaller< JSONValue, JSONValue >(methodName); @@ -463,7 +441,7 @@ describe(`${RPCClient.name}`, () => { logger, }); - const callerInterface = await rpcClient.rawDuplexStreamCaller< + const callerInterface = await rpcClient.duplexStreamCaller< JSONValue, JSONValue >(methodName); @@ -483,58 +461,6 @@ describe(`${RPCClient.name}`, () => { await rpcClient.destroy(); }, ); - testProp( - 'manifest raw duplex call', - [ - fc.array(rpcTestUtils.jsonRpcResponseResultArb(fc.string()), { - minLength: 5, - }), - ], - async (messages) => { - const inputStream = rpcTestUtils.messagesToReadableStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); - const streamPair: ReadableWritablePair = { - readable: inputStream, - writable: outputStream, - }; - const rpcClient = await RPCClient.createRPCClient({ - manifest: { - duplex: new DuplexCaller(), - }, - streamPairCreateCallback: async () => streamPair, - logger, - }); - const callerInterface = await rpcClient.rawMethods.duplex(); - const reader = callerInterface.readable.getReader(); - const writer = callerInterface.writable.getWriter(); - while (true) { - const { value, done } = await reader.read(); - if (done) { - // We have to end the writer otherwise the stream never closes - await writer.close(); - break; - } - await writer.write(value); - } - const expectedMessages: Array = messages.map( - (v) => { - const request: JsonRpcRequestMessage = { - jsonrpc: '2.0', - method: 'duplex', - id: null, - ...(v.result === undefined ? {} : { params: v.result }), - }; - return request; - }, - ); - const outputMessages = (await outputResult).map((v) => - JSON.parse(v.toString()), - ); - expect(outputMessages).toStrictEqual(expectedMessages); - - await rpcClient.destroy(); - }, - ); testProp( 'manifest server call', [specificMessageArb, fc.string()], @@ -643,7 +569,7 @@ describe(`${RPCClient.name}`, () => { }, ); testProp( - 'manifest raw duplex caller', + 'manifest raw caller', [ rpcTestUtils.safeJsonValueArb, rpcTestUtils.rawDataArb, @@ -672,7 +598,7 @@ describe(`${RPCClient.name}`, () => { streamPairCreateCallback: async () => streamPair, logger, }); - const callerInterface = await rpcClient.rawMethods.raw(headerParams); + const callerInterface = await rpcClient.methods.raw(headerParams); await callerInterface.readable.pipeTo(outputWritableStream); const writer = callerInterface.writable.getWriter(); for (const inputDatum of inputData) { @@ -716,12 +642,13 @@ describe(`${RPCClient.name}`, () => { logger, }); let count = 0; - await rpcClient.methods.duplex(async function* (output) { - for await (const value of output) { - count += 1; - yield value; - } - }); + const callerInterface = await rpcClient.methods.duplex(); + const writer = callerInterface.writable.getWriter(); + for await (const value of callerInterface.readable) { + count += 1; + await writer.write(value); + } + await writer.close(); const result = await outputResult; // We're just checking that it's consuming the messages as expected expect(result.length).toEqual(messages.length);