diff --git a/packages/agent/package.json b/packages/agent/package.json index 448265232..a1915d6f9 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -71,7 +71,7 @@ "dependencies": { "@noble/ciphers": "0.4.1", "@scure/bip39": "1.2.2", - "@tbd54566975/dwn-sdk-js": "0.3.1", + "@tbd54566975/dwn-sdk-js": "0.3.2", "@web5/common": "1.0.0", "@web5/crypto": "1.0.0", "@web5/dids": "1.0.1", diff --git a/packages/agent/src/dwn-api.ts b/packages/agent/src/dwn-api.ts index d3bb1756d..2aa408b3d 100644 --- a/packages/agent/src/dwn-api.ts +++ b/packages/agent/src/dwn-api.ts @@ -32,6 +32,13 @@ export function isDwnRequest( return dwnRequest.messageType === messageType; } +export function isDwnMessage( + messageType: T, message: GenericMessage +): message is DwnMessage[T] { + const incomingMessageInterfaceName = message.descriptor.interface + message.descriptor.method; + return incomingMessageInterfaceName === messageType; +} + export class AgentDwnApi { /** * Holds the instance of a `Web5PlatformAgent` that represents the current execution context for @@ -114,13 +121,16 @@ export class AgentDwnApi { // Readable stream. const { message, dataStream } = await this.constructDwnMessage({ request }); + // Extracts the optional subscription handler from the request to pass into `processMessage. + const { subscriptionHandler } = request; + // Conditionally processes the message with the DWN instance: // - If `store` is not explicitly set to false, it sends the message to the DWN node for // processing, passing along the target DID, the message, and any associated data stream. // - If `store` is set to false, it immediately returns a simulated 'accepted' status without // storing the message/data in the DWN node. const reply: DwnMessageReply[T] = (request.store !== false) - ? await this._dwn.processMessage(request.target, message, { dataStream }) + ? await this._dwn.processMessage(request.target, message, { dataStream, subscriptionHandler }) : { status: { code: 202, detail: 'Accepted' } }; // Returns an object containing the reply from processing the message, the original message, diff --git a/packages/agent/src/test-harness.ts b/packages/agent/src/test-harness.ts index 7b8fe62af..1676a4b25 100644 --- a/packages/agent/src/test-harness.ts +++ b/packages/agent/src/test-harness.ts @@ -3,7 +3,7 @@ import type { AbstractLevel } from 'abstract-level'; import { Level } from 'level'; import { LevelStore, MemoryStore } from '@web5/common'; -import { DataStoreLevel, Dwn, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js'; +import { DataStoreLevel, Dwn, EventEmitterStream, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js'; import { DidDht, DidJwk, DidResolutionResult, DidResolverCache, DidResolverCacheLevel } from '@web5/dids'; import type { Web5PlatformAgent } from './types/agent.js'; @@ -180,6 +180,8 @@ export class PlatformAgentTestHarness { // Note: There is no in-memory store for DWN, so we always use LevelDB-based disk stores. const dwnDataStore = new DataStoreLevel({ blockstoreLocation: testDataPath('DWN_DATASTORE') }); const dwnEventLog = new EventLogLevel({ location: testDataPath('DWN_EVENTLOG') }); + const dwnEventStream = new EventEmitterStream(); + const dwnMessageStore = new MessageStoreLevel({ blockstoreLocation : testDataPath('DWN_MESSAGESTORE'), indexLocation : testDataPath('DWN_MESSAGEINDEX') @@ -191,7 +193,8 @@ export class PlatformAgentTestHarness { dataStore : dwnDataStore, didResolver : didApi, eventLog : dwnEventLog, - messageStore : dwnMessageStore + eventStream : dwnEventStream, + messageStore : dwnMessageStore, }); // Instantiate Agent's DWN API using the custom DWN instance. diff --git a/packages/agent/src/types/dwn.ts b/packages/agent/src/types/dwn.ts index bdbdf06f4..68e47a49a 100644 --- a/packages/agent/src/types/dwn.ts +++ b/packages/agent/src/types/dwn.ts @@ -26,6 +26,14 @@ import type { ProtocolsQueryOptions, ProtocolsConfigureMessage, ProtocolsConfigureOptions, + EventsSubscribeMessage, + RecordsSubscribeMessage, + EventsSubscribeOptions, + RecordsSubscribeOptions, + EventsSubscribeReply, + RecordsSubscribeReply, + RecordSubscriptionHandler, + EventSubscriptionHandler, } from '@tbd54566975/dwn-sdk-js'; import { @@ -40,6 +48,8 @@ import { DwnInterfaceName, ProtocolsConfigure, EventsQuery, + EventsSubscribe, + RecordsSubscribe, } from '@tbd54566975/dwn-sdk-js'; /** @@ -83,17 +93,20 @@ export interface DwnDidService extends DidService { export enum DwnInterface { EventsGet = DwnInterfaceName.Events + DwnMethodName.Get, EventsQuery = DwnInterfaceName.Events + DwnMethodName.Query, + EventsSubscribe = DwnInterfaceName.Events + DwnMethodName.Subscribe, MessagesGet = DwnInterfaceName.Messages + DwnMethodName.Get, ProtocolsConfigure = DwnInterfaceName.Protocols + DwnMethodName.Configure, ProtocolsQuery = DwnInterfaceName.Protocols + DwnMethodName.Query, RecordsDelete = DwnInterfaceName.Records + DwnMethodName.Delete, RecordsQuery = DwnInterfaceName.Records + DwnMethodName.Query, RecordsRead = DwnInterfaceName.Records + DwnMethodName.Read, + RecordsSubscribe = DwnInterfaceName.Records + DwnMethodName.Subscribe, RecordsWrite = DwnInterfaceName.Records + DwnMethodName.Write } export interface DwnMessage { [DwnInterface.EventsGet] : EventsGetMessage; + [DwnInterface.EventsSubscribe] : EventsSubscribeMessage; [DwnInterface.EventsQuery] : EventsQueryMessage; [DwnInterface.MessagesGet] : MessagesGetMessage; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage; @@ -101,11 +114,13 @@ export interface DwnMessage { [DwnInterface.RecordsDelete] : RecordsDeleteMessage; [DwnInterface.RecordsQuery] : RecordsQueryMessage; [DwnInterface.RecordsRead] : RecordsReadMessage; + [DwnInterface.RecordsSubscribe] : RecordsSubscribeMessage; [DwnInterface.RecordsWrite] : RecordsWriteMessage; } export interface DwnMessageDescriptor { [DwnInterface.EventsGet] : EventsGetMessage['descriptor']; + [DwnInterface.EventsSubscribe] : EventsSubscribeMessage['descriptor']; [DwnInterface.EventsQuery] : EventsQueryMessage['descriptor']; [DwnInterface.MessagesGet] : MessagesGetMessage['descriptor']; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigureMessage['descriptor']; @@ -113,33 +128,54 @@ export interface DwnMessageDescriptor { [DwnInterface.RecordsDelete] : RecordsDeleteMessage['descriptor']; [DwnInterface.RecordsQuery] : RecordsQueryMessage['descriptor']; [DwnInterface.RecordsRead] : RecordsReadMessage['descriptor']; + [DwnInterface.RecordsSubscribe] : RecordsSubscribeMessage['descriptor']; [DwnInterface.RecordsWrite] : RecordsWriteMessage['descriptor']; } export interface DwnMessageParams { [DwnInterface.EventsGet] : Partial; [DwnInterface.EventsQuery] : RequireOnly; + [DwnInterface.EventsSubscribe] : Partial; [DwnInterface.MessagesGet] : RequireOnly; [DwnInterface.ProtocolsConfigure] : RequireOnly; [DwnInterface.ProtocolsQuery] : ProtocolsQueryOptions; [DwnInterface.RecordsDelete] : RequireOnly; [DwnInterface.RecordsQuery] : RecordsQueryOptions; [DwnInterface.RecordsRead] : RecordsReadOptions; + [DwnInterface.RecordsSubscribe] : RecordsSubscribeOptions; [DwnInterface.RecordsWrite] : RecordsWriteOptions; } export interface DwnMessageReply { [DwnInterface.EventsGet] : EventsGetReply; [DwnInterface.EventsQuery] : EventsQueryReply; + [DwnInterface.EventsSubscribe] : EventsSubscribeReply; [DwnInterface.MessagesGet] : MessagesGetReply; [DwnInterface.ProtocolsConfigure] : GenericMessageReply; [DwnInterface.ProtocolsQuery] : ProtocolsQueryReply; [DwnInterface.RecordsDelete] : GenericMessageReply; [DwnInterface.RecordsQuery] : RecordsQueryReply; [DwnInterface.RecordsRead] : RecordsReadReply; + [DwnInterface.RecordsSubscribe] : RecordsSubscribeReply; [DwnInterface.RecordsWrite] : GenericMessageReply; } +export interface MessageHandler { + [DwnInterface.EventsSubscribe] : EventSubscriptionHandler; + [DwnInterface.RecordsSubscribe] : RecordSubscriptionHandler; + + // define all of them individually as undefined + [DwnInterface.EventsGet] : undefined; + [DwnInterface.EventsQuery] : undefined; + [DwnInterface.MessagesGet] : undefined; + [DwnInterface.ProtocolsConfigure] : undefined; + [DwnInterface.ProtocolsQuery] : undefined; + [DwnInterface.RecordsDelete] : undefined; + [DwnInterface.RecordsQuery] : undefined; + [DwnInterface.RecordsRead] : undefined; + [DwnInterface.RecordsWrite] : undefined; +} + export type DwnRequest = { author: string; target: string; @@ -166,6 +202,7 @@ export type ProcessDwnRequest = DwnRequest & { messageParams?: DwnMessageParams[T]; store?: boolean; signAsOwner?: boolean; + subscriptionHandler?: MessageHandler[T]; } export type SendDwnRequest = DwnRequest & (ProcessDwnRequest | { messageCid: string }) @@ -185,12 +222,14 @@ export interface DwnMessageConstructor { export const dwnMessageConstructors: { [T in DwnInterface]: DwnMessageConstructor } = { [DwnInterface.EventsGet] : EventsGet as any, [DwnInterface.EventsQuery] : EventsQuery as any, + [DwnInterface.EventsSubscribe] : EventsSubscribe as any, [DwnInterface.MessagesGet] : MessagesGet as any, [DwnInterface.ProtocolsConfigure] : ProtocolsConfigure as any, [DwnInterface.ProtocolsQuery] : ProtocolsQuery as any, [DwnInterface.RecordsDelete] : RecordsDelete as any, [DwnInterface.RecordsQuery] : RecordsQuery as any, [DwnInterface.RecordsRead] : RecordsRead as any, + [DwnInterface.RecordsSubscribe] : RecordsSubscribe as any, [DwnInterface.RecordsWrite] : RecordsWrite as any, } as const; @@ -199,12 +238,14 @@ export type DwnMessageConstructors = typeof dwnMessageConstructors; export interface DwnMessageInstance { [DwnInterface.EventsGet] : EventsGet; [DwnInterface.EventsQuery] : EventsQuery; + [DwnInterface.EventsSubscribe] : EventsSubscribe; [DwnInterface.MessagesGet] : MessagesGet; [DwnInterface.ProtocolsConfigure] : ProtocolsConfigure; [DwnInterface.ProtocolsQuery] : ProtocolsQuery; [DwnInterface.RecordsDelete] : RecordsDelete; [DwnInterface.RecordsQuery] : RecordsQuery; [DwnInterface.RecordsRead] : RecordsRead; + [DwnInterface.RecordsSubscribe] : RecordsSubscribe; [DwnInterface.RecordsWrite] : RecordsWrite; } diff --git a/packages/agent/tests/dwn-api.spec.ts b/packages/agent/tests/dwn-api.spec.ts index 352bb9165..74b71cc62 100644 --- a/packages/agent/tests/dwn-api.spec.ts +++ b/packages/agent/tests/dwn-api.spec.ts @@ -1,4 +1,4 @@ -import type { Dwn } from '@tbd54566975/dwn-sdk-js'; +import { Message, ProtocolDefinition, TestDataGenerator, type Dwn, type MessageEvent, type RecordsWriteMessage } from '@tbd54566975/dwn-sdk-js'; import { expect } from 'chai'; import { DidDht } from '@web5/dids'; @@ -6,7 +6,7 @@ import { Convert } from '@web5/common'; import type { PortableIdentity } from '../src/types/identity.js'; -import { AgentDwnApi } from '../src/dwn-api.js'; +import { AgentDwnApi, isDwnMessage } from '../src/dwn-api.js'; import { TestAgent } from './utils/test-agent.js'; import { testDwnUrl } from './utils/test-config.js'; import { DwnInterface } from '../src/types/dwn.js'; @@ -402,6 +402,87 @@ describe('AgentDwnApi', () => { expect(readReply.record).to.have.property('recordId', writeMessage.recordId); }); + it('handles RecordsSubscribe message', async () => { + const receivedMessages: RecordsWriteMessage[] = []; + const subscriptionHandler = (event: MessageEvent) => { + const { message } = event; + if (!isDwnMessage(DwnInterface.RecordsWrite, message)) { + expect.fail('Received message is not a RecordsWrite message'); + } + receivedMessages.push(message); + }; + + // create a subscription message for schema 'https://schemas.xyz/example' + const { reply: { status: subscribeStatus, subscription } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsSubscribe, + messageParams : { + filter: { + schema: 'https://schemas.xyz/example' + } + }, + subscriptionHandler + }); + + // Verify the response. + expect(subscribeStatus.code).to.equal(200); + expect(subscription).to.exist; + + + // create a test record that matches the subscription filter + const dataBytes = Convert.string('Write 1').toUint8Array(); + let { message, reply: { status: writeStatus } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/example' + }, + dataStream: new Blob([dataBytes]) + }); + expect(writeStatus.code).to.equal(202); + const writeMessage1 = message!; + + // create another test record that matches the subscription filter + const dataBytes2 = Convert.string('Write 2').toUint8Array(); + let { message: message2, reply: { status: writeStatus2 } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/example' + }, + dataStream: new Blob([dataBytes2]) + }); + expect(writeStatus2.code).to.equal(202); + const writeMessage2 = message2!; + + // create a message that does not match the subscription filter + const dataBytes3 = Convert.string('Write 3').toUint8Array(); + let { reply: { status: writeStatus3 } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/other' // different schema + }, + dataStream: new Blob([dataBytes3]) + }); + expect(writeStatus3.code).to.equal(202); + + // close subscription + await subscription!.close(); + + // check that the subscription handler received the expected messages + expect(receivedMessages).to.have.length(2); + expect(receivedMessages[0].recordId).to.equal(writeMessage1.recordId); + expect(receivedMessages[1].recordId).to.equal(writeMessage2.recordId); + }); + it('handles RecordsWrite messages', async () => { // Create test data to write. const dataBytes = Convert.string('Hello, world!').toUint8Array(); @@ -719,6 +800,113 @@ describe('AgentDwnApi', () => { expect(eventsQueryReply.entries).to.have.length(0); }); + it('handles EventsSubscription', async () => { + const receivedMessages: string[] = []; + const subscriptionHandler = async (event: MessageEvent) => { + const { message } = event; + receivedMessages.push(await Message.getCid(message)); + }; + + // create a subscription message for protocol 'https://schemas.xyz/example' + const { reply: { status: subscribeStatus, subscription } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.EventsSubscribe, + messageParams : { + filters: [{ + protocol: 'https://protocol.xyz/example' + }] + }, + subscriptionHandler + }); + + // Verify the response. + expect(subscribeStatus.code).to.equal(200); + expect(subscription).to.exist; + + // install the protocol, this will match the subscription filter + const protocolDefinition: ProtocolDefinition = { + published : true, + protocol : 'https://protocol.xyz/example', + types : { + foo: { + schema : 'https://schemas.xyz/foo', + dataFormats : ['text/plain', 'application/json'] + } + }, + structure: { + foo: {} + } + }; + + let {messageCid: protocolMessageCid, reply: { status: protocolStatus } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.ProtocolsConfigure, + messageParams : { + definition: protocolDefinition + } + }); + expect(protocolStatus.code).to.equal(202); + + // create a test record that matches the subscription filter + const dataBytes = Convert.string('Write 1').toUint8Array(); + let { messageCid: write1MessageCid, reply: { status: writeStatus } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + protocol : 'https://protocol.xyz/example', + protocolPath : 'foo', + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/foo' + }, + dataStream: new Blob([dataBytes]) + }); + expect(writeStatus.code).to.equal(202); + + // create another test record that matches the subscription filter + const dataBytes2 = Convert.string('Write 2').toUint8Array(); + let { messageCid: write2MessageCid, reply: { status: writeStatus2 } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + protocol : 'https://protocol.xyz/example', + protocolPath : 'foo', + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/foo' + }, + dataStream: new Blob([dataBytes2]) + }); + expect(writeStatus2.code).to.equal(202); + + // create a message that does not match the subscription filter + const dataBytes3 = Convert.string('Write 3').toUint8Array(); + let { reply: { status: writeStatus3 } } = await testHarness.agent.dwn.processRequest({ + author : alice.did.uri, + target : alice.did.uri, + messageType : DwnInterface.RecordsWrite, + messageParams : { + dataFormat : 'text/plain', + schema : 'https://schemas.xyz/foo' // no protocol + }, + dataStream: new Blob([dataBytes3]) + }); + expect(writeStatus3.code).to.equal(202); + + // close subscription + await subscription!.close(); + + // check that the subscription handler received the expected messages + expect(receivedMessages).to.have.length(3); + expect(receivedMessages).to.have.members([ + protocolMessageCid, + write1MessageCid, + write2MessageCid + ]); + }); + it('handles MessagesGet', async () => { // Create test data to write. const dataBytes = Convert.string('Hello, world!').toUint8Array(); @@ -1068,4 +1256,19 @@ describe('AgentDwnApi', () => { } }); }); +}); + +describe('isDwnMessage', () => { + it('asserts the type of DWN message', async () => { + const { message: recordsWriteMessage } = await TestDataGenerator.generateRecordsWrite(); + const { message: recordsQueryMessage } = await TestDataGenerator.generateRecordsQuery(); + + // positive tests + expect(isDwnMessage(DwnInterface.RecordsWrite, recordsWriteMessage)).to.be.true; + expect(isDwnMessage(DwnInterface.RecordsQuery, recordsQueryMessage)).to.be.true; + + // negative tests + expect(isDwnMessage(DwnInterface.RecordsQuery, recordsWriteMessage)).to.be.false; + expect(isDwnMessage(DwnInterface.RecordsWrite, recordsQueryMessage)).to.be.false; + }); }); \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 36db81cb7..e2003804a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -42,8 +42,8 @@ importers: specifier: 1.2.2 version: 1.2.2 '@tbd54566975/dwn-sdk-js': - specifier: 0.3.1 - version: 0.3.1 + specifier: 0.3.2 + version: 0.3.2 '@web5/common': specifier: 1.0.0 version: link:../common @@ -2800,6 +2800,39 @@ packages: - encoding - supports-color + /@tbd54566975/dwn-sdk-js@0.3.2: + resolution: {integrity: sha512-MMBau0Snkfnw4pCyBAzOneniUPVOBD1+m/Wj5rVgkDJNPIRkbFMcJ7auEmc4Pm0w+7pgm/ToPXDaSix+2Qob1w==} + engines: {node: '>= 18'} + dependencies: + '@ipld/dag-cbor': 9.0.3 + '@js-temporal/polyfill': 0.4.4 + '@noble/ed25519': 2.0.0 + '@noble/secp256k1': 2.0.0 + '@web5/dids': 1.0.1 + abstract-level: 1.0.3 + ajv: 8.12.0 + blockstore-core: 4.2.0 + cross-fetch: 4.0.0 + eciesjs: 0.4.5 + interface-blockstore: 5.2.3 + interface-store: 5.1.2 + ipfs-unixfs-exporter: 13.1.5 + ipfs-unixfs-importer: 15.1.5 + level: 8.0.0 + lodash: 4.17.21 + lru-cache: 9.1.2 + ms: 2.1.3 + multiformats: 11.0.2 + randombytes: 2.1.0 + readable-stream: 4.5.2 + ulidx: 2.1.0 + uuid: 8.3.2 + varint: 6.0.0 + transitivePeerDependencies: + - encoding + - supports-color + dev: false + /@tbd54566975/dwn-sql-store@0.4.1: resolution: {integrity: sha512-ndslsbtNjkIuNu8ytNZnKjH4uWoxWFzt+L/8ok5giVmgrjTh/+XDU23LQYJjRHC/RusjpDNjlt77PhL2qbXxmQ==} engines: {node: '>=18'}