Skip to content

Commit

Permalink
wip: implementing withXCaller CO style methods
Browse files Browse the repository at this point in the history
Related #501

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 23, 2023
1 parent 98677ec commit 3197764
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 0 deletions.
58 changes: 58 additions & 0 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,64 @@ class RPCClient {
await writer.close();
return output.value;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async withDuplexCaller<I extends JSONValue, O extends JSONValue>(
method: string,
f: (output: AsyncGenerator<O>) => AsyncGenerator<I>,
metadata: POJO,
): Promise<void> {
const callerInterface = await this.duplexStreamCaller<I, O>(
method,
metadata,
);
const outputGenerator = async function* () {
for await (const value of callerInterface.readable) {
yield value;
}
};
const writer = callerInterface.writable.getWriter();
for await (const value of f(outputGenerator())) {
await writer.write(value);
}
await writer.close();
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async withServerCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
f: (output: AsyncGenerator<O>) => Promise<void>,
metadata: POJO,
) {
const callerInterface = await this.serverStreamCaller<I, O>(
method,
parameters,
metadata,
);
const outputGenerator = async function* () {
yield* callerInterface;
};
await f(outputGenerator());
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async withClientCaller<I extends JSONValue, O extends JSONValue>(
method: string,
f: () => AsyncGenerator<I>,
metadata: POJO,
): Promise<O> {
const callerInterface = await this.clientStreamCaller<I, O>(
method,
metadata,
);
const writer = callerInterface.writable.getWriter();
for await (const value of f()) {
await writer.write(value);
}
await writer.close();
return callerInterface.output;
}
}

export default RPCClient;
112 changes: 112 additions & 0 deletions tests/RPC/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,116 @@ describe(`${RPCClient.name}`, () => {
await rpcClient.destroy();
},
);
testProp(
'withDuplexCaller',
[fc.array(rpcTestUtils.jsonRpcResponseResultArb(), { minLength: 1 })],
async (messages) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
};
const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => streamPair,
logger,
});
let count = 0;
await rpcClient.withDuplexCaller(
methodName,
async function* (output) {
for await (const value of output) {
count += 1;
yield value;
}
},
{},
);
const result = await outputResult;
// We're just checking that it consuming the messages as expected
expect(result.length).toEqual(messages.length);
expect(count).toEqual(messages.length);
await rpcClient.destroy();
},
);
testProp(
'withServerCaller',
[
fc.array(rpcTestUtils.jsonRpcResponseResultArb(), { minLength: 1 }),
rpcTestUtils.safeJsonValueArb,
],
async (messages, params) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
};
const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => streamPair,
logger,
});
let count = 0;
await rpcClient.withServerCaller(
methodName,
params,
async (output) => {
for await (const _ of output) count += 1;
},
{},
);
const result = await outputResult;
expect(count).toEqual(messages.length);
expect(result.toString()).toStrictEqual(
JSON.stringify({
method: methodName,
jsonrpc: '2.0',
id: null,
params: params,
}),
);
await rpcClient.destroy();
},
);
testProp(
'withClientCaller',
[
rpcTestUtils.jsonRpcResponseResultArb(),
fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 2 }).noShrink(),
],
async (message, inputMessages) => {
const inputStream = rpcTestUtils.jsonRpcStream([message]);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
};
const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => streamPair,
logger,
});
const result = await rpcClient.withClientCaller(
methodName,
async function* () {
for (const inputMessage of inputMessages) {
yield inputMessage;
}
},
{},
);
const expectedResult = inputMessages.map((v) => {
return JSON.stringify({
method: methodName,
jsonrpc: '2.0',
id: null,
params: v,
});
});
expect((await outputResult).map((v) => v.toString())).toStrictEqual(
expectedResult,
);
expect(result).toStrictEqual(message.result);
await rpcClient.destroy();
},
);
});

0 comments on commit 3197764

Please sign in to comment.