Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Duplicate Methods used in sync #738

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/tidy-wasps-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

Remove Duplicate Methods used in sync & Fix sync bug where only RecordsWrite were being pulled from the remote
37 changes: 2 additions & 35 deletions packages/agent/src/dwn-api.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Readable } from '@web5/common';
import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import type { DwnConfig, GenericMessage } from '@tbd54566975/dwn-sdk-js';

import { NodeStream } from '@web5/common';
import { utils as cryptoUtils } from '@web5/crypto';
import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel, ResumableTaskStoreLevel } from '@tbd54566975/dwn-sdk-js';

import type { Web5PlatformAgent } from './types/agent.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
import type { DwnMessage, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';

import { DwnInterface, dwnMessageConstructors } from './types/dwn.js';
import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js';
Expand Down Expand Up @@ -382,37 +382,4 @@ export class AgentDwnApi {

return dwnMessageWithBlob;
}

/**
* TODO: Refactor this to consolidate logic in AgentDwnApi and SyncEngineLevel.
* ADDED TO GET SYNC WORKING
* - createMessage()
* - processMessage()
*/

public async createMessage<T extends DwnInterface>({ author, messageParams, messageType }: {
author: string;
messageType: T;
messageParams?: DwnMessageParams[T];
}): Promise<DwnMessageInstance[T]> {
// Determine the signer for the message.
const signer = await this.getSigner(author);

const dwnMessageConstructor = dwnMessageConstructors[messageType];
const dwnMessage = await dwnMessageConstructor.create({
// TODO: Explore whether 'messageParams' should be required in the ProcessDwnRequest type.
...messageParams!,
signer
});

return dwnMessage;
}

public async processMessage({ dataStream, message, targetDid }: {
targetDid: string;
message: GenericMessage;
dataStream?: Readable;
}): Promise<UnionMessageReply> {
return await this._dwn.processMessage(targetDid, message, { dataStream });
}
}
45 changes: 20 additions & 25 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ export class SyncEngineLevel implements SyncEngine {
continue;
}

