Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Live Updates #888

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './local-key-manager.js';
export * from './permissions-api.js';
export * from './rpc-client.js';
export * from './store-data.js';
export * from './store-data-protocols.js';
export * from './store-did.js';
export * from './store-identity.js';
export * from './store-key.js';
Expand Down
225 changes: 222 additions & 3 deletions packages/agent/src/sync-engine-level.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
GenericMessage,
MessagesQueryReply,
MessagesReadReply,
MessageSubscriptionHandler,
PaginationCursor,
UnionMessageReply,
} from '@tbd54566975/dwn-sdk-js';
Expand All @@ -15,12 +16,13 @@ import { NodeStream } from '@web5/common';
import {
DwnInterfaceName,
DwnMethodName,
Message,
} from '@tbd54566975/dwn-sdk-js';

import type { SyncEngine, SyncIdentityOptions } from './types/sync.js';
import type { Web5Agent, Web5PlatformAgent } from './types/agent.js';

import { DwnInterface } from './types/dwn.js';
import { DwnInterface, DwnMessageSubscription } from './types/dwn.js';
import { getDwnServiceEndpointUrls, isRecordsWrite } from './utils.js';
import { PermissionsApi } from './types/permissions.js';
import { AgentPermissionsApi } from './permissions-api.js';
Expand Down Expand Up @@ -71,6 +73,7 @@ export class SyncEngineLevel implements SyncEngine {
private _syncIntervalId?: ReturnType<typeof setInterval>;
private _syncLock = false;
private _ulidFactory: ULIDFactory;
private _subscriptions: Map<string, DwnMessageSubscription> = new Map();

constructor({ agent, dataPath, db }: SyncEngineLevelParams) {
this._agent = agent;
Expand Down Expand Up @@ -164,10 +167,10 @@ export class SyncEngineLevel implements SyncEngine {
});

let reply: MessagesReadReply;

try {
reply = await this.agent.rpc.sendDwnRequest({
dwnUrl, targetDid : did,
dwnUrl,
targetDid : did,
message : messagesRead.message,
}) as MessagesReadReply;
} catch(e) {
Expand Down Expand Up @@ -198,6 +201,7 @@ export class SyncEngineLevel implements SyncEngine {
}

await pullQueue.batch(deleteOperations as any);
await this.syncInstant({ syncDirection: 'pull' });
}

private async push(): Promise<void> {
Expand Down Expand Up @@ -251,6 +255,7 @@ export class SyncEngineLevel implements SyncEngine {
}

await pushQueue.batch(deleteOperations as any);
await this.syncInstant({ syncDirection: 'push' });
}

public async registerIdentity({ did, options }: { did: string; options?: SyncIdentityOptions }): Promise<void> {
Expand Down Expand Up @@ -282,6 +287,183 @@ export class SyncEngineLevel implements SyncEngine {
}
}

private async syncInstant({ syncDirection }: {
syncDirection: SyncDirection;
}): Promise<void> {
const syncPeerState = await this.getSyncPeerInfo({ syncDirection });
for (const peerState of syncPeerState) {
const { did, delegateDid, dwnUrl, protocol } = peerState;
const key = `${did}~${delegateDid}~${dwnUrl}~${protocol}~${syncDirection}`;
if (syncDirection === 'push') {
let permissionGrantId: string | undefined;
let granteeDid: string | undefined;
if (delegateDid) {
try {
const messagesReadGrant = await this._permissionsApi.getPermissionForRequest({
connectedDid : did,
messageType : DwnInterface.MessagesSubscribe,
delegateDid,
protocol,
cached : true
});

permissionGrantId = messagesReadGrant.grant.id;
granteeDid = delegateDid;
} catch(error:any) {
console.error('SyncEngineLevel: pull - Error fetching MessagesRead permission grant for delegate DID', error);
continue;
}
}

const { reply: { status, subscription } } = await this.agent.processDwnRequest({
author : did,
target : did,
messageType : DwnInterface.MessagesSubscribe,
granteeDid,
messageParams : {
permissionGrantId
},
subscriptionHandler: async (event) => {
const { message } = event;
const messageCid = await Message.getCid(message);

// send the message to the remote DWN
const { status } = await this.agent.rpc.sendDwnRequest({
targetDid: did,
dwnUrl,
message,
});

if (status.code === 202 || status.code === 204 || status.code === 409) {
await this.addMessage(did, messageCid);
}
}
});

if (status.code === 200 && subscription) {
const existingSubscription = this._subscriptions.get(key);
if (existingSubscription) {
await existingSubscription.close();
}

this._subscriptions.set(key, subscription);
}
}

if (syncDirection === 'pull') {

const subscriptionHandler: MessageSubscriptionHandler = async (event) => {
const { message } = event;
const messageCid = await Message.getCid(message);

const messageExists = await this.messageExists(did, messageCid);
if (messageExists) {
return;
}
let permissionGrantId: string | undefined;
let granteeDid: string | undefined;
if (delegateDid) {
try {
const messagesReadGrant = await this._permissionsApi.getPermissionForRequest({
connectedDid : did,
messageType : DwnInterface.MessagesRead,
delegateDid,
protocol,
cached : true
});

permissionGrantId = messagesReadGrant.grant.id;
granteeDid = delegateDid;
} catch(error:any) {
console.error('SyncEngineLevel: pull - Error fetching MessagesRead permission grant for delegate DID', error);
return;
}
}

const messagesRead = await this.agent.processDwnRequest({
store : false,
author : did,
target : did,
granteeDid,
messageType : DwnInterface.MessagesRead,
messageParams : {
messageCid,
permissionGrantId
}
});

let reply: MessagesReadReply;

try {
reply = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesRead.message,
}) as MessagesReadReply;
} catch(e) {
return;
}

if (reply.status.code !== 200 || !reply.entry?.message) {
return;
}

const replyEntry = reply.entry;
const { status: processStatus } = await this.agent.dwn.node.processMessage(did, replyEntry.message, { dataStream: replyEntry.data });
if (processStatus.code === 202 || processStatus.code === 204 || processStatus.code === 409) {
await this.addMessage(did, messageCid);
}
};
let permissionGrantId: string | undefined;
let granteeDid: string | undefined;
if (delegateDid) {
try {
const messagesReadGrant = await this._permissionsApi.getPermissionForRequest({
connectedDid : did,
messageType : DwnInterface.MessagesSubscribe,
delegateDid,
protocol,
cached : true
});

permissionGrantId = messagesReadGrant.grant.id;
granteeDid = delegateDid;
} catch(error:any) {
console.error('SyncEngineLevel: pull - Error fetching MessagesRead permission grant for delegate DID', error);
continue;
}
}
const messagesSubscribeMessage = await this.agent.sendDwnRequest({
store : false,
author : did,
target : did,
messageType : DwnInterface.MessagesSubscribe,
granteeDid,
messageParams : {
permissionGrantId,
},
subscriptionHandler
});

const { status: sendStatus, subscription } = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid : did,
message : messagesSubscribeMessage.message,
subscriptionHandler
});

