From 54cd6e1bb24d695c1541ff595ec960bfbd9c382b Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Thu, 28 Sep 2023 12:18:29 -0400 Subject: [PATCH] convert RPC Client response dataStream to IsomorphicNodeReadable stream --- packages/agent/src/rpc-client.ts | 3 +- packages/agent/tests/dwn-manager.spec.ts | 46 ++++++++++-------------- 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/packages/agent/src/rpc-client.ts b/packages/agent/src/rpc-client.ts index 9efc2861b..458f3af4c 100644 --- a/packages/agent/src/rpc-client.ts +++ b/packages/agent/src/rpc-client.ts @@ -4,6 +4,7 @@ import type { DwnRpc, DwnRpcRequest, DwnRpcResponse } from './types/agent.js'; import { randomUuid } from '@web5/crypto/utils'; import { createJsonRpcRequest, parseJson } from './json-rpc.js'; +import { webReadableToIsomorphicNodeReadable } from './utils.js'; /** * Client used to communicate with Dwn Servers @@ -88,7 +89,7 @@ class HttpDwnRpcClient implements DwnRpc { throw new Error(`failed to parse json rpc response. dwn url: ${request.dwnUrl}`); } - dataStream = resp.body; + dataStream = resp.body !== null ? webReadableToIsomorphicNodeReadable(resp.body) : resp.body; dwnRpcResponse = jsonRpcResponse; } else { // TODO: wonder if i need to try/catch this? diff --git a/packages/agent/tests/dwn-manager.spec.ts b/packages/agent/tests/dwn-manager.spec.ts index 90da99d8d..1e460e0bd 100644 --- a/packages/agent/tests/dwn-manager.spec.ts +++ b/packages/agent/tests/dwn-manager.spec.ts @@ -410,7 +410,7 @@ describe('DwnManager', () => { it('handles RecordsWrite and RecordRead with large payload', async () => { // Create test data to write. - const dataBytes = Array(10_000).fill('d').join(''); + const dataBytes = Convert.string(Array(60_000).fill('d').join('')).toUint8Array(); // Attempt to process the RecordsWrite let writeResponse = await testAgent.agent.dwnManager.processRequest({ @@ -439,18 +439,17 @@ describe('DwnManager', () => { const readResponse = await testAgent.agent.dwnManager.processRequest({ - author : identity.did, - target : identity.did, - messageType: 'RecordsRead', - messageOptions: { recordId: writeMessage.recordId } + author : identity.did, + target : identity.did, + messageType : 'RecordsRead', + messageOptions : { recordId: writeMessage.recordId } }); expect(readResponse.reply.status.code).to.equal(200); const reply = readResponse.reply as RecordsReadReply; expect(reply.record).to.not.be.undefined; expect(reply.record!.data).to.not.be.undefined; - const value = await DataStream.toBytes(reply.record!.data); - const data = new TextDecoder().decode(value); - expect(data).to.eq(dataBytes); + const data = await DataStream.toBytes(reply.record!.data); + expect(data).to.eql(dataBytes); }); }); @@ -591,19 +590,15 @@ describe('DwnManager', () => { const readReply = response.reply as RecordsReadReply; expect(readReply.record).to.exist; - const record = readReply.record as unknown as RecordsWriteMessage & { data: ReadableStream }; - expect(record.recordId).to.equal(message.recordId); - - expect(record.data).to.exist; - expect(record.data instanceof ReadableStream).to.be.true; - - const { value } = await record.data.getReader().read(); - expect(dataBytes).to.eql(value); + expect(readReply.record!.recordId).to.equal(message.recordId); + expect(readReply.record!.data).to.exist; + const data = await DataStream.toBytes(readReply.record!.data); + expect(data).to.eql(dataBytes); }); it('handles RecordsWrite and RecordRead with large payload', async () => { // Create test data to write. - const dataBytes = Array(10_000).fill('d').join(''); + const largeDataBytes = Convert.string(Array(60_000).fill('d').join('')).toUint8Array(); // Attempt to process the RecordsWrite let writeResponse = await testAgent.agent.dwnManager.sendRequest({ @@ -613,7 +608,7 @@ describe('DwnManager', () => { messageOptions : { dataFormat: 'text/plain' }, - dataStream: new Blob([dataBytes]) + dataStream: new Blob([ largeDataBytes ]) }); // Verify the response. @@ -632,20 +627,17 @@ describe('DwnManager', () => { const readResponse = await testAgent.agent.dwnManager.sendRequest({ - author : identity.did, - target : identity.did, - messageType: 'RecordsRead', - messageOptions: { recordId: writeMessage.recordId } + author : identity.did, + target : identity.did, + messageType : 'RecordsRead', + messageOptions : { recordId: writeMessage.recordId } }); expect(readResponse.reply.status.code).to.equal(200); const reply = readResponse.reply as RecordsReadReply; expect(reply.record).to.not.be.undefined; expect(reply.record!.data).to.not.be.undefined; - const record = reply.record as unknown as RecordsWriteMessage & { data: ReadableStream }; - const { value } = await record.data.getReader().read(); - const data = new TextDecoder().decode(value); - expect(data).to.eq(dataBytes); - + const data = await DataStream.toBytes(reply.record!.data); + expect(data).to.eql(largeDataBytes); }); it('throws an error when DwnRequest fails validation', async () => {