const messagesRead = await this.agent.dwn.createMessage({
const messagesRead = await this.agent.processDwnRequest({
store : false,
author : did,
target : did,
messageType : DwnInterface.MessagesRead,
messageParams : {
messageCid: messageCid
Expand All @@ -118,7 +120,7 @@ export class SyncEngineLevel implements SyncEngine {
reply = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesRead,
message : messagesRead.message,
}) as MessagesReadReply;
} catch(e) {
errored.add(dwnUrl);
Expand All @@ -132,27 +134,18 @@ export class SyncEngineLevel implements SyncEngine {
}

const replyEntry = reply.entry;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was only syncing RecordsWrite messages, added test path for ProtocolsConfigure message and simplified the logic here.

if (isRecordsWrite(replyEntry)) {
const message = replyEntry.message;

// if the message includes data we convert it to a Node readable stream
// otherwise we set it as undefined, as the message does not include data
// this occurs when the message is a RecordsWrite message that has been updated
const dataStream = replyEntry.data ?
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
: undefined;

const pullReply = await this.agent.dwn.processMessage({
targetDid: did,
message,
dataStream,
});

if (pullReply.status.code === 202 || pullReply.status.code === 409) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
const message = replyEntry.message;
// if the message includes data we convert it to a Node readable stream
// otherwise we set it as undefined, as the message does not include data
// this occurs when the message is a RecordsWrite message that has been updated
const dataStream = isRecordsWrite(replyEntry) && replyEntry.data ?
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
: undefined;

const pullReply = await this.agent.dwn.node.processMessage(did, message, { dataStream });
if (pullReply.status.code === 202 || pullReply.status.code === 409) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
}

Expand Down Expand Up @@ -308,7 +301,9 @@ export class SyncEngineLevel implements SyncEngine {

if (syncDirection === 'pull') {
// When sync is a pull, get the event log from the remote DWN.
const messagesReadMessage = await this.agent.dwn.createMessage({
const messagesReadMessage = await this.agent.dwn.processRequest({
store : false,
target : did,
author : did,
messageType : DwnInterface.MessagesQuery,
messageParams : { filters: [], cursor }
Expand All @@ -318,7 +313,7 @@ export class SyncEngineLevel implements SyncEngine {
messagesReply = await this.agent.rpc.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : messagesReadMessage
message : messagesReadMessage.message
}) as MessagesQueryReply;
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
Expand Down
3 changes: 2 additions & 1 deletion packages/agent/tests/dwn-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,12 @@ describe('AgentDwnApi', () => {

it('returns a 202 Accepted status when the request is not stored', async () => {
// spy on dwn.processMessage
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');

// Attempt to process the RecordsWrite
const dataBytes = Convert.string('Hello, world!').toUint8Array();
let writeResponse = await testHarness.agent.dwn.processRequest({
store : false,
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsWrite,
Expand Down
145 changes: 121 additions & 24 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sinon from 'sinon';
import { expect } from 'chai';
import { utils as cryptoUtils } from '@web5/crypto';
import { DwnConstant } from '@tbd54566975/dwn-sdk-js';
import { DwnConstant, ProtocolDefinition } from '@tbd54566975/dwn-sdk-js';

import type { BearerIdentity } from '../src/bearer-identity.js';

Expand Down Expand Up @@ -82,7 +82,56 @@ describe('SyncEngineLevel', () => {
await testHarness.closeStorage();
});

it('syncs multiple records in both directions', async () => {
it('syncs multiple messages in both directions', async () => {
// create 1 local protocol configure
const protocolDefinition1: ProtocolDefinition = {
published : true,
protocol : 'https://protocol.xyz/example/1',
types : {
foo: {
schema : 'https://schemas.xyz/foo',
dataFormats : ['text/plain', 'application/json']
}
},
structure: {
foo: {}
}
};

const protocolsConfigure1 = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsConfigure,
messageParams : {
definition: protocolDefinition1
}
});

// create 1 remote protocol configure
const protocolDefinition2: ProtocolDefinition = {
published : true,
protocol : 'https://protocol.xyz/example/2',
types : {
bar: {
schema : 'https://schemas.xyz/bar',
dataFormats : ['text/plain', 'application/json']
}
},
structure: {
bar: {}
}
};

const protocolsConfigure2 = await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsConfigure,
messageParams : {
definition: protocolDefinition2
}
});


// create 3 local records.
const localRecords: string[] = [];
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -152,8 +201,20 @@ describe('SyncEngineLevel', () => {
remoteRecords.push((writeResponse.message!).recordId);
}

// check that protocol1 exists locally
let localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
let localProtocolsQueryReply = localProtocolsQueryResponse.reply;
expect(localProtocolsQueryReply.status.code).to.equal(200);
expect(localProtocolsQueryReply.entries?.length).to.equal(1);
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message ]);

// query local and check for only local records
let localQueryResponse = await testHarness.agent.dwn.processRequest({
let localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -164,14 +225,26 @@ describe('SyncEngineLevel', () => {
}
}
});
let localDwnQueryReply = localQueryResponse.reply;
expect(localDwnQueryReply.status.code).to.equal(200);
expect(localDwnQueryReply.entries).to.have.length(3);
let localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
let localRecordsQueryReply = localRecordsQueryResponse.reply;
expect(localRecordsQueryReply.status.code).to.equal(200);
expect(localRecordsQueryReply.entries).to.have.length(3);
let localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(localRecordsFromQuery).to.have.members(localRecords);

// check that protocol2 exists remotely
let remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
let remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
expect(remoteProtocolsQueryReply.entries?.length).to.equal(1);
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure2.message ]);

// query remote and check for only remote records
let remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
let remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -182,10 +255,10 @@ describe('SyncEngineLevel', () => {
}
}
});
let remoteDwnQueryReply = remoteQueryResponse.reply;
expect(remoteDwnQueryReply.status.code).to.equal(200);
expect(remoteDwnQueryReply.entries).to.have.length(3);
let remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
let remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
expect(remoteRecordsQueryReply.status.code).to.equal(200);
expect(remoteRecordsQueryReply.entries).to.have.length(3);
let remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(remoteRecordsFromQuery).to.have.members(remoteRecords);

// Register Alice's DID to be synchronized.
Expand All @@ -197,8 +270,20 @@ describe('SyncEngineLevel', () => {
await syncEngine.push();
await syncEngine.pull();

// query local to see all protocols
localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
localProtocolsQueryReply = localProtocolsQueryResponse.reply;
expect(localProtocolsQueryReply.status.code).to.equal(200);
expect(localProtocolsQueryReply.entries?.length).to.equal(2);
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);

// query local node to see all records
localQueryResponse = await testHarness.agent.dwn.processRequest({
localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -209,14 +294,26 @@ describe('SyncEngineLevel', () => {
}
}
});
localDwnQueryReply = localQueryResponse.reply;
expect(localDwnQueryReply.status.code).to.equal(200);
expect(localDwnQueryReply.entries).to.have.length(6, 'local');
localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
localRecordsQueryReply = localRecordsQueryResponse.reply;
expect(localRecordsQueryReply.status.code).to.equal(200);
expect(localRecordsQueryReply.entries).to.have.length(6, 'local');
localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);

// query remote node to see all results
remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
// query remote node to see all protocols
remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
expect(remoteProtocolsQueryReply.entries?.length).to.equal(2);
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);

// query remote node to see all records
remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -227,10 +324,10 @@ describe('SyncEngineLevel', () => {
}
}
});
remoteDwnQueryReply = remoteQueryResponse.reply;
expect(remoteDwnQueryReply.status.code).to.equal(200);
expect(remoteDwnQueryReply.entries).to.have.length(6, 'remote');
remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
expect(remoteRecordsQueryReply.status.code).to.equal(200);
expect(remoteRecordsQueryReply.entries).to.have.length(6, 'remote');
remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);
}).slow(1000); // Yellow at 500ms, Red at 1000ms.

Expand Down Expand Up @@ -412,7 +509,7 @@ describe('SyncEngineLevel', () => {

// spy on sendDwnRequest to the remote DWN
const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest');
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');

// Execute Sync to push records to Alice's remote node
await syncEngine.pull();
Expand Down
Loading