if (sendStatus.code === 200 && subscription) {
const existingSubscription = this._subscriptions.get(key);
if (existingSubscription) {
await existingSubscription.close();
}

this._subscriptions.set(key, subscription);
}
}
}
}

public async startSync({ interval }: {
interval: string
}): Promise<void> {
Expand Down Expand Up @@ -542,6 +724,43 @@ export class SyncEngineLevel implements SyncEngine {
return dwnMessageWithBlob;
}

private async getSyncPeerInfo({ syncDirection }: {
syncDirection: SyncDirection;
}): Promise<SyncState[]> {

// Array to accumulate the list of sync peers for each DID.
const syncPeerState: SyncState[] = [];

// iterate over all registered identities
for await (const [ did, options ] of this._db.sublevel('registeredIdentities').iterator()) {
const { protocols, delegateDid } = JSON.parse(options) as SyncIdentityOptions;
// First, confirm the DID can be resolved and extract the DWN service endpoint URLs.
const dwnEndpointUrls = await getDwnServiceEndpointUrls(did, this.agent.did);
if (dwnEndpointUrls.length === 0) {
// Silently ignore and do not try to perform Sync for any DID that does not have a DWN
// service endpoint published in its DID document.
continue;
}

// Get the cursor (or undefined) for each (DID, DWN service endpoint, sync direction)
// combination and add it to the sync peer state array.
for (let dwnUrl of dwnEndpointUrls) {
const info = await this.agent.rpc.getServerInfo(dwnUrl);
if (info.webSocketSupport) {
if (protocols.length === 0) {
syncPeerState.push({ did, dwnUrl, delegateDid });
} else {
for (const protocol of protocols) {
syncPeerState.push({ did, dwnUrl, delegateDid, protocol });
}
}
}
}
}

return syncPeerState;
}

private async getSyncPeerState({ syncDirection }: {
syncDirection: SyncDirection;
}): Promise<SyncState[]> {
Expand Down
22 changes: 20 additions & 2 deletions packages/api/src/web5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type {
} from '@web5/agent';

import { Web5UserAgent } from '@web5/user-agent';
import { DwnRegistrar, WalletConnect } from '@web5/agent';
import { DwnRegistrar, IdentityProtocolDefinition, JwkProtocolDefinition, WalletConnect } from '@web5/agent';

import { DidApi } from './did-api.js';
import { DwnApi } from './dwn-api.js';
Expand Down Expand Up @@ -280,10 +280,28 @@ export class Web5 {
const serviceEndpointNodes = techPreview?.dwnEndpoints ?? didCreateOptions?.dwnEndpoints ?? ['https://dwn.tbddev.org/beta'];

// Initialize, if necessary, and start the agent.
if (await userAgent.firstLaunch()) {
const firstLaunch = await userAgent.firstLaunch();
if (firstLaunch) {
recoveryPhrase = await userAgent.initialize({ password, recoveryPhrase, dwnEndpoints: serviceEndpointNodes });
}
await userAgent.start({ password });
if (firstLaunch && sync !== 'off') {

// register only the identity-specific protocols for agent sync
await userAgent.sync.registerIdentity({
did : userAgent.agentDid.uri,
options : {
protocols: [
IdentityProtocolDefinition.protocol,
JwkProtocolDefinition.protocol
]
}
});

// attempt to pull the latest messages from the DWN so that the identities can be in sync
await userAgent.sync.sync('pull');
}

// Attempt to retrieve the connected Identity if it exists.
const connectedIdentity: BearerIdentity = await userAgent.identity.connectedIdentity();
let identity: BearerIdentity;
Expand Down
2 changes: 1 addition & 1 deletion packages/user-agent/src/user-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
} from '@web5/agent';

import { LevelStore } from '@web5/common';
import { BearerDid, DidDht, DidJwk, DidResolverCacheLevel } from '@web5/dids';
import { BearerDid, DidDht, DidJwk } from '@web5/dids';
import {
AgentDidApi,
AgentDwnApi,
Expand Down
Loading