Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Records Subscriptions in @web5/api #522

Merged
merged 6 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lovely-rules-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@web5/api": patch
---

Add `records.subscribe()` functionality to the DwnApi
60 changes: 58 additions & 2 deletions packages/api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The SDK is currently still under active development, but having entered the Tech
- [API Documentation](#api-documentation)
- [Web5.connect](#web5connectoptions)
- [web5.dwn.records.query](#web5dwnrecordsqueryrequest)
- [web5.dwn.records.subscribe](#web5dwnrecordssubscriberequest)
- [web5.dwn.records.create](#web5dwnrecordscreaterequest)
- [web5.dwn.records.write](#web5dwnrecordswriterequest)
- [web5.dwn.records.read](#web5dwnrecordsreadrequest)
Expand Down Expand Up @@ -233,6 +234,59 @@ The query `response` contains the following properties:
- **`records`** - _`Records array`_ (_optional_): the array of `Records` returned if the request was successful.
- **`cursor`** - _`messageCid string`_ (_optional_): the `messageCid` of the last message returned in the results if there are exist additional records beyond the specified `limit` in the `query`.

### **`web5.dwn.records.subscribe(request)`**

Method for subscribing to either the locally connected DWeb Node or any remote DWeb Node specified in the `from` property.

```javascript
// This invocation will subscribe the user's own DWeb Nodes

const subscriptionHandler = (record) => {
console.log("received", record);
};

const { status } = await web5.dwn.records.subscribe({
message: {
filter: {
protocol: "https://schema.org/protocols/social",
},
},
subscriptionHandler,
});

console.log(status.code === 200); // successful subscription

// This invocation will query Bob's DWeb Nodes
const { status } = await web5.dwn.records.query({
from: "did:example:bob",
message: {
filter: {
protocol: "https://schema.org/protocols/social",
},
},
subscriptionHandler,
});

console.log(status.code === 200); // successful subscription
```

#### **Request**

The query `request` contains the following properties:

- **`from`** - _`DID string`_ (_optional_): the decentralized identifier of the DWeb Node the subscribe will receive results from.
- **`message`** - _`object`_: the properties of the DWeb Node Message Descriptor that will be used to construct a valid record subscription:
- **`filter`** - _`object`_: properties against which results of the subscription will be filtered:
- **`recordId`** - _`string`_ (_optional_): the record ID string that identifies the record data you are fetching.
- **`protocol`** - _`URI string`_ (_optional_): the URI of the protocol bucket in which to subscribe to.
- **`protocolPath`** - _`string`_ (_optional_): the path to the record in the protocol configuration.
- **`contextId`** _`string`_ (_optional_): the `recordId` of a root record of a protocol.
- **`parentId`** _`string`_ (_optional_): the `recordId` of a the parent of a protocol record.
- **`recipient`** - _`string`_ (_optional_): the DID in the `recipient` field of the record.
- **`schema`** - _`URI string`_ (_optional_): the URI of the schema bucket in which to subscribe to.
- **`dataFormat`** - _`Media Type string`_ (_optional_): the IANA string corresponding with the format of the data to filter for. See IANA's Media Type list here: https://www.iana.org/assignments/media-types/media-types.xhtml
- **`subscriptionHandler`** - _`function`_: The handler function which emits a `Record` object when any matching records arrive.

### **`web5.dwn.records.create(request)`**

Method for creating a new record and storing it in the user's local DWeb Node, remote DWeb Nodes, or another party's DWeb Nodes (if permitted).
Expand Down Expand Up @@ -497,7 +551,9 @@ metadata associated with a DID.
#### **Usage**

```javascript
const { didDocument } = await web5.did.resolve('did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo');
const { didDocument } = await web5.did.resolve(
"did:dht:qftx7z968xcpfy1a1diu75pg5meap3gdtg6ezagaw849wdh6oubo"
);
```

#### **Parameters**
Expand All @@ -514,7 +570,7 @@ The method returns a DID resolution result as a JavaScript object. The structure

#### **Notes**

- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable
- The resolution process for some DID methods like DID DHT involve network requests to the relevant DID verifiable
data registry or a resolver endpoint, which may introduce latency based on the network conditions and the specific DID
method utilized.

Expand Down
89 changes: 81 additions & 8 deletions packages/api/src/dwn-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import {
DwnMessage,
DwnResponse,
DwnMessageParams,
DwnMessageSubscription,
DwnResponseStatus,
CachedPermissions,
ProcessDwnRequest,
DwnPaginationCursor,
DwnDataEncodedRecordsWriteMessage,
AgentPermissionsApi
AgentPermissionsApi,
} from '@web5/agent';

import { isEmptyObject } from '@web5/common';
Expand All @@ -32,8 +32,7 @@ import { dataToBlob } from './utils.js';
import { Protocol } from './protocol.js';
import { PermissionGrant } from './permission-grant.js';
import { PermissionRequest } from './permission-request.js';
import { DwnMessagesPermissionScope } from '@web5/agent';
import { DwnRecordsPermissionScope } from '@web5/agent';
import { SubscriptionUtil } from './subscription-util.js';

/**
* Represents the request payload for fetching permission requests from a Decentralized Web Node (DWN).
Expand Down Expand Up @@ -179,7 +178,7 @@ export type RecordsQueryResponse = DwnResponseStatus & {

/** If there are additional results, the messageCid of the last record will be returned as a pagination cursor. */
cursor?: DwnPaginationCursor;
};
}

/**
* Represents a request to read a specific record from a Decentralized Web Node (DWN).
Expand All @@ -206,7 +205,36 @@ export type RecordsReadRequest = {
export type RecordsReadResponse = DwnResponseStatus & {
/** The record retrieved by the read operation. */
record: Record;
};
}

/** Subscription handler for Records */
export type RecordsSubscriptionHandler = (record: Record) => void;

/**
* Represents a request to subscribe to records from a Decentralized Web Node (DWN).
*
* This request type is used to specify the target DWN from which records matching the subscription
* criteria should be emitted. It's useful for being notified in real time when records are written, deleted or modified.
*/
export type RecordsSubscribeRequest = {
/** Optional DID specifying the remote target DWN tenant to subscribe from. */
from?: string;

/** The parameters for the subscription operation, detailing the criteria for the subscription filter */
message: Omit<DwnMessageParams[DwnInterface.RecordsSubscribe], 'signer'>;

/** The handler to process the subscription events */
subscriptionHandler: RecordsSubscriptionHandler;
}

/** Encapsulates the response from a DWN RecordsSubscriptionRequest */
export type RecordsSubscribeResponse = DwnResponseStatus & {
/**
* Represents the subscription that was created. Includes an ID and the close method to stop the subscription.
*
* */
subscription?: DwnMessageSubscription;
}

/**
* Defines a request to write (create) a record to a Decentralized Web Node (DWN).
Expand Down Expand Up @@ -252,7 +280,7 @@ export type RecordsWriteResponse = DwnResponseStatus & {
* DWN as a result of the write operation.
*/
record?: Record
};
}

/**
* Interface to interact with DWN Records and Protocols
Expand Down Expand Up @@ -582,7 +610,6 @@ export class DwnApi {

return { status };
},

/**
* Query a single or multiple records based on the given filter
*/
Expand Down Expand Up @@ -733,6 +760,52 @@ export class DwnApi {
return { record, status };
},

/**
* Subscribes to records based on the given filter and emits events to the `subscriptionHandler`.
*
* @param request must include the `message` with the subscription filter and the `subscriptionHandler` to process the events.
* @returns the subscription status and the subscription object used to close the subscription.
*/
subscribe: async (request: RecordsSubscribeRequest): Promise<RecordsSubscribeResponse> => {
const agentRequest: ProcessDwnRequest<DwnInterface.RecordsSubscribe> = {
/**
* The `author` is the DID that will sign the message and must be the DID the Web5 app is
* connected with and is authorized to access the signing private key of.
*/
author : this.connectedDid,
messageParams : request.message,
messageType : DwnInterface.RecordsSubscribe,
/**
* The `target` is the DID of the DWN tenant under which the subscribe operation will be executed.
* If `from` is provided, the subscribe operation will be executed on a remote DWN.
* Otherwise, the local DWN will execute the subscribe operation.
*/
target : request.from || this.connectedDid,

/**
* The handler to process the subscription events.
*/
subscriptionHandler: SubscriptionUtil.recordSubscriptionHandler({
agent : this.agent,
connectedDid : this.connectedDid,
request
})
};

let agentResponse: DwnResponse<DwnInterface.RecordsSubscribe>;

if (request.from) {
agentResponse = await this.agent.sendDwnRequest(agentRequest);
} else {
agentResponse = await this.agent.processDwnRequest(agentRequest);
}

const reply = agentResponse.reply;
const { status, subscription } = reply;

return { status, subscription };
},

/**
* Writes a record to the DWN
*
Expand Down
29 changes: 26 additions & 3 deletions packages/api/src/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
DwnDateSort,
DwnPaginationCursor,
isDwnMessage,
SendDwnRequest
SendDwnRequest,
isRecordsWrite
} from '@web5/agent';

import { Convert, isEmptyObject, NodeStream, removeUndefinedProperties, Stream } from '@web5/common';
Expand Down Expand Up @@ -72,10 +73,22 @@ export type RecordModel = ImmutableRecordProperties & OptionalRecordProperties &
*
* @beta
*/
export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite] & {
export type RecordOptions = DwnMessage[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete] & {
/** The DID that signed the record. */
author: string;

/** The attestation signature(s) for the record. */
attestation?: DwnMessage[DwnInterface.RecordsWrite]['attestation'];

/** The encryption information for the record. */
encryption?: DwnMessage[DwnInterface.RecordsWrite]['encryption'];

/** The contextId associated with the record. */
contextId?: string;

/** The unique identifier of the record */
recordId?: string;

/** The DID of the DWN tenant under which record operations are being performed. */
connectedDid: string;

Expand Down Expand Up @@ -360,7 +373,7 @@ export class Record implements RecordModel {
this._descriptor = options.descriptor;
this._encryption = options.encryption;
this._initialWrite = options.initialWrite;
this._recordId = options.recordId;
this._recordId = this.isRecordsDeleteDescriptor(options.descriptor) ? options.descriptor.recordId : options.recordId;
this._protocolRole = options.protocolRole;

if (options.encodedData) {
Expand Down Expand Up @@ -539,6 +552,7 @@ export class Record implements RecordModel {
/**
* Send the current record to a remote DWN by specifying their DID
* If no DID is specified, the target is assumed to be the owner (connectedDID).
*
* If an initial write is present and the Record class send cache has no awareness of it, the initial write is sent first
* (vs waiting for the regular DWN sync)
*
Expand Down Expand Up @@ -949,4 +963,13 @@ export class Record implements RecordModel {
}
}
}

/**
* Checks if the descriptor is a RecordsDelete descriptor.
*
* @param descriptor a RecordsWrite or RecordsDelete descriptor
*/
private isRecordsDeleteDescriptor(descriptor: DwnMessageDescriptor[DwnInterface.RecordsWrite | DwnInterface.RecordsDelete]): descriptor is DwnMessageDescriptor[DwnInterface.RecordsDelete] {
return descriptor.interface + descriptor.method === DwnInterface.RecordsDelete;
}
}
33 changes: 33 additions & 0 deletions packages/api/src/subscription-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { DwnRecordSubscriptionHandler, getRecordAuthor, Web5Agent } from '@web5/agent';
import { RecordsSubscribeRequest } from './dwn-api.js';
import { Record } from './record.js';

/**
* Utility class for dealing with subscriptions.
*/
export class SubscriptionUtil {
/**
* Creates a record subscription handler that can be used to process incoming {Record} messages.
*/
static recordSubscriptionHandler({ agent, connectedDid, request }:{
agent: Web5Agent;
connectedDid: string;
request: RecordsSubscribeRequest;
}): DwnRecordSubscriptionHandler {
const { subscriptionHandler, from: remoteOrigin } = request;

return async (event) => {
const { message, initialWrite } = event;
const author = getRecordAuthor(message);
const recordOptions = {
author,
connectedDid,
remoteOrigin,
initialWrite
};

const record = new Record(agent, { ...message, ...recordOptions });
subscriptionHandler(record);
};
}
}
Loading
Loading