Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@web5/agent DWN Subscriptions Support #492

Merged
merged 14 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/six-pandas-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@web5/agent": patch
"@web5/identity-agent": patch
"@web5/proxy-agent": patch
"@web5/user-agent": patch
---

- `@web5/agent` DWN Subscriptions Support
- `@web5/agent` supports latest `dwn-sdk-js` with `prune` feature from `RecordsWriteDelete`

5 changes: 5 additions & 0 deletions .changeset/twelve-trainers-wonder.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@web5/api": patch
---

`@web5/api` supports `prune` via `RecordsWriteDelete`
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"@changesets/cli": "^2.27.1",
"@npmcli/package-json": "5.0.0",
"@typescript-eslint/eslint-plugin": "6.4.0",
"@web5/dwn-server": "0.2.1",
"@web5/dwn-server": "0.2.2",
"eslint-plugin-mocha": "10.1.0",
"npkill": "0.11.3"
},
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"dependencies": {
"@noble/ciphers": "0.4.1",
"@scure/bip39": "1.2.2",
"@tbd54566975/dwn-sdk-js": "0.3.1",
"@tbd54566975/dwn-sdk-js": "0.3.2",
"@web5/common": "1.0.0",
"@web5/crypto": "1.0.0",
"@web5/dids": "1.0.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/agent/src/did-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class AgentDidApi<TKeyManager extends AgentKeyManager = AgentKeyManager>
// Resolve the DID document.
const { didDocument, didResolutionMetadata } = await this.resolve(didUri);
if (!didDocument) {
throw new Error(`DID resolution failed for '${didUri}': ${didResolutionMetadata.error}`);
throw new Error(`DID resolution failed for '${didUri}': ${JSON.stringify(didResolutionMetadata)}`);
LiranCohen marked this conversation as resolved.
Show resolved Hide resolved
}

// Retrieve the method-specific verification method to be used for signing operations.
Expand Down
49 changes: 44 additions & 5 deletions packages/agent/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/
import { Convert, NodeStream } from '@web5/common';
import { utils as cryptoUtils } from '@web5/crypto';
import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
import { Cid, DataStoreLevel, Dwn, EventLogLevel, Message, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';

import type { Web5PlatformAgent } from './types/agent.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';

import { DwnInterface, dwnMessageConstructors } from './types/dwn.js';
import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js';
Expand All @@ -32,6 +32,13 @@ export function isDwnRequest<T extends DwnInterface>(
return dwnRequest.messageType === messageType;
}

export function isDwnMessage<T extends DwnInterface>(
messageType: T, message: GenericMessage
): message is DwnMessage[T] {
const incomingMessageInterfaceName = message.descriptor.interface + message.descriptor.method;
return incomingMessageInterfaceName === messageType;
}

export class AgentDwnApi {
/**
* Holds the instance of a `Web5PlatformAgent` that represents the current execution context for
Expand Down Expand Up @@ -114,13 +121,16 @@ export class AgentDwnApi {
// Readable stream.
const { message, dataStream } = await this.constructDwnMessage({ request });

// Extracts the optional subscription handler from the request to pass into `processMessage.
const { subscriptionHandler } = request;

// Conditionally processes the message with the DWN instance:
// - If `store` is not explicitly set to false, it sends the message to the DWN node for
// processing, passing along the target DID, the message, and any associated data stream.
// - If `store` is set to false, it immediately returns a simulated 'accepted' status without
// storing the message/data in the DWN node.
const reply: DwnMessageReply[T] = (request.store !== false)
? await this._dwn.processMessage(request.target, message, { dataStream })
? await this._dwn.processMessage(request.target, message, { dataStream, subscriptionHandler })
: { status: { code: 202, detail: 'Accepted' } };

// Returns an object containing the reply from processing the message, the original message,
Expand All @@ -144,6 +154,7 @@ export class AgentDwnApi {
let messageCid: string | undefined;
let message: DwnMessage[T];
let data: Blob | undefined;
let subscriptionHandler: MessageHandler[T] | undefined;

// If `messageCid` is given, retrieve message and data, if any.
if ('messageCid' in request) {
Expand All @@ -161,14 +172,16 @@ export class AgentDwnApi {
throw new Error('AgentDwnApi: DataStream must be provided as a Blob');
}
data = request.dataStream;
subscriptionHandler = request.subscriptionHandler;
}

// Send the RPC request to the target DID's DWN service endpoint using the Agent's RPC client.
const reply = await this.sendDwnRpcRequest({
targetDid: request.target,
dwnEndpointUrls,
message,
data
data,
subscriptionHandler
});

// If the message CID was not given in the `request`, compute it.
Expand All @@ -180,25 +193,51 @@ export class AgentDwnApi {
}

private async sendDwnRpcRequest<T extends DwnInterface>({
targetDid, dwnEndpointUrls, message, data
targetDid, dwnEndpointUrls, message, data, subscriptionHandler
}: {
targetDid: string;
dwnEndpointUrls: string[];
message: DwnMessage[T];
data?: Blob;
subscriptionHandler?: MessageHandler[T];
}
): Promise<DwnMessageReply[T]> {
const errorMessages: { url: string, message: string }[] = [];

if (message.descriptor.method === DwnMethodName.Subscribe && subscriptionHandler === undefined) {
throw new Error('AgentDwnApi: Subscription handler is required for subscription requests.');
}

// Try sending to author's publicly addressable DWNs until the first request succeeds.
for (let dwnUrl of dwnEndpointUrls) {
try {
if (subscriptionHandler !== undefined) {
// we get the server info to check if the server supports WebSocket for subscription requests
const serverInfo = await this.agent.rpc.getServerInfo(dwnUrl);
if (!serverInfo.webSocketSupport) {
// If the server does not support WebSocket, add an error message and continue to the next URL.
errorMessages.push({
url : dwnUrl,
message : 'WebSocket support is not enabled on the server.'
});
continue;
}

// If the server supports WebSocket, replace the subscription URL with a socket transport.
// For `http` we use the unsecured `ws` protocol, and for `https` we use the secured `wss` protocol.
const parsedUrl = new URL(dwnUrl);
parsedUrl.protocol = parsedUrl.protocol === 'http:' ? 'ws:' : 'wss:';
dwnUrl = parsedUrl.toString();
}

const dwnReply = await this.agent.rpc.sendDwnRequest({
dwnUrl,
targetDid,
message,
data,
subscriptionHandler
});

return dwnReply;
} catch(error: any) {
errorMessages.push({
Expand Down
6 changes: 3 additions & 3 deletions packages/agent/src/prototyping/clients/dwn-rpc-types.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { MessageEvent, RecordsReadReply, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
import type { RecordsReadReply, UnionMessageReply, EventSubscriptionHandler, RecordSubscriptionHandler } from '@tbd54566975/dwn-sdk-js';

export interface SerializableDwnMessage {
toJSON(): string;
}

export type DwnEventSubscriptionHandler = (event: MessageEvent) => void;
export type DwnSubscriptionHandler = EventSubscriptionHandler | RecordSubscriptionHandler;

/**
* Interface for communicating with {@link https://github.com/TBD54566975/dwn-server | DWN Servers}
Expand Down Expand Up @@ -45,7 +45,7 @@ export type DwnRpcRequest = {
targetDid: string;

/** Optional subscription handler for DWN events. */
subscriptionHandler?: DwnEventSubscriptionHandler;
subscriptionHandler?: DwnSubscriptionHandler;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { DwnEventSubscriptionHandler, DwnRpc, DwnRpcRequest, DwnRpcResponse } from './dwn-rpc-types.js';
import type { DwnRpc, DwnRpcRequest, DwnRpcResponse, DwnSubscriptionHandler } from './dwn-rpc-types.js';
import type { GenericMessage, MessageSubscription, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';

import { utils as cryptoUtils } from '@web5/crypto';
Expand Down Expand Up @@ -60,7 +60,7 @@ export class WebSocketDwnRpcClient implements DwnRpc {
return result.reply as DwnRpcResponse;
}

private static async subscriptionRequest(connection: SocketConnection, target:string, message: GenericMessage, messageHandler: DwnEventSubscriptionHandler): Promise<DwnRpcResponse> {
private static async subscriptionRequest(connection: SocketConnection, target:string, message: GenericMessage, messageHandler: DwnSubscriptionHandler): Promise<DwnRpcResponse> {
const requestId = cryptoUtils.randomUuid();
const subscriptionId = cryptoUtils.randomUuid();
const request = createJsonRpcSubscriptionRequest(requestId, 'dwn.processMessage', subscriptionId, { target, message });
Expand Down
6 changes: 3 additions & 3 deletions packages/agent/src/rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ export class Web5RpcClient implements Web5Rpc {
constructor(clients: Web5Rpc[] = []) {
this.transportClients = new Map();

// include http client as default. can be overwritten for 'http:' or 'https:' if instantiator provides
// their own.
clients = [new HttpWeb5RpcClient(), ...clients];
// include http and socket clients as default.
// can be overwritten for 'http:', 'https:', 'ws: or ':wss' if instantiated with other clients.
clients = [new HttpWeb5RpcClient(), new WebSocketWeb5RpcClient(), ...clients];

for (let client of clients) {
for (let transportScheme of client.transportProtocols) {
Expand Down
7 changes: 5 additions & 2 deletions packages/agent/src/test-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { AbstractLevel } from 'abstract-level';

import { Level } from 'level';
import { LevelStore, MemoryStore } from '@web5/common';
import { DataStoreLevel, Dwn, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DataStoreLevel, Dwn, EventEmitterStream, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DidDht, DidJwk, DidResolutionResult, DidResolverCache, DidResolverCacheLevel } from '@web5/dids';

import type { Web5PlatformAgent } from './types/agent.js';
Expand Down Expand Up @@ -180,6 +180,8 @@ export class PlatformAgentTestHarness {
// Note: There is no in-memory store for DWN, so we always use LevelDB-based disk stores.
const dwnDataStore = new DataStoreLevel({ blockstoreLocation: testDataPath('DWN_DATASTORE') });
const dwnEventLog = new EventLogLevel({ location: testDataPath('DWN_EVENTLOG') });
const dwnEventStream = new EventEmitterStream();

const dwnMessageStore = new MessageStoreLevel({
blockstoreLocation : testDataPath('DWN_MESSAGESTORE'),
indexLocation : testDataPath('DWN_MESSAGEINDEX')
Expand All @@ -191,7 +193,8 @@ export class PlatformAgentTestHarness {
dataStore : dwnDataStore,
didResolver : didApi,
eventLog : dwnEventLog,
messageStore : dwnMessageStore
eventStream : dwnEventStream,
messageStore : dwnMessageStore,
});

// Instantiate Agent's DWN API using the custom DWN instance.
Expand Down
Loading
Loading