Skip to content

Commit

Permalink
wip: updated streams to use Uint8Array instead of Buffer
Browse files Browse the repository at this point in the history
Mostly a type change, `Buffer` just extended `Uint8Array`.

Related #500

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 23, 2023
1 parent 3821392 commit 2e7d199
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class RPCServer {

@ready(new rpcErrors.ErrorRpcDestroyed())
public handleStream(
streamPair: ReadableWritablePair<Buffer, Buffer>,
streamPair: ReadableWritablePair<Uint8Array, Uint8Array>,
connectionInfo: ConnectionInfo,
) {
// This will take a buffer stream of json messages and set up service
Expand Down
2 changes: 1 addition & 1 deletion src/RPC/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type UnaryHandler<I extends JSONValue, O extends JSONValue> = Handler<
>;

type StreamPairCreateCallback = () => Promise<
ReadableWritablePair<Buffer, Buffer>
ReadableWritablePair<Uint8Array, Uint8Array>
>;

export type {
Expand Down
15 changes: 9 additions & 6 deletions src/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as errors from '../errors';
const jsonStreamParsers = require('@streamparser/json');

class JsonToJsonMessage<T extends JsonRpcMessage>
implements Transformer<Buffer, T>
implements Transformer<Uint8Array, T>
{
protected bytesWritten: number = 0;

Expand All @@ -45,7 +45,7 @@ class JsonToJsonMessage<T extends JsonRpcMessage>
};
};

transform: TransformerTransformCallback<Buffer, T> = async (chunk) => {
transform: TransformerTransformCallback<Uint8Array, T> = async (chunk) => {
try {
this.bytesWritten += chunk.byteLength;
this.parser.write(chunk);
Expand All @@ -60,7 +60,7 @@ class JsonToJsonMessage<T extends JsonRpcMessage>

// TODO: rename to something more descriptive?
class JsonToJsonMessageStream<T extends JsonRpcMessage> extends TransformStream<
Buffer,
Uint8Array,
T
> {
constructor(
Expand All @@ -71,8 +71,8 @@ class JsonToJsonMessageStream<T extends JsonRpcMessage> extends TransformStream<
}
}

class JsonMessageToJson implements Transformer<JsonRpcMessage, Buffer> {
transform: TransformerTransformCallback<JsonRpcMessage, Buffer> = async (
class JsonMessageToJson implements Transformer<JsonRpcMessage, Uint8Array> {
transform: TransformerTransformCallback<JsonRpcMessage, Uint8Array> = async (
chunk,
controller,
) => {
Expand All @@ -81,7 +81,10 @@ class JsonMessageToJson implements Transformer<JsonRpcMessage, Buffer> {
}

// TODO: rename to something more descriptive?
class JsonMessageToJsonStream extends TransformStream<JsonRpcMessage, Buffer> {
class JsonMessageToJsonStream extends TransformStream<
JsonRpcMessage,
Uint8Array
> {
constructor() {
super(new JsonMessageToJson());
}
Expand Down
16 changes: 8 additions & 8 deletions tests/RPC/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ describe('RPC', () => {
[fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
Uint8Array,
Uint8Array
>();

const container = {};
Expand Down Expand Up @@ -68,8 +68,8 @@ describe('RPC', () => {
[fc.integer({ min: 1, max: 100 })],
async (value) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
Uint8Array,
Uint8Array
>();

const container = {};
Expand Down Expand Up @@ -110,8 +110,8 @@ describe('RPC', () => {
[fc.array(fc.integer(), { minLength: 1 }).noShrink()],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
Uint8Array,
Uint8Array
>();

const container = {};
Expand Down Expand Up @@ -156,8 +156,8 @@ describe('RPC', () => {
[rpcTestUtils.safeJsonValueArb],
async (value) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
Uint8Array,
Uint8Array
>();

const container = {};
Expand Down
18 changes: 12 additions & 6 deletions tests/RPC/RPCClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ describe(`${RPCClient.name}`, () => {

testProp('generic duplex caller', [specificMessageArb], async (messages) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down Expand Up @@ -101,7 +102,8 @@ describe(`${RPCClient.name}`, () => {
[rpcTestUtils.jsonRpcResponseResultArb(), fc.array(fc.jsonValue())],
async (message, params) => {
const inputStream = rpcTestUtils.jsonRpcStream([message]);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down Expand Up @@ -176,7 +178,8 @@ describe(`${RPCClient.name}`, () => {
...messages,
errorMessage,
]);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down Expand Up @@ -207,7 +210,8 @@ describe(`${RPCClient.name}`, () => {
[fc.array(rpcTestUtils.jsonRpcResponseResultArb(), { minLength: 1 })],
async (messages) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down Expand Up @@ -242,7 +246,8 @@ describe(`${RPCClient.name}`, () => {
],
async (messages, params) => {
const inputStream = rpcTestUtils.jsonRpcStream(messages);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down Expand Up @@ -281,7 +286,8 @@ describe(`${RPCClient.name}`, () => {
],
async (message, inputMessages) => {
const inputStream = rpcTestUtils.jsonRpcStream([message]);
const [outputResult, outputStream] = rpcTestUtils.streamToArray<Buffer>();
const [outputResult, outputStream] =
rpcTestUtils.streamToArray<Uint8Array>();
const streamPair: ReadableWritablePair = {
readable: inputStream,
writable: outputStream,
Expand Down
2 changes: 1 addition & 1 deletion tests/RPC/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ describe(`${RPCServer.name}`, () => {
let thing;
let lastMessage: JsonRpcMessage | undefined;
const tapStream: any = {};
// Const tapStream = new rpcTestUtils.TapStream<Buffer>(
// Const tapStream = new rpcTestUtils.TapStream<Uint8Array>(
// async (_, iteration) => {
// if (iteration === 2) {
// // @ts-ignore: kidnap private property
Expand Down
18 changes: 9 additions & 9 deletions tests/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { fc } from '@fast-check/jest';
import * as utils from '@/utils';
import { fromError } from '@/RPC/utils';

class BufferStreamToSnipped implements Transformer<Buffer, Buffer> {
class BufferStreamToSnipped implements Transformer<Uint8Array, Uint8Array> {
protected buffer = Buffer.alloc(0);
protected iteration = 0;
protected snippingPattern: Array<number>;
Expand All @@ -30,7 +30,7 @@ class BufferStreamToSnipped implements Transformer<Buffer, Buffer> {
this.snippingPattern = snippingPattern;
}

transform: TransformerTransformCallback<Buffer, Buffer> = async (
transform: TransformerTransformCallback<Uint8Array, Uint8Array> = async (
chunk,
controller,
) => {
Expand All @@ -46,7 +46,7 @@ class BufferStreamToSnipped implements Transformer<Buffer, Buffer> {
}
};

flush: TransformerFlushCallback<Buffer> = (controller) => {
flush: TransformerFlushCallback<Uint8Array> = (controller) => {
controller.enqueue(this.buffer);
};
}
Expand All @@ -62,15 +62,15 @@ class BufferStreamToSnippedStream extends TransformStream {
}
}

class BufferStreamToNoisy implements Transformer<Buffer, Buffer> {
class BufferStreamToNoisy implements Transformer<Uint8Array, Uint8Array> {
protected iteration = 0;
protected noise: Array<Buffer>;
protected noise: Array<Uint8Array>;

constructor(noise: Array<Buffer>) {
constructor(noise: Array<Uint8Array>) {
this.noise = noise;
}

transform: TransformerTransformCallback<Buffer, Buffer> = async (
transform: TransformerTransformCallback<Uint8Array, Uint8Array> = async (
chunk,
controller,
) => {
Expand All @@ -87,13 +87,13 @@ class BufferStreamToNoisy implements Transformer<Buffer, Buffer> {
* splitting up the data.
*/
class BufferStreamToNoisyStream extends TransformStream {
constructor(noise: Array<Buffer>) {
constructor(noise: Array<Uint8Array>) {
super(new BufferStreamToNoisy(noise));
}
}

const jsonRpcStream = (messages: Array<POJO>) => {
return new ReadableStream<Buffer>({
return new ReadableStream<Uint8Array>({
async start(controller) {
for (const arrayElement of messages) {
// Controller.enqueue(arrayElement)
Expand Down

0 comments on commit 2e7d199

Please sign in to comment.