Skip to content

Commit

Permalink
remote duplicate methods, fix sync for non-RecordsWrite messages, add…
Browse files Browse the repository at this point in the history
… protocol configure to e2e sync test
  • Loading branch information
LiranCohen committed Jul 3, 2024
1 parent 89f239d commit c4a5b4f
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 85 deletions.
37 changes: 2 additions & 35 deletions packages/agent/src/dwn-api.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { Readable } from '@web5/common';
import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import type { DwnConfig, GenericMessage } from '@tbd54566975/dwn-sdk-js';

import { NodeStream } from '@web5/common';
import { utils as cryptoUtils } from '@web5/crypto';
import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel, ResumableTaskStoreLevel } from '@tbd54566975/dwn-sdk-js';

import type { Web5PlatformAgent } from './types/agent.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
import type { DwnMessage, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';

import { DwnInterface, dwnMessageConstructors } from './types/dwn.js';
import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js';
Expand Down Expand Up @@ -382,37 +382,4 @@ export class AgentDwnApi {

return dwnMessageWithBlob;
}

/**
* TODO: Refactor this to consolidate logic in AgentDwnApi and SyncEngineLevel.
* ADDED TO GET SYNC WORKING
* - createMessage()
* - processMessage()
*/

public async createMessage<T extends DwnInterface>({ author, messageParams, messageType }: {
author: string;
messageType: T;
messageParams?: DwnMessageParams[T];
}): Promise<DwnMessageInstance[T]> {
// Determine the signer for the message.
const signer = await this.getSigner(author);

const dwnMessageConstructor = dwnMessageConstructors[messageType];
const dwnMessage = await dwnMessageConstructor.create({
// TODO: Explore whether 'messageParams' should be required in the ProcessDwnRequest type.
...messageParams!,
signer
});

return dwnMessage;
}

public async processMessage({ dataStream, message, targetDid }: {
targetDid: string;
message: GenericMessage;
dataStream?: Readable;
}): Promise<UnionMessageReply> {
return await this._dwn.processMessage(targetDid, message, { dataStream });
}
}
45 changes: 20 additions & 25 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ export class SyncEngineLevel implements SyncEngine {
continue;
}

