Skip to content

Commit

Permalink
support subscriptions in agent
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed May 7, 2024
1 parent ca7f53b commit 96e452a
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 8 deletions.
2 changes: 1 addition & 1 deletion packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"dependencies": {
"@noble/ciphers": "0.4.1",
"@scure/bip39": "1.2.2",
"@tbd54566975/dwn-sdk-js": "0.3.1",
"@tbd54566975/dwn-sdk-js": "0.3.2",
"@web5/common": "1.0.0",
"@web5/crypto": "1.0.0",
"@web5/dids": "1.0.1",
Expand Down
12 changes: 11 additions & 1 deletion packages/agent/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ export function isDwnRequest<T extends DwnInterface>(
return dwnRequest.messageType === messageType;
}

export function isDwnMessage<T extends DwnInterface>(
messageType: T, message: GenericMessage
): message is DwnMessage[T] {
const incomingMessageInterfaceName = message.descriptor.interface + message.descriptor.method;
return incomingMessageInterfaceName === messageType;
}

export class AgentDwnApi {
/**
* Holds the instance of a `Web5PlatformAgent` that represents the current execution context for
Expand Down Expand Up @@ -114,13 +121,16 @@ export class AgentDwnApi {
// Readable stream.
const { message, dataStream } = await this.constructDwnMessage({ request });

// Extracts the optional subscription handler from the request to pass into `processMessage.
const { subscriptionHandler } = request;

// Conditionally processes the message with the DWN instance:
// - If `store` is not explicitly set to false, it sends the message to the DWN node for
// processing, passing along the target DID, the message, and any associated data stream.
// - If `store` is set to false, it immediately returns a simulated 'accepted' status without
// storing the message/data in the DWN node.
const reply: DwnMessageReply[T] = (request.store !== false)
? await this._dwn.processMessage(request.target, message, { dataStream })
? await this._dwn.processMessage(request.target, message, { dataStream, subscriptionHandler })
: { status: { code: 202, detail: 'Accepted' } };

// Returns an object containing the reply from processing the message, the original message,
Expand Down
7 changes: 5 additions & 2 deletions packages/agent/src/test-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { AbstractLevel } from 'abstract-level';

import { Level } from 'level';
import { LevelStore, MemoryStore } from '@web5/common';
import { DataStoreLevel, Dwn, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DataStoreLevel, Dwn, EventEmitterStream, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js';
import { DidDht, DidJwk, DidResolutionResult, DidResolverCache, DidResolverCacheLevel } from '@web5/dids';

import type { Web5PlatformAgent } from './types/agent.js';
Expand Down Expand Up @@ -180,6 +180,8 @@ export class PlatformAgentTestHarness {
// Note: There is no in-memory store for DWN, so we always use LevelDB-based disk stores.
const dwnDataStore = new DataStoreLevel({ blockstoreLocation: testDataPath('DWN_DATASTORE') });
const dwnEventLog = new EventLogLevel({ location: testDataPath('DWN_EVENTLOG') });
const dwnEventStream = new EventEmitterStream();

const dwnMessageStore = new MessageStoreLevel({
blockstoreLocation : testDataPath('DWN_MESSAGESTORE'),
indexLocation : testDataPath('DWN_MESSAGEINDEX')
Expand All @@ -191,7 +193,8 @@ export class PlatformAgentTestHarness {
dataStore : dwnDataStore,
didResolver : didApi,
eventLog : dwnEventLog,
messageStore : dwnMessageStore
eventStream : dwnEventStream,
messageStore : dwnMessageStore,
});

// Instantiate Agent's DWN API using the custom DWN instance.
Expand Down
41 changes: 41 additions & 0 deletions packages/agent/src/types/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ import type {
ProtocolsQueryOptions,
ProtocolsConfigureMessage,
ProtocolsConfigureOptions,
EventsSubscribeMessage,
RecordsSubscribeMessage,
EventsSubscribeOptions,
RecordsSubscribeOptions,
EventsSubscribeReply,
RecordsSubscribeReply,
RecordSubscriptionHandler,
EventSubscriptionHandler,
} from '@tbd54566975/dwn-sdk-js';

import {
Expand All @@ -40,6 +48,8 @@ import {
DwnInterfaceName,
ProtocolsConfigure,
EventsQuery,
EventsSubscribe,
RecordsSubscribe,
} from '@tbd54566975/dwn-sdk-js';

/**
Expand Down Expand Up @@ -83,63 +93,89 @@ export interface DwnDidService extends DidService {
export enum DwnInterface {
EventsGet = DwnInterfaceName.Events + DwnMethodName.Get,
EventsQuery = DwnInterfaceName.Events + DwnMethodName.Query,
EventsSubscribe = DwnInterfaceName.Events + DwnMethodName.Subscribe,
MessagesGet = DwnInterfaceName.Messages + DwnMethodName.Get,
ProtocolsConfigure = DwnInterfaceName.Protocols + DwnMethodName.Configure,
ProtocolsQuery = DwnInterfaceName.Protocols + DwnMethodName.Query,
RecordsDelete = DwnInterfaceName.Records + DwnMethodName.Delete,
RecordsQuery = DwnInterfaceName.Records + DwnMethodName.Query,
RecordsRead = DwnInterfaceName.Records + DwnMethodName.Read,
RecordsSubscribe = DwnInterfaceName.Records + DwnMethodName.Subscribe,
RecordsWrite = DwnInterfaceName.Records + DwnMethodName.Write
}

export interface DwnMessage {
[DwnInterface.EventsGet] : EventsGetMessage;
[DwnInterface.EventsSubscribe] : EventsSubscribeMessage;
[DwnInterface.EventsQuery] : EventsQueryMessage;
[DwnInterface.MessagesGet] : MessagesGetMessage;
[DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage;
[DwnInterface.ProtocolsQuery] : ProtocolsQueryMessage;
[DwnInterface.RecordsDelete] : RecordsDeleteMessage;
[DwnInterface.RecordsQuery] : RecordsQueryMessage;
[DwnInterface.RecordsRead] : RecordsReadMessage;
[DwnInterface.RecordsSubscribe] : RecordsSubscribeMessage;
[DwnInterface.RecordsWrite] : RecordsWriteMessage;
}

export interface DwnMessageDescriptor {
[DwnInterface.EventsGet] : EventsGetMessage['descriptor'];
[DwnInterface.EventsSubscribe] : EventsSubscribeMessage['descriptor'];
[DwnInterface.EventsQuery] : EventsQueryMessage['descriptor'];
[DwnInterface.MessagesGet] : MessagesGetMessage['descriptor'];
[DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage['descriptor'];
[DwnInterface.ProtocolsQuery] : ProtocolsQueryMessage['descriptor'];
[DwnInterface.RecordsDelete] : RecordsDeleteMessage['descriptor'];
[DwnInterface.RecordsQuery] : RecordsQueryMessage['descriptor'];
[DwnInterface.RecordsRead] : RecordsReadMessage['descriptor'];
[DwnInterface.RecordsSubscribe] : RecordsSubscribeMessage['descriptor'];
[DwnInterface.RecordsWrite] : RecordsWriteMessage['descriptor'];
}

export interface DwnMessageParams {
[DwnInterface.EventsGet] : Partial<EventsGetOptions>;
[DwnInterface.EventsQuery] : RequireOnly<EventsQueryOptions, 'filters'>;
[DwnInterface.EventsSubscribe] : Partial<EventsSubscribeOptions>;
[DwnInterface.MessagesGet] : RequireOnly<MessagesGetOptions, 'messageCids'>;
[DwnInterface.ProtocolsConfigure] : RequireOnly<ProtocolsConfigureOptions, 'definition'>;
[DwnInterface.ProtocolsQuery] : ProtocolsQueryOptions;
[DwnInterface.RecordsDelete] : RequireOnly<RecordsDeleteOptions, 'recordId'>;
[DwnInterface.RecordsQuery] : RecordsQueryOptions;
[DwnInterface.RecordsRead] : RecordsReadOptions;
[DwnInterface.RecordsSubscribe] : RecordsSubscribeOptions;
[DwnInterface.RecordsWrite] : RecordsWriteOptions;
}

export interface DwnMessageReply {
[DwnInterface.EventsGet] : EventsGetReply;
[DwnInterface.EventsQuery] : EventsQueryReply;
[DwnInterface.EventsSubscribe] : EventsSubscribeReply;
[DwnInterface.MessagesGet] : MessagesGetReply;
[DwnInterface.ProtocolsConfigure] : GenericMessageReply;
[DwnInterface.ProtocolsQuery] : ProtocolsQueryReply;
[DwnInterface.RecordsDelete] : GenericMessageReply;
[DwnInterface.RecordsQuery] : RecordsQueryReply;
[DwnInterface.RecordsRead] : RecordsReadReply;
[DwnInterface.RecordsSubscribe] : RecordsSubscribeReply;
[DwnInterface.RecordsWrite] : GenericMessageReply;
}

export interface MessageHandler {
[DwnInterface.EventsSubscribe] : EventSubscriptionHandler;
[DwnInterface.RecordsSubscribe] : RecordSubscriptionHandler;

// define all of them individually as undefined
[DwnInterface.EventsGet] : undefined;
[DwnInterface.EventsQuery] : undefined;
[DwnInterface.MessagesGet] : undefined;
[DwnInterface.ProtocolsConfigure] : undefined;
[DwnInterface.ProtocolsQuery] : undefined;
[DwnInterface.RecordsDelete] : undefined;
[DwnInterface.RecordsQuery] : undefined;
[DwnInterface.RecordsRead] : undefined;
[DwnInterface.RecordsWrite] : undefined;
}

export type DwnRequest<T extends DwnInterface> = {
author: string;
target: string;
Expand All @@ -166,6 +202,7 @@ export type ProcessDwnRequest<T extends DwnInterface> = DwnRequest<T> & {
messageParams?: DwnMessageParams[T];
store?: boolean;
signAsOwner?: boolean;
subscriptionHandler?: MessageHandler[T];
}

export type SendDwnRequest<T extends DwnInterface> = DwnRequest<T> & (ProcessDwnRequest<T> | { messageCid: string })
Expand All @@ -185,12 +222,14 @@ export interface DwnMessageConstructor<T extends DwnInterface> {
export const dwnMessageConstructors: { [T in DwnInterface]: DwnMessageConstructor<T> } = {
[DwnInterface.EventsGet] : EventsGet as any,
[DwnInterface.EventsQuery] : EventsQuery as any,
[DwnInterface.EventsSubscribe] : EventsSubscribe as any,
[DwnInterface.MessagesGet] : MessagesGet as any,
[DwnInterface.ProtocolsConfigure] : ProtocolsConfigure as any,
[DwnInterface.ProtocolsQuery] : ProtocolsQuery as any,
[DwnInterface.RecordsDelete] : RecordsDelete as any,
[DwnInterface.RecordsQuery] : RecordsQuery as any,
[DwnInterface.RecordsRead] : RecordsRead as any,
[DwnInterface.RecordsSubscribe] : RecordsSubscribe as any,
[DwnInterface.RecordsWrite] : RecordsWrite as any,
} as const;

Expand All @@ -199,12 +238,14 @@ export type DwnMessageConstructors = typeof dwnMessageConstructors;
export interface DwnMessageInstance {
[DwnInterface.EventsGet] : EventsGet;
[DwnInterface.EventsQuery] : EventsQuery;
[DwnInterface.EventsSubscribe] : EventsSubscribe;
[DwnInterface.MessagesGet] : MessagesGet;
[DwnInterface.ProtocolsConfigure] : ProtocolsConfigure;
[DwnInterface.ProtocolsQuery] : ProtocolsQuery;
[DwnInterface.RecordsDelete] : RecordsDelete;
[DwnInterface.RecordsQuery] : RecordsQuery;
[DwnInterface.RecordsRead] : RecordsRead;
[DwnInterface.RecordsSubscribe] : RecordsSubscribe;
[DwnInterface.RecordsWrite] : RecordsWrite;
}

Expand Down
Loading

0 comments on commit 96e452a

Please sign in to comment.