From 2e7d199a3580a88c1290cc18c69f6620a7d49fe8 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 23 Jan 2023 18:48:36 +1100 Subject: [PATCH] wip: updated streams to use `Uint8Array` instead of `Buffer` Mostly a type change, `Buffer` just extended `Uint8Array`. Related #500 [ci skip] --- src/RPC/RPCServer.ts | 2 +- src/RPC/types.ts | 2 +- src/RPC/utils.ts | 15 +++++++++------ tests/RPC/RPC.test.ts | 16 ++++++++-------- tests/RPC/RPCClient.test.ts | 18 ++++++++++++------ tests/RPC/RPCServer.test.ts | 2 +- tests/RPC/utils.ts | 18 +++++++++--------- 7 files changed, 41 insertions(+), 32 deletions(-) diff --git a/src/RPC/RPCServer.ts b/src/RPC/RPCServer.ts index b374de488..23e071a0f 100644 --- a/src/RPC/RPCServer.ts +++ b/src/RPC/RPCServer.ts @@ -133,7 +133,7 @@ class RPCServer { @ready(new rpcErrors.ErrorRpcDestroyed()) public handleStream( - streamPair: ReadableWritablePair, + streamPair: ReadableWritablePair, connectionInfo: ConnectionInfo, ) { // This will take a buffer stream of json messages and set up service diff --git a/src/RPC/types.ts b/src/RPC/types.ts index 884709b68..6f5797a0a 100644 --- a/src/RPC/types.ts +++ b/src/RPC/types.ts @@ -124,7 +124,7 @@ type UnaryHandler = Handler< >; type StreamPairCreateCallback = () => Promise< - ReadableWritablePair + ReadableWritablePair >; export type { diff --git a/src/RPC/utils.ts b/src/RPC/utils.ts index 8e5ef2df1..d3f50ce14 100644 --- a/src/RPC/utils.ts +++ b/src/RPC/utils.ts @@ -23,7 +23,7 @@ import * as errors from '../errors'; const jsonStreamParsers = require('@streamparser/json'); class JsonToJsonMessage - implements Transformer + implements Transformer { protected bytesWritten: number = 0; @@ -45,7 +45,7 @@ class JsonToJsonMessage }; }; - transform: TransformerTransformCallback = async (chunk) => { + transform: TransformerTransformCallback = async (chunk) => { try { this.bytesWritten += chunk.byteLength; this.parser.write(chunk); @@ -60,7 +60,7 @@ class JsonToJsonMessage // TODO: rename to something more descriptive? class JsonToJsonMessageStream extends TransformStream< - Buffer, + Uint8Array, T > { constructor( @@ -71,8 +71,8 @@ class JsonToJsonMessageStream extends TransformStream< } } -class JsonMessageToJson implements Transformer { - transform: TransformerTransformCallback = async ( +class JsonMessageToJson implements Transformer { + transform: TransformerTransformCallback = async ( chunk, controller, ) => { @@ -81,7 +81,10 @@ class JsonMessageToJson implements Transformer { } // TODO: rename to something more descriptive? -class JsonMessageToJsonStream extends TransformStream { +class JsonMessageToJsonStream extends TransformStream< + JsonRpcMessage, + Uint8Array +> { constructor() { super(new JsonMessageToJson()); } diff --git a/tests/RPC/RPC.test.ts b/tests/RPC/RPC.test.ts index 48edbb5e5..1f5220d55 100644 --- a/tests/RPC/RPC.test.ts +++ b/tests/RPC/RPC.test.ts @@ -22,8 +22,8 @@ describe('RPC', () => { [fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })], async (values) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Buffer, - Buffer + Uint8Array, + Uint8Array >(); const container = {}; @@ -68,8 +68,8 @@ describe('RPC', () => { [fc.integer({ min: 1, max: 100 })], async (value) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Buffer, - Buffer + Uint8Array, + Uint8Array >(); const container = {}; @@ -110,8 +110,8 @@ describe('RPC', () => { [fc.array(fc.integer(), { minLength: 1 }).noShrink()], async (values) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Buffer, - Buffer + Uint8Array, + Uint8Array >(); const container = {}; @@ -156,8 +156,8 @@ describe('RPC', () => { [rpcTestUtils.safeJsonValueArb], async (value) => { const { clientPair, serverPair } = rpcTestUtils.createTapPairs< - Buffer, - Buffer + Uint8Array, + Uint8Array >(); const container = {}; diff --git a/tests/RPC/RPCClient.test.ts b/tests/RPC/RPCClient.test.ts index 91afceb97..646972370 100644 --- a/tests/RPC/RPCClient.test.ts +++ b/tests/RPC/RPCClient.test.ts @@ -22,7 +22,8 @@ describe(`${RPCClient.name}`, () => { testProp('generic duplex caller', [specificMessageArb], async (messages) => { const inputStream = rpcTestUtils.jsonRpcStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, @@ -101,7 +102,8 @@ describe(`${RPCClient.name}`, () => { [rpcTestUtils.jsonRpcResponseResultArb(), fc.array(fc.jsonValue())], async (message, params) => { const inputStream = rpcTestUtils.jsonRpcStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, @@ -176,7 +178,8 @@ describe(`${RPCClient.name}`, () => { ...messages, errorMessage, ]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, @@ -207,7 +210,8 @@ describe(`${RPCClient.name}`, () => { [fc.array(rpcTestUtils.jsonRpcResponseResultArb(), { minLength: 1 })], async (messages) => { const inputStream = rpcTestUtils.jsonRpcStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, @@ -242,7 +246,8 @@ describe(`${RPCClient.name}`, () => { ], async (messages, params) => { const inputStream = rpcTestUtils.jsonRpcStream(messages); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, @@ -281,7 +286,8 @@ describe(`${RPCClient.name}`, () => { ], async (message, inputMessages) => { const inputStream = rpcTestUtils.jsonRpcStream([message]); - const [outputResult, outputStream] = rpcTestUtils.streamToArray(); + const [outputResult, outputStream] = + rpcTestUtils.streamToArray(); const streamPair: ReadableWritablePair = { readable: inputStream, writable: outputStream, diff --git a/tests/RPC/RPCServer.test.ts b/tests/RPC/RPCServer.test.ts index 4481d5cc6..e1b0685eb 100644 --- a/tests/RPC/RPCServer.test.ts +++ b/tests/RPC/RPCServer.test.ts @@ -231,7 +231,7 @@ describe(`${RPCServer.name}`, () => { let thing; let lastMessage: JsonRpcMessage | undefined; const tapStream: any = {}; - // Const tapStream = new rpcTestUtils.TapStream( + // Const tapStream = new rpcTestUtils.TapStream( // async (_, iteration) => { // if (iteration === 2) { // // @ts-ignore: kidnap private property diff --git a/tests/RPC/utils.ts b/tests/RPC/utils.ts index b11ae020e..e77b4e9a6 100644 --- a/tests/RPC/utils.ts +++ b/tests/RPC/utils.ts @@ -21,7 +21,7 @@ import { fc } from '@fast-check/jest'; import * as utils from '@/utils'; import { fromError } from '@/RPC/utils'; -class BufferStreamToSnipped implements Transformer { +class BufferStreamToSnipped implements Transformer { protected buffer = Buffer.alloc(0); protected iteration = 0; protected snippingPattern: Array; @@ -30,7 +30,7 @@ class BufferStreamToSnipped implements Transformer { this.snippingPattern = snippingPattern; } - transform: TransformerTransformCallback = async ( + transform: TransformerTransformCallback = async ( chunk, controller, ) => { @@ -46,7 +46,7 @@ class BufferStreamToSnipped implements Transformer { } }; - flush: TransformerFlushCallback = (controller) => { + flush: TransformerFlushCallback = (controller) => { controller.enqueue(this.buffer); }; } @@ -62,15 +62,15 @@ class BufferStreamToSnippedStream extends TransformStream { } } -class BufferStreamToNoisy implements Transformer { +class BufferStreamToNoisy implements Transformer { protected iteration = 0; - protected noise: Array; + protected noise: Array; - constructor(noise: Array) { + constructor(noise: Array) { this.noise = noise; } - transform: TransformerTransformCallback = async ( + transform: TransformerTransformCallback = async ( chunk, controller, ) => { @@ -87,13 +87,13 @@ class BufferStreamToNoisy implements Transformer { * splitting up the data. */ class BufferStreamToNoisyStream extends TransformStream { - constructor(noise: Array) { + constructor(noise: Array) { super(new BufferStreamToNoisy(noise)); } } const jsonRpcStream = (messages: Array) => { - return new ReadableStream({ + return new ReadableStream({ async start(controller) { for (const arrayElement of messages) { // Controller.enqueue(arrayElement)