const messagesRead = await this.agent.dwn.createMessage({
const messagesRead = await this.agent.processDwnRequest({
store : false,
author : did,
target : did,
messageType : DwnInterface.MessagesRead,
messageParams : {
messageCid: messageCid
Expand All @@ -118,7 +120,7 @@ export class SyncEngineLevel implements SyncEngine {
reply = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesRead,
message : messagesRead.message,
}) as MessagesReadReply;
} catch(e) {
errored.add(dwnUrl);
Expand All @@ -132,27 +134,18 @@ export class SyncEngineLevel implements SyncEngine {
}

const replyEntry = reply.entry;

if (isRecordsWrite(replyEntry)) {
const message = replyEntry.message;

// if the message includes data we convert it to a Node readable stream
// otherwise we set it as undefined, as the message does not include data
// this occurs when the message is a RecordsWrite message that has been updated
const dataStream = replyEntry.data ?
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
: undefined;

const pullReply = await this.agent.dwn.processMessage({
targetDid: did,
message,
dataStream,
});

if (pullReply.status.code === 202 || pullReply.status.code === 409) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
const message = replyEntry.message;
// if the message includes data we convert it to a Node readable stream
// otherwise we set it as undefined, as the message does not include data
// this occurs when the message is a RecordsWrite message that has been updated
const dataStream = isRecordsWrite(replyEntry) && replyEntry.data ?
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
: undefined;

const pullReply = await this.agent.dwn.node.processMessage(did, message, { dataStream });
if (pullReply.status.code === 202 || pullReply.status.code === 409) {
await this.addMessage(did, messageCid);
deleteOperations.push({ type: 'del', key: key });
}
}

Expand Down Expand Up @@ -308,7 +301,9 @@ export class SyncEngineLevel implements SyncEngine {

if (syncDirection === 'pull') {
// When sync is a pull, get the event log from the remote DWN.
const messagesReadMessage = await this.agent.dwn.createMessage({
const messagesReadMessage = await this.agent.dwn.processRequest({
store : false,
target : did,
author : did,
messageType : DwnInterface.MessagesQuery,
messageParams : { filters: [], cursor }
Expand All @@ -318,7 +313,7 @@ export class SyncEngineLevel implements SyncEngine {
messagesReply = await this.agent.rpc.sendDwnRequest({
dwnUrl : dwnUrl,
targetDid : did,
message : messagesReadMessage
message : messagesReadMessage.message
}) as MessagesQueryReply;
} catch {
// If a particular DWN service endpoint is unreachable, silently ignore.
Expand Down
3 changes: 2 additions & 1 deletion packages/agent/tests/dwn-api.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,12 @@ describe('AgentDwnApi', () => {

it('returns a 202 Accepted status when the request is not stored', async () => {
// spy on dwn.processMessage
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');

// Attempt to process the RecordsWrite
const dataBytes = Convert.string('Hello, world!').toUint8Array();
let writeResponse = await testHarness.agent.dwn.processRequest({
store : false,
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsWrite,
Expand Down
145 changes: 121 additions & 24 deletions packages/agent/tests/sync-engine-level.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sinon from 'sinon';
import { expect } from 'chai';
import { utils as cryptoUtils } from '@web5/crypto';
import { DwnConstant } from '@tbd54566975/dwn-sdk-js';
import { DwnConstant, ProtocolDefinition } from '@tbd54566975/dwn-sdk-js';

import type { BearerIdentity } from '../src/bearer-identity.js';

Expand Down Expand Up @@ -82,7 +82,56 @@ describe('SyncEngineLevel', () => {
await testHarness.closeStorage();
});

it('syncs multiple records in both directions', async () => {
it('syncs multiple messages in both directions', async () => {
// create 1 local protocol configure
const protocolDefinition1: ProtocolDefinition = {
published : true,
protocol : 'https://protocol.xyz/example/1',
types : {
foo: {
schema : 'https://schemas.xyz/foo',
dataFormats : ['text/plain', 'application/json']
}
},
structure: {
foo: {}
}
};

const protocolsConfigure1 = await testHarness.agent.processDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsConfigure,
messageParams : {
definition: protocolDefinition1
}
});

// create 1 remote protocol configure
const protocolDefinition2: ProtocolDefinition = {
published : true,
protocol : 'https://protocol.xyz/example/2',
types : {
bar: {
schema : 'https://schemas.xyz/bar',
dataFormats : ['text/plain', 'application/json']
}
},
structure: {
bar: {}
}
};

const protocolsConfigure2 = await testHarness.agent.sendDwnRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsConfigure,
messageParams : {
definition: protocolDefinition2
}
});


// create 3 local records.
const localRecords: string[] = [];
for (let i = 0; i < 3; i++) {
Expand Down Expand Up @@ -152,8 +201,20 @@ describe('SyncEngineLevel', () => {
remoteRecords.push((writeResponse.message!).recordId);
}

// check that protocol1 exists locally
let localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
let localProtocolsQueryReply = localProtocolsQueryResponse.reply;
expect(localProtocolsQueryReply.status.code).to.equal(200);
expect(localProtocolsQueryReply.entries?.length).to.equal(1);
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message ]);

// query local and check for only local records
let localQueryResponse = await testHarness.agent.dwn.processRequest({
let localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -164,14 +225,26 @@ describe('SyncEngineLevel', () => {
}
}
});
let localDwnQueryReply = localQueryResponse.reply;
expect(localDwnQueryReply.status.code).to.equal(200);
expect(localDwnQueryReply.entries).to.have.length(3);
let localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
let localRecordsQueryReply = localRecordsQueryResponse.reply;
expect(localRecordsQueryReply.status.code).to.equal(200);
expect(localRecordsQueryReply.entries).to.have.length(3);
let localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(localRecordsFromQuery).to.have.members(localRecords);

// check that protocol2 exists remotely
let remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
let remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
expect(remoteProtocolsQueryReply.entries?.length).to.equal(1);
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure2.message ]);

// query remote and check for only remote records
let remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
let remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -182,10 +255,10 @@ describe('SyncEngineLevel', () => {
}
}
});
let remoteDwnQueryReply = remoteQueryResponse.reply;
expect(remoteDwnQueryReply.status.code).to.equal(200);
expect(remoteDwnQueryReply.entries).to.have.length(3);
let remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
let remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
expect(remoteRecordsQueryReply.status.code).to.equal(200);
expect(remoteRecordsQueryReply.entries).to.have.length(3);
let remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(remoteRecordsFromQuery).to.have.members(remoteRecords);

// Register Alice's DID to be synchronized.
Expand All @@ -197,8 +270,20 @@ describe('SyncEngineLevel', () => {
await syncEngine.push();
await syncEngine.pull();

// query local to see all protocols
localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
localProtocolsQueryReply = localProtocolsQueryResponse.reply;
expect(localProtocolsQueryReply.status.code).to.equal(200);
expect(localProtocolsQueryReply.entries?.length).to.equal(2);
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);

// query local node to see all records
localQueryResponse = await testHarness.agent.dwn.processRequest({
localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -209,14 +294,26 @@ describe('SyncEngineLevel', () => {
}
}
});
localDwnQueryReply = localQueryResponse.reply;
expect(localDwnQueryReply.status.code).to.equal(200);
expect(localDwnQueryReply.entries).to.have.length(6, 'local');
localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
localRecordsQueryReply = localRecordsQueryResponse.reply;
expect(localRecordsQueryReply.status.code).to.equal(200);
expect(localRecordsQueryReply.entries).to.have.length(6, 'local');
localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);

// query remote node to see all results
remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
// query remote node to see all protocols
remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.ProtocolsQuery,
messageParams : {}
});
remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
expect(remoteProtocolsQueryReply.entries?.length).to.equal(2);
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);

// query remote node to see all records
remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
author : alice.did.uri,
target : alice.did.uri,
messageType : DwnInterface.RecordsQuery,
Expand All @@ -227,10 +324,10 @@ describe('SyncEngineLevel', () => {
}
}
});
remoteDwnQueryReply = remoteQueryResponse.reply;
expect(remoteDwnQueryReply.status.code).to.equal(200);
expect(remoteDwnQueryReply.entries).to.have.length(6, 'remote');
remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
expect(remoteRecordsQueryReply.status.code).to.equal(200);
expect(remoteRecordsQueryReply.entries).to.have.length(6, 'remote');
remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);
}).slow(1000); // Yellow at 500ms, Red at 1000ms.

Expand Down Expand Up @@ -412,7 +509,7 @@ describe('SyncEngineLevel', () => {

// spy on sendDwnRequest to the remote DWN
const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest');
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');

// Execute Sync to push records to Alice's remote node
await syncEngine.pull();
Expand Down

0 comments on commit c4a5b4f

Please sign in to comment.