From e6ee17aa0ca6cd3b9c209c75746e7fec1b881c3c Mon Sep 17 00:00:00 2001 From: Andor Kesselman Date: Thu, 28 Sep 2023 15:33:41 +0530 Subject: [PATCH] added subscription api --- packages/api/src/dwn-api.ts | 182 ++++++++++++++++++++++-------- packages/api/src/subscriptions.ts | 40 +++++++ 2 files changed, 175 insertions(+), 47 deletions(-) create mode 100644 packages/api/src/subscriptions.ts diff --git a/packages/api/src/dwn-api.ts b/packages/api/src/dwn-api.ts index dc70c5884..85be84c88 100644 --- a/packages/api/src/dwn-api.ts +++ b/packages/api/src/dwn-api.ts @@ -1,24 +1,26 @@ +import { DwnInterfaceName, DwnMethodName, EventStreamI } from '@tbd54566975/dwn-sdk-js'; import type { DwnResponse, Web5Agent } from '@web5/agent'; import type { - UnionMessageReply, - RecordsReadOptions, + EventMessage, + ProtocolsConfigureDescriptor, + ProtocolsConfigureMessage, + ProtocolsConfigureOptions, + ProtocolsQueryOptions, + RecordsDeleteOptions, RecordsQueryOptions, + RecordsQueryReplyEntry, + RecordsReadOptions, RecordsWriteMessage, RecordsWriteOptions, - RecordsDeleteOptions, - ProtocolsQueryOptions, - RecordsQueryReplyEntry, - ProtocolsConfigureMessage, - ProtocolsConfigureOptions, - ProtocolsConfigureDescriptor, + SubscriptionRequestOptions, + SubscriptionRequestReply, + UnionMessageReply, } from '@tbd54566975/dwn-sdk-js'; -import { isEmptyObject } from '@web5/common'; -import { DwnInterfaceName, DwnMethodName } from '@tbd54566975/dwn-sdk-js'; - -import { Record } from './record.js'; import { Protocol } from './protocol.js'; +import { Record } from './record.js'; import { dataToBlob } from './utils.js'; +import { isEmptyObject } from '@web5/common'; export type ProtocolsConfigureRequest = { message: Omit; @@ -96,6 +98,20 @@ export type RecordsWriteResponse = { record?: Record }; +export type SubscriptionRequestMessage = { + message: Omit; +} + +export type SubscriptionRequestResponse = { + status: UnionMessageReply['status']; + protocol?: Record; +} + +export type Subscription = { + stream: EventStreamI; + socket?: WebSocket; +} + /** * TODO: Document class. */ @@ -108,6 +124,78 @@ export class DwnApi { this.connectedDid = options.connectedDid; } + get subscription() { + return { + /** + * Creates a subscription. Note: the appropriate Permissions over SubscriptionRequestPermission + * MUST be set beforehand for authorization to work. + * @param {string} target - The target for the subscription. + * @param {SubscriptionRequestMessage} request - The subscription request message. + * @param {(e: EventMessage) => Promise} callback - The callback function to handle events. + * @returns {Promise} A promise containing the subscription request reply. + * + * Example: + * { + * "target": "did:example:12345", + * "request": { + * "filter": { + * "type": "record", + * "recordFilters": { + * "protocolPath": "/my/protocol/path" + * } + * } + * } + * } + * Callback will run over the returned event type. + * Alternatively, you may request the actual pipe + */ + create: async (target, request, callback) : Promise => { + if (this.connectedDid === target) { + // Form a request object + const agentResponse = await this.agent.processDwnRequest({ + target: this.connectedDid, + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Subscriptions + DwnMethodName.Request + }); + + const { message, messageCid, reply: { status } } = agentResponse; + const response = { status }; + + if (status.code < 300) { + const metadata = { author: this.connectedDid, messageCid }; + // response.subscription = new Subscription(this.agent, message as SubscriptionRequestMessage, metadata); + } + response.subscription?.emitter.on((event) => { + callback(event); + }); + return response; + } else { + // Step 1: Get address via DID document (To be fixed: resolve DID document) + const addr = "127.0.0.1:9002"; + + // Step 2: Create WebSocket + const socket = new WebSocket(addr); + + // Setup socket + socket.onmessage = (data) => { + // Parse message + const event = JSON.parse(data) as EventMessage; + // Run callback + callback(event); + }; + + socket.onopen = () => { + // Step 3: Send RPC request to endpoint + const request = JSON.stringify(dwnRequest) + socket.send(request); + }; + + } + } + }; + } + /** * TODO: Document namespace. */ @@ -118,13 +206,13 @@ export class DwnApi { */ configure: async (request: ProtocolsConfigureRequest): Promise => { const agentResponse = await this.agent.processDwnRequest({ - target : this.connectedDid, - author : this.connectedDid, - messageOptions : request.message, - messageType : DwnInterfaceName.Protocols + DwnMethodName.Configure + target: this.connectedDid, + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Protocols + DwnMethodName.Configure }); - const { message, messageCid, reply: { status }} = agentResponse; + const { message, messageCid, reply: { status } } = agentResponse; const response: ProtocolsConfigureResponse = { status }; if (status.code < 300) { @@ -140,10 +228,10 @@ export class DwnApi { */ query: async (request: ProtocolsQueryRequest): Promise => { const agentRequest = { - author : this.connectedDid, - messageOptions : request.message, - messageType : DwnInterfaceName.Protocols + DwnMethodName.Query, - target : request.from || this.connectedDid + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Protocols + DwnMethodName.Query, + target: request.from || this.connectedDid }; let agentResponse: DwnResponse; @@ -209,8 +297,8 @@ export class DwnApi { } return this.records.write({ - data : request.data, - message : { + data: request.data, + message: { ...inheritedProperties, ...request.message, }, @@ -222,10 +310,10 @@ export class DwnApi { */ delete: async (request: RecordsDeleteRequest): Promise => { const agentRequest = { - author : this.connectedDid, - messageOptions : request.message, - messageType : DwnInterfaceName.Records + DwnMethodName.Delete, - target : request.from || this.connectedDid + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Records + DwnMethodName.Delete, + target: request.from || this.connectedDid }; let agentResponse; @@ -254,10 +342,10 @@ export class DwnApi { */ query: async (request: RecordsQueryRequest): Promise => { const agentRequest = { - author : this.connectedDid, - messageOptions : request.message, - messageType : DwnInterfaceName.Records + DwnMethodName.Query, - target : request.from || this.connectedDid + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Records + DwnMethodName.Query, + target: request.from || this.connectedDid }; let agentResponse; @@ -272,8 +360,8 @@ export class DwnApi { const records = entries.map((entry: RecordsQueryReplyEntry) => { const recordOptions = { - author : this.connectedDid, - target : this.connectedDid, + author: this.connectedDid, + target: this.connectedDid, ...entry as RecordsWriteMessage }; const record = new Record(this.agent, recordOptions); @@ -288,10 +376,10 @@ export class DwnApi { */ read: async (request: RecordsReadRequest): Promise => { const agentRequest = { - author : this.connectedDid, - messageOptions : request.message, - messageType : DwnInterfaceName.Records + DwnMethodName.Read, - target : request.from || this.connectedDid + author: this.connectedDid, + messageOptions: request.message, + messageType: DwnInterfaceName.Records + DwnMethodName.Read, + target: request.from || this.connectedDid }; let agentResponse; @@ -316,8 +404,8 @@ export class DwnApi { let record: Record; if (200 <= status.code && status.code <= 299) { const recordOptions = { - author : this.connectedDid, - target : this.connectedDid, + author: this.connectedDid, + target: this.connectedDid, ...responseRecord, }; @@ -347,12 +435,12 @@ export class DwnApi { messageOptions.dataFormat = dataFormat; const agentResponse = await this.agent.processDwnRequest({ - author : this.connectedDid, - dataStream : dataBlob, + author: this.connectedDid, + dataStream: dataBlob, messageOptions, - messageType : DwnInterfaceName.Records + DwnMethodName.Write, - store : request.store, - target : this.connectedDid + messageType: DwnInterfaceName.Records + DwnMethodName.Write, + store: request.store, + target: this.connectedDid }); const { message, reply: { status } } = agentResponse; @@ -361,9 +449,9 @@ export class DwnApi { let record: Record; if (200 <= status.code && status.code <= 299) { const recordOptions = { - author : this.connectedDid, - encodedData : dataBlob, - target : this.connectedDid, + author: this.connectedDid, + encodedData: dataBlob, + target: this.connectedDid, ...responseMessage, }; diff --git a/packages/api/src/subscriptions.ts b/packages/api/src/subscriptions.ts new file mode 100644 index 000000000..d27089d2c --- /dev/null +++ b/packages/api/src/subscriptions.ts @@ -0,0 +1,40 @@ +import type { ProtocolsConfigure, SubscriptionRequest } from '@tbd54566975/dwn-sdk-js'; + +import type { Web5Agent } from '@web5/agent'; + +export type SubscriptionRequestMessage = SubscriptionRequest['message']; +type SubscriptionMetadata = { + author: string; + messageCid?: string; +}; + +export class Subscription { + private _agent: Web5Agent; + private _metadata: SubscriptionMetadata; + private _subscriptionRequestMessage: SubscriptionRequestMessage; + + get definition() { + return this._subscriptionRequestMessage.descriptor.definition; + } + + constructor(agent: Web5Agent, subscriptionRequestMessage: SubscriptionRequestMessage, metadata: SubscriptionMetadata) { + this._agent = agent; + this._metadata = metadata; + this._subscriptionRequestMessage = subscriptionRequestMessage; + } + + toJSON() { + return this._subscriptionRequestMessage; + } + + async send(target: string) { + const { reply } = await this._agent.sendDwnRequest({ + author : this._metadata.author, + messageCid : this._metadata.messageCid, + messageType : 'subscriptionRequest', + target : target, + }); + + return { status: reply.status }; + } +} \ No newline at end of file