From ca8fed48bf171a61081d7502c42a9097497e80c5 Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 15:37:51 -0400 Subject: [PATCH 1/2] remove duplicate methods, fix sync for non-RecordsWrite messages, add protocol configure to e2e sync test --- packages/agent/src/dwn-api.ts | 37 +---- packages/agent/src/sync-engine-level.ts | 45 +++--- packages/agent/tests/dwn-api.spec.ts | 3 +- .../agent/tests/sync-engine-level.spec.ts | 145 +++++++++++++++--- 4 files changed, 145 insertions(+), 85 deletions(-) diff --git a/packages/agent/src/dwn-api.ts b/packages/agent/src/dwn-api.ts index a209bb068..fb201ebe3 100644 --- a/packages/agent/src/dwn-api.ts +++ b/packages/agent/src/dwn-api.ts @@ -1,5 +1,5 @@ import type { Readable } from '@web5/common'; -import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/dwn-sdk-js'; +import type { DwnConfig, GenericMessage } from '@tbd54566975/dwn-sdk-js'; import { NodeStream } from '@web5/common'; import { utils as cryptoUtils } from '@web5/crypto'; @@ -7,7 +7,7 @@ import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/ import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel, ResumableTaskStoreLevel } from '@tbd54566975/dwn-sdk-js'; import type { Web5PlatformAgent } from './types/agent.js'; -import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js'; +import type { DwnMessage, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js'; import { DwnInterface, dwnMessageConstructors } from './types/dwn.js'; import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js'; @@ -382,37 +382,4 @@ export class AgentDwnApi { return dwnMessageWithBlob; } - - /** - * TODO: Refactor this to consolidate logic in AgentDwnApi and SyncEngineLevel. - * ADDED TO GET SYNC WORKING - * - createMessage() - * - processMessage() - */ - - public async createMessage({ author, messageParams, messageType }: { - author: string; - messageType: T; - messageParams?: DwnMessageParams[T]; - }): Promise { - // Determine the signer for the message. - const signer = await this.getSigner(author); - - const dwnMessageConstructor = dwnMessageConstructors[messageType]; - const dwnMessage = await dwnMessageConstructor.create({ - // TODO: Explore whether 'messageParams' should be required in the ProcessDwnRequest type. - ...messageParams!, - signer - }); - - return dwnMessage; - } - - public async processMessage({ dataStream, message, targetDid }: { - targetDid: string; - message: GenericMessage; - dataStream?: Readable; - }): Promise { - return await this._dwn.processMessage(targetDid, message, { dataStream }); - } } \ No newline at end of file diff --git a/packages/agent/src/sync-engine-level.ts b/packages/agent/src/sync-engine-level.ts index 4f6c5eff3..9c1fb0d87 100644 --- a/packages/agent/src/sync-engine-level.ts +++ b/packages/agent/src/sync-engine-level.ts @@ -104,8 +104,10 @@ export class SyncEngineLevel implements SyncEngine { continue; } - const messagesRead = await this.agent.dwn.createMessage({ + const messagesRead = await this.agent.processDwnRequest({ + store : false, author : did, + target : did, messageType : DwnInterface.MessagesRead, messageParams : { messageCid: messageCid @@ -118,7 +120,7 @@ export class SyncEngineLevel implements SyncEngine { reply = await this.agent.rpc.sendDwnRequest({ dwnUrl, targetDid : did, - message : messagesRead, + message : messagesRead.message, }) as MessagesReadReply; } catch(e) { errored.add(dwnUrl); @@ -132,27 +134,18 @@ export class SyncEngineLevel implements SyncEngine { } const replyEntry = reply.entry; - - if (isRecordsWrite(replyEntry)) { - const message = replyEntry.message; - - // if the message includes data we convert it to a Node readable stream - // otherwise we set it as undefined, as the message does not include data - // this occurs when the message is a RecordsWrite message that has been updated - const dataStream = replyEntry.data ? - NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream }) - : undefined; - - const pullReply = await this.agent.dwn.processMessage({ - targetDid: did, - message, - dataStream, - }); - - if (pullReply.status.code === 202 || pullReply.status.code === 409) { - await this.addMessage(did, messageCid); - deleteOperations.push({ type: 'del', key: key }); - } + const message = replyEntry.message; + // if the message includes data we convert it to a Node readable stream + // otherwise we set it as undefined, as the message does not include data + // this occurs when the message is a RecordsWrite message that has been updated + const dataStream = isRecordsWrite(replyEntry) && replyEntry.data ? + NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream }) + : undefined; + + const pullReply = await this.agent.dwn.node.processMessage(did, message, { dataStream }); + if (pullReply.status.code === 202 || pullReply.status.code === 409) { + await this.addMessage(did, messageCid); + deleteOperations.push({ type: 'del', key: key }); } } @@ -308,7 +301,9 @@ export class SyncEngineLevel implements SyncEngine { if (syncDirection === 'pull') { // When sync is a pull, get the event log from the remote DWN. - const messagesReadMessage = await this.agent.dwn.createMessage({ + const messagesReadMessage = await this.agent.dwn.processRequest({ + store : false, + target : did, author : did, messageType : DwnInterface.MessagesQuery, messageParams : { filters: [], cursor } @@ -318,7 +313,7 @@ export class SyncEngineLevel implements SyncEngine { messagesReply = await this.agent.rpc.sendDwnRequest({ dwnUrl : dwnUrl, targetDid : did, - message : messagesReadMessage + message : messagesReadMessage.message }) as MessagesQueryReply; } catch { // If a particular DWN service endpoint is unreachable, silently ignore. diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index a7db50877..e6fb277b4 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -604,11 +604,12 @@ describe('AgentDwnApi', () => { it('returns a 202 Accepted status when the request is not stored', async () => { // spy on dwn.processMessage - const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage'); + const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage'); // Attempt to process the RecordsWrite const dataBytes = Convert.string('Hello, world!').toUint8Array(); let writeResponse = await testHarness.agent.dwn.processRequest({ + store : false, author : alice.did.uri, target : alice.did.uri, messageType : DwnInterface.RecordsWrite, diff --git a/packages/agent/tests/sync-engine-level.spec.ts b/packages/agent/tests/sync-engine-level.spec.ts index edcc1ad71..57a46c021 100644 --- a/packages/agent/tests/sync-engine-level.spec.ts +++ b/packages/agent/tests/sync-engine-level.spec.ts @@ -1,7 +1,7 @@ import sinon from 'sinon'; import { expect } from 'chai'; import { utils as cryptoUtils } from '@web5/crypto'; -import { DwnConstant } from '@tbd54566975/dwn-sdk-js'; +import { DwnConstant, ProtocolDefinition } from '@tbd54566975/dwn-sdk-js'; import type { BearerIdentity } from '../src/bearer-identity.js'; @@ -82,7 +82,56 @@ describe('SyncEngineLevel', () => { await testHarness.closeStorage(); }); - it('syncs multiple records in both directions', async () => { + it('syncs multiple messages in both directions', async () => { + // create 1 local protocol configure + const protocolDefinition1: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/example/1', + types : { + foo: { + schema : 'https://schemas.xyz/foo', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + foo: {} + } + }; + + const protocolsConfigure1 = await testHarness.agent.processDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolDefinition1 + } + }); + + // create 1 remote protocol configure + const protocolDefinition2: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/example/2', + types : { + bar: { + schema : 'https://schemas.xyz/bar', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + bar: {} + } + }; + + const protocolsConfigure2 = await testHarness.agent.sendDwnRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolDefinition2 + } + }); + + // create 3 local records. const localRecords: string[] = []; for (let i = 0; i < 3; i++) { @@ -152,8 +201,20 @@ describe('SyncEngineLevel', () => { remoteRecords.push((writeResponse.message!).recordId); } + // check that protocol1 exists locally + let localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsQuery, + messageParams : {} + }); + let localProtocolsQueryReply = localProtocolsQueryResponse.reply; + expect(localProtocolsQueryReply.status.code).to.equal(200); + expect(localProtocolsQueryReply.entries?.length).to.equal(1); + expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message ]); + // query local and check for only local records - let localQueryResponse = await testHarness.agent.dwn.processRequest({ + let localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, messageType : DwnInterface.RecordsQuery, @@ -164,14 +225,26 @@ describe('SyncEngineLevel', () => { } } }); - let localDwnQueryReply = localQueryResponse.reply; - expect(localDwnQueryReply.status.code).to.equal(200); - expect(localDwnQueryReply.entries).to.have.length(3); - let localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId); + let localRecordsQueryReply = localRecordsQueryResponse.reply; + expect(localRecordsQueryReply.status.code).to.equal(200); + expect(localRecordsQueryReply.entries).to.have.length(3); + let localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId); expect(localRecordsFromQuery).to.have.members(localRecords); + // check that protocol2 exists remotely + let remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsQuery, + messageParams : {} + }); + let remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply; + expect(remoteProtocolsQueryReply.status.code).to.equal(200); + expect(remoteProtocolsQueryReply.entries?.length).to.equal(1); + expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure2.message ]); + // query remote and check for only remote records - let remoteQueryResponse = await testHarness.agent.dwn.sendRequest({ + let remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, messageType : DwnInterface.RecordsQuery, @@ -182,10 +255,10 @@ describe('SyncEngineLevel', () => { } } }); - let remoteDwnQueryReply = remoteQueryResponse.reply; - expect(remoteDwnQueryReply.status.code).to.equal(200); - expect(remoteDwnQueryReply.entries).to.have.length(3); - let remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId); + let remoteRecordsQueryReply = remoteRecordsQueryResponse.reply; + expect(remoteRecordsQueryReply.status.code).to.equal(200); + expect(remoteRecordsQueryReply.entries).to.have.length(3); + let remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId); expect(remoteRecordsFromQuery).to.have.members(remoteRecords); // Register Alice's DID to be synchronized. @@ -197,8 +270,20 @@ describe('SyncEngineLevel', () => { await syncEngine.push(); await syncEngine.pull(); + // query local to see all protocols + localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsQuery, + messageParams : {} + }); + localProtocolsQueryReply = localProtocolsQueryResponse.reply; + expect(localProtocolsQueryReply.status.code).to.equal(200); + expect(localProtocolsQueryReply.entries?.length).to.equal(2); + expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]); + // query local node to see all records - localQueryResponse = await testHarness.agent.dwn.processRequest({ + localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({ author : alice.did.uri, target : alice.did.uri, messageType : DwnInterface.RecordsQuery, @@ -209,14 +294,26 @@ describe('SyncEngineLevel', () => { } } }); - localDwnQueryReply = localQueryResponse.reply; - expect(localDwnQueryReply.status.code).to.equal(200); - expect(localDwnQueryReply.entries).to.have.length(6, 'local'); - localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId); + localRecordsQueryReply = localRecordsQueryResponse.reply; + expect(localRecordsQueryReply.status.code).to.equal(200); + expect(localRecordsQueryReply.entries).to.have.length(6, 'local'); + localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId); expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); - // query remote node to see all results - remoteQueryResponse = await testHarness.agent.dwn.sendRequest({ + // query remote node to see all protocols + remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsQuery, + messageParams : {} + }); + remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply; + expect(remoteProtocolsQueryReply.status.code).to.equal(200); + expect(remoteProtocolsQueryReply.entries?.length).to.equal(2); + expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]); + + // query remote node to see all records + remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({ author : alice.did.uri, target : alice.did.uri, messageType : DwnInterface.RecordsQuery, @@ -227,10 +324,10 @@ describe('SyncEngineLevel', () => { } } }); - remoteDwnQueryReply = remoteQueryResponse.reply; - expect(remoteDwnQueryReply.status.code).to.equal(200); - expect(remoteDwnQueryReply.entries).to.have.length(6, 'remote'); - remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId); + remoteRecordsQueryReply = remoteRecordsQueryResponse.reply; + expect(remoteRecordsQueryReply.status.code).to.equal(200); + expect(remoteRecordsQueryReply.entries).to.have.length(6, 'remote'); + remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId); expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]); }).slow(1000); // Yellow at 500ms, Red at 1000ms. @@ -412,7 +509,7 @@ describe('SyncEngineLevel', () => { // spy on sendDwnRequest to the remote DWN const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest'); - const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage'); + const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage'); // Execute Sync to push records to Alice's remote node await syncEngine.pull(); From 95b65f7a1b8dcb5da8fe82680722ee4eb432357c Mon Sep 17 00:00:00 2001 From: Liran Cohen Date: Wed, 3 Jul 2024 15:44:08 -0400 Subject: [PATCH 2/2] add changeset --- .changeset/tidy-wasps-smell.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/tidy-wasps-smell.md diff --git a/.changeset/tidy-wasps-smell.md b/.changeset/tidy-wasps-smell.md new file mode 100644 index 000000000..d605bbd74 --- /dev/null +++ b/.changeset/tidy-wasps-smell.md @@ -0,0 +1,8 @@ +--- +"@web5/agent": patch +"@web5/identity-agent": patch +"@web5/proxy-agent": patch +"@web5/user-agent": patch +--- + +Remove Duplicate Methods used in sync & Fix sync bug where only RecordsWrite were being pulled from the remote