diff --git a/README.md b/README.md index 8a52de861..e81680761 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,14 @@ # Decentralized Web Node (DWN) SDK +Code Coverage +![Statements](https://img.shields.io/badge/statements-96.86%25-brightgreen.svg?style=flat) ![Branches](https://img.shields.io/badge/branches-93.91%25-brightgreen.svg?style=flat) ![Functions](https://img.shields.io/badge/functions-93.38%25-brightgreen.svg?style=flat) ![Lines](https://img.shields.io/badge/lines-96.86%25-brightgreen.svg?style=flat) [![NPM](https://img.shields.io/npm/v/@tbd54566975/dwn-sdk-js.svg?logo=npm)](https://www.npmjs.com/package/@tbd54566975/dwn-sdk-js) [![codecov](https://codecov.io/github/TBD54566975/dwn-sdk-js/graphs/badge.svg)](https://codecov.io/github/TBD54566975/dwn-sdk-js) [![Build Status](https://img.shields.io/github/actions/workflow/status/TBD54566975/dwn-sdk-js/npm-publish-unstable.yml?branch=main&logo=github)](https://github.com/tbd54566975/dwn-sdk-js/actions/workflows/npm-publish-unstable.yml) [![License](https://img.shields.io/npm/l/@tbd54566975/dwn-sdk-js.svg?logo=apache)](https://github.com/tbd54566975/dwn-sdk-js/blob/main/LICENSE) [![Chat](https://img.shields.io/badge/chat-on%20discord-7289da.svg?logo=discord)](https://discord.com/channels/937858703112155166/1068273971432280196) - - [Introduction](#introduction) - [Installation](#installation) - [Additional Steps](#additional-steps) diff --git a/build/compile-validators.js b/build/compile-validators.js index c5e397bd6..aed7f0717 100644 --- a/build/compile-validators.js +++ b/build/compile-validators.js @@ -44,6 +44,7 @@ import RecordsRead from '../json-schemas/interface-methods/records-read.json' as import RecordsWrite from '../json-schemas/interface-methods/records-write.json' assert { type: 'json' }; import RecordsWriteSignaturePayload from '../json-schemas/signature-payloads/records-write-signature-payload.json' assert { type: 'json' }; import RecordsWriteUnidentified from '../json-schemas/interface-methods/records-write-unidentified.json' assert { type: 'json' }; +import SubscriptionsRequest from '../json-schemas/interface-methods/subscriptions-request.json' assert { type: 'json' }; const schemas = { Authorization, @@ -72,6 +73,7 @@ const schemas = { RecordsFilter, PublicJwk, GenericSignaturePayload, + SubscriptionsRequest, RecordsWriteSignaturePayload }; diff --git a/json-schemas/interface-methods/events-create.json b/json-schemas/interface-methods/events-create.json new file mode 100644 index 000000000..546dcc43a --- /dev/null +++ b/json-schemas/interface-methods/events-create.json @@ -0,0 +1,54 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://identity.foundation/dwn/json-schemas/events-create.json", + "type": "object", + "additionalProperties": false, + "required": [ + "descriptor" + ], + "properties": { + "authorization": { + "$ref": "https://identity.foundation/dwn/json-schemas/authorization.json" + }, + "descriptor": { + "type": "object", + "additionalProperties": true, + "required": [ + ], + "properties": { + "interface": { + "enum": [ + "Events" + ], + "type": "string" + }, + "method": { + "enum": [ + "Create" + ], + "type": "string" + }, + "messageTimestamp": { + "type": "string" + }, + "type": { + "type": "string" + }, + "scope": { + "type": "object", + "properties": { + "eventType": { + "enum": [ + "Log", + "Message", + "Operation", + "Sync" + ], + "type": "string" + } + } + } + } + } + } + } diff --git a/json-schemas/interface-methods/subscriptions-request.json b/json-schemas/interface-methods/subscriptions-request.json new file mode 100644 index 000000000..6f0577310 --- /dev/null +++ b/json-schemas/interface-methods/subscriptions-request.json @@ -0,0 +1,51 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://identity.foundation/dwn/json-schemas/subscriptions-request.json", + "type": "object", + "additionalProperties": false, + "required": [ + "descriptor" + ], + "properties": { + "authorization": { + "$ref": "https://identity.foundation/dwn/json-schemas/authorization.json" + }, + "descriptor": { + "type": "object", + "additionalProperties": false, + "required": [ + ], + "properties": { + "interface": { + "enum": [ + "Subscriptions" + ], + "type": "string" + }, + "method": { + "enum": [ + "Request" + ], + "type": "string" + }, + "messageTimestamp": { + "type": "string" + }, + "scope": { + "type": "object", + "properties": { + "eventType": { + "enum": [ + "Log", + "Message", + "Operation", + "Sync" + ], + "type": "string" + } + } + } + } + } + } + } diff --git a/json-schemas/permissions/permissions-definitions.json b/json-schemas/permissions/permissions-definitions.json index f2068ae01..5e811675f 100644 --- a/json-schemas/permissions/permissions-definitions.json +++ b/json-schemas/permissions/permissions-definitions.json @@ -25,6 +25,9 @@ }, { "$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/records-write-scope" + }, + { + "$ref": "https://identity.foundation/dwn/json-schemas/permissions/scopes.json#/definitions/subscriptions-request-scope" } ] }, diff --git a/json-schemas/permissions/scopes.json b/json-schemas/permissions/scopes.json index 1b53c0986..23cffccf9 100644 --- a/json-schemas/permissions/scopes.json +++ b/json-schemas/permissions/scopes.json @@ -85,6 +85,33 @@ "type": "string" } } - } + }, + "subscriptions-request-scope": { + "type": "object", + "required": [ + "interface", + "method" + ], + "properties": { + "interface": { + "const": "Subscriptions" + }, + "method": { + "const": "Request" + }, + "schema": { + "type": "string" + }, + "protocol": { + "type": "string" + }, + "contextId": { + "type": "string" + }, + "protocolPath": { + "type": "string" + } + } + } } } diff --git a/src/core/dwn-error.ts b/src/core/dwn-error.ts index 5886b07ee..cadfabd67 100644 --- a/src/core/dwn-error.ts +++ b/src/core/dwn-error.ts @@ -131,4 +131,11 @@ export enum DwnErrorCode { UrlProtocolNotNormalizable = 'UrlProtocolNotNormalizable', UrlSchemaNotNormalized = 'UrlSchemaNotNormalized', UrlSchemaNotNormalizable = 'UrlSchemaNotNormalizable', + SubscriptionsGrantAuthorizationConditionPublicationProhibited = 'SubscriptionsGrantAuthorizationConditionPublicationProhibited', + SubscriptionsGrantAuthorizationConditionPublicationRequired = 'SubscriptionsGrantAuthorizationConditionPublicationRequired', + SubscriptionsGrantAuthorizationScopeContextIdMismatch = 'SubscriptionsGrantAuthorizationScopeContextIdMismatch', + SubscriptionsGrantAuthorizationScopeNotProtocol = 'SubscriptionsGrantAuthorizationScopeNotProtocol', + SubscriptionsGrantAuthorizationScopeProtocolMismatch = 'SubscriptionsGrantAuthorizationScopeProtocolMismatch', + SubscriptionsGrantAuthorizationScopeProtocolPathMismatch = 'SubscriptionsGrantAuthorizationScopeProtocolPathMismatch', + SubscriptionsGrantAuthorizationScopeSchema = 'SubscriptionsGrantAuthorizationScopeSchema' }; diff --git a/src/core/grant-authorization.ts b/src/core/grant-authorization.ts index 62476834a..eeb67d49c 100644 --- a/src/core/grant-authorization.ts +++ b/src/core/grant-authorization.ts @@ -19,9 +19,7 @@ export class GrantAuthorization { permissionsGrantId: string, messageStore: MessageStore, ): Promise { - const incomingMessageDescriptor = incomingMessage.message.descriptor; - // Fetch grant const permissionsGrantMessage = await GrantAuthorization.fetchGrant(tenant, messageStore, permissionsGrantId); diff --git a/src/core/message.ts b/src/core/message.ts index 4ffcfed87..59d621128 100644 --- a/src/core/message.ts +++ b/src/core/message.ts @@ -1,21 +1,23 @@ -import type { GeneralJws } from '../types/jws-types.js'; -import type { Signer } from '../types/signer.js'; -import type { AuthorizationModel, Descriptor, GenericMessage, GenericSignaturePayload } from '../types/message-types.js'; - import { Cid } from '../utils/cid.js'; import { Encoder } from '../index.js'; +import type { GeneralJws } from '../types/jws-types.js'; import { GeneralJwsBuilder } from '../jose/jws/general/builder.js'; import { Jws } from '../utils/jws.js'; import { lexicographicalCompare } from '../utils/string.js'; import { removeUndefinedProperties } from '../utils/object.js'; +import type { Signer } from '../types/signer.js'; import { validateJsonSchema } from '../schema-validator.js'; +import type { AuthorizationModel, Descriptor, GenericMessage, GenericSignaturePayload } from '../types/message-types.js'; + export enum DwnInterfaceName { Events = 'Events', Messages = 'Messages', Permissions = 'Permissions', Protocols = 'Protocols', - Records = 'Records' + Records = 'Records', + Snapshots = 'Snapshots', + Subscriptions = 'Subscriptions' // Used to grant subscriptions. } export enum DwnMethodName { diff --git a/src/core/records-grant-authorization.ts b/src/core/records-grant-authorization.ts index a49474560..eab5e5790 100644 --- a/src/core/records-grant-authorization.ts +++ b/src/core/records-grant-authorization.ts @@ -1,11 +1,11 @@ +import { GrantAuthorization } from './grant-authorization.js'; import type { MessageStore } from '../types/message-store.js'; +import { PermissionsConditionPublication } from '../types/permissions-types.js'; import type { RecordsRead } from '../interfaces/records-read.js'; import type { RecordsWrite } from '../interfaces/records-write.js'; -import type { PermissionsGrantMessage, RecordsPermissionScope } from '../types/permissions-types.js'; -import { GrantAuthorization } from './grant-authorization.js'; -import { PermissionsConditionPublication } from '../types/permissions-types.js'; import { DwnError, DwnErrorCode } from './dwn-error.js'; +import type { PermissionsGrantMessage, RecordsPermissionScope } from '../types/permissions-types.js'; export class RecordsGrantAuthorization { /** @@ -161,4 +161,4 @@ export class RecordsGrantAuthorization { return grantScope.protocol === undefined && grantScope.schema === undefined; } -} +} \ No newline at end of file diff --git a/src/core/subscriptions-grant-authorization.ts b/src/core/subscriptions-grant-authorization.ts new file mode 100644 index 000000000..d4af4b15b --- /dev/null +++ b/src/core/subscriptions-grant-authorization.ts @@ -0,0 +1,160 @@ +import type { EventMessage } from '../interfaces/event-create.js'; +import { GrantAuthorization } from './grant-authorization.js'; +import type { MessageStore } from '../types/message-store.js'; +import type { SubscriptionRequest } from '../interfaces/subscription-request.js'; + +import { DwnError, DwnErrorCode } from './dwn-error.js'; +import type { PermissionsGrantMessage, SubscriptionPermissionScope } from '../types/permissions-types.js'; + +export class SubscriptionsGrantAuthorization { + + /** + * Authorizes the scope of a PermissionsGrant for Subscription. + * For initial connection setup. + */ + public static async authorizeSubscribe( + tenant: string, + incomingMessage: SubscriptionRequest, + author: string, + permissionGrantId: string, + messageStore: MessageStore, + ): Promise { + const permissionsGrantMessage = await GrantAuthorization.authorizeGenericMessage( + tenant, incomingMessage, + author, permissionGrantId, messageStore); + SubscriptionsGrantAuthorization.verifyScope(incomingMessage, permissionsGrantMessage); + } + + /** + * Authorizes the scope of a PermissionsGrant for Subscription. + * For initial connection setup. + * + * @andorsk Improve/Update this and remove hack. + */ + public static async authorizeEvent( + tenant: string, + incomingMessage: SubscriptionRequest, + event: EventMessage, + permissionGrantId: string, + messageStore: MessageStore, + author: string, + ): Promise { + // 1. Get Grant from initial Subscription Request. Check if it's still valid. + // Problem : Needs to check NEW message time. + incomingMessage.message.descriptor.messageTimestamp = event.message.descriptor.messageTimestamp; // HACK! + const permissionsGrantMessage = await GrantAuthorization.authorizeGenericMessage( + tenant, incomingMessage, author, + permissionGrantId, messageStore); + // 2. Check When Grant Was Valid + // 3. Check Event is Valid within Grant. + // 4. Verify Scope + SubscriptionsGrantAuthorization.verifyScope(incomingMessage, permissionsGrantMessage); + } + + /** + * @param subscriptionRequest The source of the record being authorized. + */ + private static verifyEventScope( + subscriptionRequest: SubscriptionRequest, + event: EventMessage, + permissionsGrantMessage: PermissionsGrantMessage, + ): void { + const grantScope = permissionsGrantMessage.descriptor.scope as SubscriptionPermissionScope; + if (SubscriptionsGrantAuthorization.isUnrestrictedScope(grantScope)) { + // scope has no restrictions beyond interface and method. Message is authorized to access any record. + return; + } else if (subscriptionRequest.message.descriptor.scope.recordFilters?.protocol !== undefined) { + // authorization of protocol records must have grants that explicitly include the protocol + SubscriptionsGrantAuthorization.authorizeProtocolRecord(subscriptionRequest, grantScope); + } else { + SubscriptionsGrantAuthorization.authorizeFlatRecord(subscriptionRequest, grantScope); + } + } + + /** + * @param subscriptionRequest The source of the record being authorized. + */ + private static verifyScope( + subscriptionRequest: SubscriptionRequest, + permissionsGrantMessage: PermissionsGrantMessage, + ): void { + const grantScope = permissionsGrantMessage.descriptor.scope as SubscriptionPermissionScope; + if (SubscriptionsGrantAuthorization.isUnrestrictedScope(grantScope)) { + // scope has no restrictions beyond interface and method. Message is authorized to access any record. + return; + } else if (subscriptionRequest.message.descriptor.scope.recordFilters?.protocol !== undefined) { + // authorization of protocol records must have grants that explicitly include the protocol + SubscriptionsGrantAuthorization.authorizeProtocolRecord(subscriptionRequest, grantScope); + } else { + SubscriptionsGrantAuthorization.authorizeFlatRecord(subscriptionRequest, grantScope); + } + } + + /** + * Checks if scope has no restrictions beyond interface and method. + * Grant-holder is authorized to access any record. + */ + private static isUnrestrictedScope(grantScope: SubscriptionPermissionScope): boolean { + return grantScope.protocol === undefined && + grantScope.schema === undefined && + grantScope.eventType == undefined; + } + + /** + * Authorizes a grant scope for a protocol record + */ + private static authorizeProtocolRecord( + subscriptionRequest: SubscriptionRequest, + grantScope: SubscriptionPermissionScope + ): void { + // Protocol records must have grants specifying the protocol + if (grantScope.protocol === undefined) { + throw new DwnError( + DwnErrorCode.SubscriptionsGrantAuthorizationScopeNotProtocol, + 'Grant for protocol subscription must specify protocol in its scope' + ); + } + + // The record's protocol must match the protocol specified in the record + if (grantScope.protocol !== subscriptionRequest.message.descriptor.scope.recordFilters?.protocol) { + throw new DwnError( + DwnErrorCode.SubscriptionsGrantAuthorizationScopeProtocolMismatch, + `Grant scope specifies different protocol than what appears in the subscription` + ); + } + + // If grant specifies either contextId, check that record is that context + if (grantScope.contextId !== undefined && grantScope.contextId !== subscriptionRequest.message.descriptor.scope.recordFilters?.contextId) { + throw new DwnError( + DwnErrorCode.SubscriptionsGrantAuthorizationScopeContextIdMismatch, + `Grant scope specifies different contextId than what appears in the subscription` + ); + } + + // If grant specifies protocolPath, check that record is at that protocolPath + if (grantScope.protocolPath !== undefined && grantScope.protocolPath !== + subscriptionRequest.message.descriptor.scope.recordFilters?.protocolPath) { + throw new DwnError( + DwnErrorCode.SubscriptionsGrantAuthorizationScopeProtocolPathMismatch, + `Grant scope specifies different protocolPath than what appears in the subscription` + ); + } + } + + /** + * Authorizes a grant scope for a non-protocol record + */ + private static authorizeFlatRecord( + subscriptionRequest: SubscriptionRequest, + grantScope: SubscriptionPermissionScope + ): void { + if (grantScope.schema !== undefined) { + if (grantScope.schema !== subscriptionRequest.message.descriptor.scope.recordFilters?.schema) { + throw new DwnError( + DwnErrorCode.RecordsGrantAuthorizationScopeSchema, + `Record does not have schema in PermissionsGrant scope with schema '${grantScope.schema}'` + ); + } + } + } +} \ No newline at end of file diff --git a/src/dwn.ts b/src/dwn.ts index 1c46a1fdb..f11a58874 100644 --- a/src/dwn.ts +++ b/src/dwn.ts @@ -1,31 +1,32 @@ -import type { DataStore } from './types/data-store.js'; -import type { EventLog } from './types/event-log.js'; -import type { GenericMessage } from './types/message-types.js'; -import type { MessageStore } from './types/message-store.js'; -import type { MethodHandler } from './types/method-handler.js'; -import type { Readable } from 'readable-stream'; -import type { RecordsWriteHandlerOptions } from './handlers/records-write.js'; -import type { TenantGate } from './core/tenant-gate.js'; -import type { EventsGetMessage, EventsGetReply, PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage, ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply } from './index.js'; -import type { GenericMessageReply, UnionMessageReply } from './core/message-reply.js'; -import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js'; -import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsWriteMessage } from './types/records-types.js'; - import { AllowAllTenantGate } from './core/tenant-gate.js'; +import type { DataStore } from './types/data-store.js'; import { DidResolver } from './did/did-resolver.js'; +import type { EventLog } from './types/event-log.js'; import { EventsGetHandler } from './handlers/events-get.js'; +import type { GenericMessage } from './types/message-types.js'; import { messageReplyFromError } from './core/message-reply.js'; import { MessagesGetHandler } from './handlers/messages-get.js'; +import type { MessageStore } from './types/message-store.js'; +import type { MethodHandler } from './types/method-handler.js'; import { PermissionsGrantHandler } from './handlers/permissions-grant.js'; import { PermissionsRequestHandler } from './handlers/permissions-request.js'; import { PermissionsRevokeHandler } from './handlers/permissions-revoke.js'; import { ProtocolsConfigureHandler } from './handlers/protocols-configure.js'; import { ProtocolsQueryHandler } from './handlers/protocols-query.js'; +import type { Readable } from 'readable-stream'; import { RecordsDeleteHandler } from './handlers/records-delete.js'; import { RecordsQueryHandler } from './handlers/records-query.js'; import { RecordsReadHandler } from './handlers/records-read.js'; import { RecordsWriteHandler } from './handlers/records-write.js'; +import type { RecordsWriteHandlerOptions } from './handlers/records-write.js'; +import { SubscriptionsRequestHandler } from './handlers/subscriptions-request.js'; +import type { TenantGate } from './core/tenant-gate.js'; + import { DwnInterfaceName, DwnMethodName, Message } from './core/message.js'; +import { EventsGetMessage, EventsGetReply, EventStream, EventStreamI, PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage, ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply, SubscriptionRequestMessage, SubscriptionRequestReply } from './index.js'; +import type { GenericMessageReply, UnionMessageReply } from './core/message-reply.js'; +import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js'; +import type { RecordsDeleteMessage, RecordsQueryMessage, RecordsQueryReply, RecordsReadMessage, RecordsReadReply, RecordsWriteMessage } from './types/records-types.js'; export class Dwn { private methodHandlers: { [key:string]: MethodHandler }; @@ -34,6 +35,7 @@ export class Dwn { private dataStore: DataStore; private eventLog: EventLog; private tenantGate: TenantGate; + private eventStream?: EventStreamI; private constructor(config: DwnConfig) { this.didResolver = config.didResolver!; @@ -41,6 +43,8 @@ export class Dwn { this.messageStore = config.messageStore; this.dataStore = config.dataStore; this.eventLog = config.eventLog; + this.eventStream = config.eventStream + ? config.eventStream : new EventStream(); this.methodHandlers = { [DwnInterfaceName.Events + DwnMethodName.Get] : new EventsGetHandler(this.didResolver, this.eventLog), @@ -60,6 +64,18 @@ export class Dwn { [DwnInterfaceName.Records + DwnMethodName.Read] : new RecordsReadHandler(this.didResolver, this.messageStore, this.dataStore), [DwnInterfaceName.Records + DwnMethodName.Write] : new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog), }; + + // only add subscriptions if event stream is enabled. + if (this.eventStream !== undefined) { + this.methodHandlers[ + DwnInterfaceName.Subscriptions + DwnMethodName.Request + ] = new SubscriptionsRequestHandler( + this.didResolver, + this.messageStore, + this.dataStore, + this.eventStream + ); + } } /** @@ -79,12 +95,18 @@ export class Dwn { await this.messageStore.open(); await this.dataStore.open(); await this.eventLog.open(); + if (this.eventStream) { + await this.eventStream.open(); + } } public async close(): Promise { this.messageStore.close(); this.dataStore.close(); this.eventLog.close(); + if (this.eventStream) { + await this.eventStream.close(); + } } /** @@ -153,6 +175,32 @@ export class Dwn { } } + /** + * Handles a `Subscription Request` message. + */ + public async handleSubscriptionRequest( + tenant: string, + message: SubscriptionRequestMessage): Promise { + const errorMessageReply = + (await this.validateTenant(tenant)) ?? + (await this.validateMessageIntegrity( + message, + DwnInterfaceName.Subscriptions, + DwnMethodName.Request + )); + if (errorMessageReply !== undefined) { + return errorMessageReply; + } + const handler = new SubscriptionsRequestHandler( + this.didResolver, + this.messageStore, + this.dataStore, + this.eventStream as EventStream + ); + return handler.handle({ tenant, message }); + } + + /** * Validates structure of DWN message * @param tenant The tenant DID to route the given message to. @@ -206,4 +254,5 @@ export type DwnConfig = { messageStore: MessageStore; dataStore: DataStore; eventLog: EventLog + eventStream?: EventStreamI; }; diff --git a/src/event-log/event-log-level.ts b/src/event-log/event-log-level.ts index 6272fbf2f..9c1bc45e0 100644 --- a/src/event-log/event-log-level.ts +++ b/src/event-log/event-log-level.ts @@ -1,9 +1,12 @@ +import type { EventStreamI } from './event-stream.js'; import type { LevelWrapperBatchOperation } from '../store/level-wrapper.js'; +import { monotonicFactory } from 'ulidx'; import type { ULIDFactory } from 'ulidx'; -import type { Event, EventLog, GetEventsOptions } from '../types/event-log.js'; -import { monotonicFactory } from 'ulidx'; import { createLevelDatabase, LevelWrapper } from '../store/level-wrapper.js'; +import type { Event, EventLog, GetEventsOptions } from '../types/event-log.js'; + +// import { EventMessage } from '../interfaces/event-create.js'; type EventLogLevelConfig = { /** @@ -13,6 +16,7 @@ type EventLogLevelConfig = { */ location?: string, createLevelDatabase?: typeof createLevelDatabase, + eventStream?: EventStreamI, }; const WATERMARKS_SUBLEVEL_NAME = 'watermarks'; @@ -60,6 +64,17 @@ export class EventLogLevel implements EventLog { await watermarkLog.put(watermark, messageCid); await cidLog.put(messageCid, watermark); + // if (this.config.eventStream) { + // @andorsk: Either add signing or allow for event creation without signing. + // const logMessage = await EventMessage.create({ + // descriptor: { + // type: EventType.Log, + // messageTimestamp: "asfd", + // }, + // authorizationSignatureInput : Jws.createSignatureInput(tennat), + // }) + // this.config.eventStream.add(logMessage) + // } return watermark; } diff --git a/src/event-log/event-stream.ts b/src/event-log/event-stream.ts new file mode 100644 index 000000000..f30eb35c9 --- /dev/null +++ b/src/event-log/event-stream.ts @@ -0,0 +1,188 @@ +import { EventEmitter } from 'events'; +import type { EventMessage } from '../interfaces/event-create.js'; +import type { EventType } from '../types/event-types.js'; +import type { RecordsFilter } from '../types/records-types.js'; + +export type CallbackQueryRequest = RecordsFilter & { + eventType?: EventType; +}; + +const eventChannel = 'event'; + +// EventStream is a sinked stream for Events +export interface EventStreamI { + add(e: EventMessage): Promise; + + on(f: (e: EventMessage) => void): EventEmitter; + createChild( + filter?: (e: EventMessage) => Promise, + transform?: (e: EventMessage) => Promise + ): Promise; + open(): Promise; + close(): Promise; + clear(): Promise; + id(): string; // returns the id +} + +export const defaultConfig = { + channelNames: { + event : 'event', + sync : 'sync', + operation : 'operation', + log : 'log', + message : 'message', + }, +}; + +type EventStreamConfig = { + channelNames?: { + sync: string; + operation: string; + message: string; + log: string; + }; + emitter?: EventEmitter; +}; + +/* + * Event stream provides a single pipeline for + * Event data to pass through. + * It allows for developers to attach multiple callback functions + * To an event stream, and also allows event buffering + * if needed. + * + * A few known use cases: + * - attaching a logger to the end of a event stream + * - attaching telemetry to the event stream. + * - attaching callback functions for subscription use case. + * + * Note: We are purposely not queueing jobs in right now, so + * there is no internal state handling, but you could make an event + * stream some kafka like streamer if you wanted. + */ +export class EventStream implements EventStreamI { + private isOpen: boolean = false; + private eventEmitter: EventEmitter; + private config: EventStreamConfig; + #id: string; + #parentId: string = ''; + + constructor(config?: EventStreamConfig) { + let emitter: EventEmitter; + if (config?.emitter === undefined) { + emitter = new EventEmitter(); + } else { + emitter = config.emitter; + } + this.#id = this.genUniqueId(); + + const channelConfig = { + ...(defaultConfig.channelNames || {}), + ...(config?.channelNames || {}), + }; + + this.config = { + channelNames : channelConfig, + emitter : emitter, + }; + + this.eventEmitter = emitter; + } + + id(): string { + return this.#id; + } + + // improve. temporary. just for now.... + genUniqueId(): string { + const dateStr = Date.now().toString(36); // convert num to base 36 and stringify + const randomStr = Math.random().toString(36).substring(2, 8); // start at index 2 to skip decimal point + + return `${dateStr}-${randomStr}`; + } + + on(f: (e: EventMessage) => void): EventEmitter { + return this.eventEmitter.on(eventChannel, (event) => { + f(event); + }); + } + + async createChild( + filter?: (e: EventMessage) => Promise, + transform?: (e: EventMessage) => Promise + ): Promise { + return new Promise((resolve, reject) => { + const childConfig: EventStreamConfig = { + emitter: new EventEmitter(), + }; + const childStream = new EventStream(childConfig); + childStream.#parentId = this.#id; + // console.log( + // "Created event stream", + // childStream.id(), + // "from parent", + // this.id() + // ); + + const eventListener = async (event: EventMessage): Promise => { + try { + if (!filter || (await filter(event))) { + // If a filter is provided and it passes, emit the event in the child stream + if (transform) { + // If a transform function is provided, apply it to the event + const transformedEvent = await transform(event); + childStream.add(transformedEvent); + } else { + childStream.add(event); + } + } + } catch (error) { + reject(error); + console.error('Error processing event:', error); + } + }; + + // Attach the event handler to the parent stream + this.eventEmitter.on(eventChannel, eventListener); + + // Resolve the promise with childStream + resolve(childStream); + }); + } + + async close(): Promise { + this.isOpen = false; + } + + async clear(): Promise { + throw new Error('clear not available in event emitter...'); + } + + async open(): Promise { + this.isOpen = true; + } + + private async emitEvent(e: EventMessage): Promise { + if (e.message.descriptor === undefined) { + throw new Error('descriptor not defined'); + } + this.eventEmitter.emit(eventChannel, e); + } + + // adds to the event stream. + // right now, we are doing some very basic callback handling. + // but in cases of high performance, + // an internal queue state can be maintained. + // which can be used to improve resiliance + // for event processing. + async add(e: EventMessage): Promise { + if (!this.isOpen) { + throw new Error('Event stream is not open. Cannot add to the stream.'); + } + try { + this.emitEvent(e); + } catch (error) { + throw error; // You can choose to handle or propagate the error as needed. + } + } +} diff --git a/src/handlers/subscriptions-request.ts b/src/handlers/subscriptions-request.ts new file mode 100644 index 000000000..54d44ca9c --- /dev/null +++ b/src/handlers/subscriptions-request.ts @@ -0,0 +1,103 @@ +import { authenticate } from '../core/auth.js'; +import type { EventMessage } from '../interfaces/event-create.js'; +import type { EventStreamI } from '../event-log/event-stream.js'; +import { messageReplyFromError } from '../core/message-reply.js'; +import type { MethodHandler } from '../types/method-handler.js'; +import { SubscriptionRequest } from '../interfaces/subscription-request.js'; + +import type { DataStore, DidResolver, MessageStore } from '../index.js'; +import type { + SubscriptionRequestMessage, + SubscriptionRequestReply, +} from '../types/subscriptions-request.js'; + +export class SubscriptionsRequestHandler implements MethodHandler { + constructor( + private didResolver: DidResolver, + private messageStore: MessageStore, + private dataStore: DataStore, + private eventStream: EventStreamI + ) {} + + public async handle({ + tenant, + message, + }: { + tenant: string; + message: SubscriptionRequestMessage; + }): Promise { + let subscriptionRequest: SubscriptionRequest; + try { + subscriptionRequest = await SubscriptionRequest.parse(message); + } catch (e) { + return messageReplyFromError(e, 400); + } + + // authentication + try { + if (subscriptionRequest.author !== undefined) { + await authenticate(message.authorization!, this.didResolver); + } + } catch (e) { + return messageReplyFromError(e, 401); + } + + try { + await subscriptionRequest.authorize(tenant, this.messageStore); + } catch (error) { + return messageReplyFromError(error, 401); + } + + try { + const filterFunction = async (event: EventMessage): Promise => { + try { + await authenticate(event.message.authorization!, this.didResolver); + await subscriptionRequest.authorizeEvent( + tenant, + event, + this.messageStore + ); + return true; + } catch (error) { + return false; + } + }; + + const synchronousFilterFunction = async ( + event: EventMessage + ): Promise => { + // Wrap the asynchronous filter function with synchronous behavior + try { + const result = await filterFunction(event); + // console.log( + // "filtering", + // event, + // "result", + // result, + // "descriptor", + // event.message.descriptor + // ); + return result; + } catch { + return false; + } + }; + + const childStream = await this.eventStream.createChild( + synchronousFilterFunction + ); + await childStream.open(); + + const messageReply: SubscriptionRequestReply = { + status : { code: 200, detail: 'OK' }, + subscription : { + emitter : childStream, + filter : subscriptionRequest.message.descriptor.scope, + }, + }; + return messageReply; + } catch (error) { + return messageReplyFromError(error, 401); + } + } +} diff --git a/src/index.ts b/src/index.ts index 70bbfa13d..2d91ec799 100644 --- a/src/index.ts +++ b/src/index.ts @@ -24,8 +24,10 @@ export { DwnError, DwnErrorCode } from './core/dwn-error.js'; export { DwnInterfaceName, DwnMethodName } from './core/message.js'; export { Encoder } from './utils/encoder.js'; export { EventsGet, EventsGetOptions } from './interfaces/events-get.js'; +export { EventType } from './types/event-types.js'; export { Encryption, EncryptionAlgorithm } from './utils/encryption.js'; export { EncryptionInput, KeyEncryptionInput, RecordsWrite, RecordsWriteOptions, CreateFromOptions } from './interfaces/records-write.js'; +export { EventMessage } from './interfaces/event-create.js'; export { executeUnlessAborted } from './utils/abort.js'; export { Jws } from './utils/jws.js'; export { KeyMaterial, PrivateJwk, PublicJwk } from './types/jose-types.js'; @@ -52,3 +54,7 @@ export { Time } from './utils/time.js'; export { DataStoreLevel } from './store/data-store-level.js'; export { EventLogLevel } from './event-log/event-log-level.js'; export { MessageStoreLevel } from './store/message-store-level.js'; +export { SubscriptionFilter, SubscriptionRequestReply, SubscriptionRequestMessage } from './types/subscriptions-request.js'; +export { SubscriptionRequest, SubscriptionRequestOptions } from './interfaces/subscription-request.js'; +export { EventStream } from './event-log/event-stream.js'; +export type { EventStreamI } from './event-log/event-stream.js'; diff --git a/src/interfaces/event-create.ts b/src/interfaces/event-create.ts new file mode 100644 index 000000000..54f806ae5 --- /dev/null +++ b/src/interfaces/event-create.ts @@ -0,0 +1,43 @@ +import { removeUndefinedProperties } from '../utils/object.js'; +import { Time } from '../utils/time.js'; +import { DwnInterfaceName, DwnMethodName, Message } from '../core/message.js'; + +import type { + EventDescriptor, + EventMessageI, + EventsCreateDescriptor, +} from '../types/event-types.js'; +import { type GenericMessage, type Signer } from '../index.js'; + +export type EventCreateOptions = { + descriptor: EventDescriptor; + messageId?: string; + messageTimestamp?: string; + message?: GenericMessage; + authorizationSigner?: Signer; +}; + +export class EventMessage extends Message> { + static async create(options: EventCreateOptions): Promise { + const descriptor: EventsCreateDescriptor = { + interface : DwnInterfaceName.Events, + method : DwnMethodName.Create, + messageTimestamp: + options.messageTimestamp ?? Time.getCurrentTimestamp(), + messageId : options.messageId, + eventDescriptor : options.descriptor, + }; + removeUndefinedProperties(descriptor); + + let authorization = undefined; + if (options.authorizationSigner !== undefined) { + authorization = options.authorizationSigner; + } + //const authorization = await Message.signAuthorizationAsAuthor(descriptor, options.authorizationSigner!); + authorization = options.message?.authorization; + const eventMessage = { descriptor, authorization }; + Message.validateJsonSchema(eventMessage); + // @andorsk Fix schema validation.... + return new EventMessage(eventMessage); + } +} diff --git a/src/interfaces/subscription-request.ts b/src/interfaces/subscription-request.ts new file mode 100644 index 000000000..7591a0a60 --- /dev/null +++ b/src/interfaces/subscription-request.ts @@ -0,0 +1,127 @@ +import type { EventMessage } from './event-create.js'; +import type { EventMessageI } from '../types/event-types.js'; +import { Message } from '../core/message.js'; +import { removeUndefinedProperties } from '../utils/object.js'; +import { Subscriptions } from '../utils/subscriptions.js'; +import { SubscriptionsGrantAuthorization } from '../core/subscriptions-grant-authorization.js'; +import { Time } from '../utils/time.js'; + +import { DwnInterfaceName, DwnMethodName } from '../core/message.js'; +import type { MessageStore, Signer } from '../index.js'; +import type { + SubscriptionFilter, + SubscriptionRequestMessage, + SubscriptionsRequestDescriptor, +} from '../types/subscriptions-request.js'; + +export type SubscriptionRequestOptions = { + filter?: SubscriptionFilter; + date?: string; + signer?: Signer; + permissionsGrantId?: string; +}; + +export class SubscriptionRequest extends Message { + public static async parse( + message: SubscriptionRequestMessage + ): Promise { + if (message.authorization !== undefined) { + // Add below back in + // await validateAuthorizationIntegrity(message as GenericMessage); + } + const subscriptionRequest = new SubscriptionRequest(message); + return subscriptionRequest; + } + + /** + * Creates a SubscriptionRequest message. + * + * @throws {DwnError} when a combination of required SubscriptionRequestOptions are missing + */ + public static async create( + options: SubscriptionRequestOptions + ): Promise { + const { filter } = options; + const currentTime = Time.getCurrentTimestamp(); + + const descriptor: SubscriptionsRequestDescriptor = { + interface : DwnInterfaceName.Subscriptions, + method : DwnMethodName.Request, + scope : Subscriptions.normalizeFilter(filter), + messageTimestamp : options.date ?? currentTime, + }; + + removeUndefinedProperties(descriptor); + + // only generate the `authorization` property if signature input is given + // let authorization = undefined; + + if (options.signer !== undefined) { + // Need to fix this + // authorization = await Message.( + // descriptor, + // options.signer, + // { permissionsGrantId } + //); + } + const message: SubscriptionRequestMessage = { descriptor, authorization }; + Message.validateJsonSchema(message); + return new SubscriptionRequest(message); + } + + // @androsk add scoping for protocls support + public async authorize( + tenant: string, + messageStore: MessageStore + ): Promise { + if (tenant === this.author) { + // if the eventStream owner is also the tenant, access is granted always. + return; + } else if ( + this.author !== undefined && + this.signaturePayload?.permissionsGrantId !== undefined + ) { + await SubscriptionsGrantAuthorization.authorizeSubscribe( + tenant, + this, + this.author, + this.signaturePayload?.permissionsGrantId, + messageStore + ); + } else { + throw new Error('message failed authorization'); + } + } + + public async authorizeEvent( + tenant: string, + event: EventMessage, + messageStore: MessageStore + ): Promise { + if (tenant == this.author) { + return; + } else if ( + this.author !== undefined && + this.signaturePayload?.permissionsGrantId !== undefined + ) { + await SubscriptionsGrantAuthorization.authorizeEvent( + tenant, + this, + event, + this.signaturePayload?.permissionsGrantId, + messageStore, + this.author + ); + } else { + throw new Error('message failed authorization'); + } + } +} + +export type CreateEventOptions = { + messageTimestamp: string; + authorization: string; + author: string; + event: EventMessageI; + signer?: Signer; +}; diff --git a/src/types/event-types.ts b/src/types/event-types.ts index 1a2482fa5..bc34b09f9 100644 --- a/src/types/event-types.ts +++ b/src/types/event-types.ts @@ -1,10 +1,11 @@ import type { Event } from './event-log.js'; import type { GenericMessageReply } from '../core/message-reply.js'; +import type { RecordsFilter } from './records-types.js'; import type { AuthorizationModel, GenericMessage } from './message-types.js'; import type { DwnInterfaceName, DwnMethodName } from '../core/message.js'; export type EventsGetDescriptor = { - interface : DwnInterfaceName.Events; + interface: DwnInterfaceName.Events; method: DwnMethodName.Get; watermark?: string; messageTimestamp: string; @@ -17,4 +18,119 @@ export type EventsGetMessage = GenericMessage & { export type EventsGetReply = GenericMessageReply & { events?: Event[]; -}; \ No newline at end of file +}; + +/* + * ---------------------------------------------------------- + * Event Stream Updates Below + * ---------------------------------------------------------- + */ + +/** + * Enum defining generic event types. + */ +export enum EventType { + Message = 'Message', // Represents a message event. + Sync = 'Sync', // Represents a synchronization event. + Operation = 'Operation', // Represents an operation event. + Log = 'Log', // represents a log event. + Record = 'Record', // represents a record event +} + +export type EventsCreateDescriptor = { + interface: DwnInterfaceName.Events; + method: DwnMethodName.Create; + messageId?: string; // attached message + eventDescriptor?: EventDescriptor; + messageTimestamp: string; +}; + +export type EventDescriptor = { + // The type of the event. + type: EventType; + + interface?: DwnInterfaceName; + // interface + method?: DwnMethodName; + // method + messageTimestamp: string; + // The timestamp of the event. + eventTimestamp?: string; + // The duration of the event. + eventDuration?: string; + //A description of the event. + description?: string; + // tags help search events. + tags?: Map; + // The unique identifier of the event. + eventId?: string; +}; + +export type BaseEventMessage = GenericMessage & { + descriptor: EventDescriptor; +}; + +export type InterfaceEventDescriptor = EventDescriptor & { + // The interface associated with the event. + interface?: DwnInterfaceName; + // The method associated with the event. + method?: DwnMethodName; + // event type + type: EventType.Operation; +}; + +export type InterfaceEventMessage = BaseEventMessage & { + descriptor: InterfaceEventDescriptor; +}; + +export type RecordEventDescriptor = InterfaceEventDescriptor & { + protocolPath?: string; + recipient?: string; + schema?: string; + parentId?: string; + dataCid: string; + dataSize: number; + dateCreated: string; + messageTimestamp: string; + published?: boolean; + datePublished?: string; + dataFormat: string; + + type: EventType.Message; + // The context ID associated with the event. + contextId?: string; + // The message CID associated with the event. + messageCid?: string; + // The tenant associated with the event. + tenant?: string; +}; + +export type RecordEventMessage = BaseEventMessage & { + descriptor: RecordEventDescriptor; +}; + +export type SyncEventDescriptor = EventDescriptor & { + type: EventType.Sync; + interface?: DwnInterfaceName.Events; + method?: DwnMethodName.Get; + watermark?: string; + messageTimestamp?: string; +}; + +export type SyncEventMessage = BaseEventMessage & { + descriptor: SyncEventDescriptor; +}; + +export type EventFilter = RecordsFilter & { + // filter by event type + eventType?: EventType; + // filter by interface + interface?: DwnInterfaceName; + // filter by method + method?: DwnMethodName; +}; + +export type EventMessageI = GenericMessage & { + descriptor: T; + authorization?: AuthorizationModel; +}; diff --git a/src/types/permissions-types.ts b/src/types/permissions-types.ts index bded32a52..11948ecad 100644 --- a/src/types/permissions-types.ts +++ b/src/types/permissions-types.ts @@ -1,10 +1,12 @@ +import type { EventType } from './event-types.js'; + import type { AuthorizationModel, GenericMessage } from './message-types.js'; import type { DwnInterfaceName, DwnMethodName } from '../index.js'; export type PermissionScope = { interface: DwnInterfaceName; method: DwnMethodName; -} | RecordsPermissionScope; +} | RecordsPermissionScope | SubscriptionPermissionScope ; // Method-specific scopes export type RecordsPermissionScope = { @@ -123,4 +125,22 @@ export type PermissionsRevokeDescriptor = { export type PermissionsRevokeMessage = GenericMessage & { authorization: AuthorizationModel; // overriding `GenericMessage` with `authorization` being required descriptor: PermissionsRevokeDescriptor; -}; \ No newline at end of file +}; + +// Method-specific scopes +export type SubscriptionPermissionScope = { + // filter to certain interfaces. + interface: DwnInterfaceName.Subscriptions; + // filter to certain methods + method: DwnMethodName.Request; + /** Event type filter...i.e logs, sync, operations...*/ + eventType?: EventType; + /** May only be present when `schema` is undefined */ + protocol?: string; + /** May only be present when `protocol` is defined and `protocolPath` is undefined */ + contextId?: string; + /** May only be present when `protocol` is defined and `contextId` is undefined */ + protocolPath?: string; + /** May only be present when `protocol` is undefined */ + schema?: string; +}; diff --git a/src/types/subscriptions-request.ts b/src/types/subscriptions-request.ts new file mode 100644 index 000000000..2081df211 --- /dev/null +++ b/src/types/subscriptions-request.ts @@ -0,0 +1,42 @@ +import type { AuthorizationModel } from './message-types.js'; +import type { EventStreamI } from '../event-log/event-stream.js'; +import type { GeneralJws } from './jws-types.js'; +import type { GenericMessageReply } from '../core/message-reply.js'; +import type { ProtocolsQueryFilter } from './protocols-types.js'; +import type { RecordsFilter } from './records-types.js'; + +import type { DwnInterfaceName, DwnMethodName } from '../core/message.js'; +import type { EventMessageI, EventType } from './event-types.js'; + +export type SubscriptionRequestMessage = { + authorization?: AuthorizationModel; + descriptor: SubscriptionsRequestDescriptor; +}; + +export type SubscriptionRequestReply = GenericMessageReply & { + subscription?: { + id?: string; + grantedFrom?: string; + grantedTo?: string; + attestation?: GeneralJws; + emitter?: EventStreamI; + filter?: SubscriptionFilter, + } +}; + +export type SubscriptionsRequestDescriptor = { + interface: DwnInterfaceName.Subscriptions; + method: DwnMethodName.Request; + scope: SubscriptionFilter; + messageTimestamp: string; +}; + +export type SubscriptionFilter = { + eventType: EventType; // probably will remove this... + recordFilters?: RecordsFilter; + protocolFilters?: ProtocolsQueryFilter; +}; + +export type EventMessageReply = GenericMessageReply & { + event?: EventMessageI, + }; \ No newline at end of file diff --git a/src/utils/subscriptions.ts b/src/utils/subscriptions.ts new file mode 100644 index 000000000..064d1b023 --- /dev/null +++ b/src/utils/subscriptions.ts @@ -0,0 +1,41 @@ +import type { EventType } from '../types/event-types.js'; +import type { SubscriptionFilter } from '../types/subscriptions-request.js'; + +import { normalizeProtocolUrl, normalizeSchemaUrl } from './url.js'; + +export class Subscriptions { + + /** + * Normalizes the protocol and schema URLs within a provided SubscriptionFilter and returns a copy of SubscriptionFilter with the modified values. + * + * @param filter incoming SubscriptionFilter to normalize. + * @returns {SubscriptionFilter} a copy of the incoming SubscriptionFilter with the normalized properties. + */ + public static normalizeFilter(filter?: SubscriptionFilter): SubscriptionFilter { + let protocol; + if (filter?.recordFilters?.protocol === undefined) { + protocol = undefined; + } else { + protocol = normalizeProtocolUrl(filter.recordFilters.protocol); + } + + let schema; + if (filter?.recordFilters?.schema === undefined) { + schema = undefined; + } else { + schema = normalizeSchemaUrl(filter.recordFilters.schema); + } + + const recordFilters = { + ...filter?.recordFilters, + protocol, + schema, + + }; + return { + ...filter, + recordFilters, + eventType: filter?.eventType as EventType + }; + } +} \ No newline at end of file diff --git a/tests/event-log/event-stream.spec.ts b/tests/event-log/event-stream.spec.ts new file mode 100644 index 000000000..58bce3211 --- /dev/null +++ b/tests/event-log/event-stream.spec.ts @@ -0,0 +1,214 @@ +import chaiAsPromised from 'chai-as-promised'; +import { EventMessage } from '../../src/interfaces/event-create.js'; +import { EventStream } from '../../src/event-log/event-stream.js'; +import { EventType } from '../../src/types/event-types.js'; +import type { InterfaceEventDescriptor } from '../../src/types/event-types.js'; +import { Jws } from '../../src/index.js'; +import { TestDataGenerator } from '../utils/test-data-generator.js'; + +import chai, { assert, expect } from 'chai'; +import { DwnInterfaceName, DwnMethodName } from '../../src/core/message.js'; + +chai.use(chaiAsPromised); + +describe('Event Stream Tests', () => { + let eventStream: EventStream; + + beforeEach(() => { + // Create a new instance of EventStream before each test + eventStream = new EventStream(); + }); + beforeEach(async () => { + await eventStream.open(); + }); + + afterEach(async () => { + // Clean up after each test by closing and clearing the event stream + await eventStream.close(); + }); + + it('test add callback', async () => { + try { + const alice = await TestDataGenerator.generatePersona(); + let messageReceived; + const eventHandledPromise = new Promise((resolve, reject) => { + eventStream.on(async (e: EventMessage) => { + try { + messageReceived = e.message.descriptor; + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + }); + }); + const msg = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + interface : DwnInterfaceName.Records, + method : DwnMethodName.Read, + messageTimestamp : '123', + }, + authorizationSigner: Jws.createSigner(alice), + }); + eventStream.add(msg); // add message + await eventHandledPromise; + expect(messageReceived).to.deep.equal(msg.message.descriptor); + } catch (error) { + assert.fail(error, undefined, 'Test failed due to an error'); + } + }); + + it('test bad message', async () => { + const alice = await TestDataGenerator.generatePersona(); + + const badMessage = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + messageTimestamp : '1', + }, + authorizationSigner: Jws.createSigner(alice), + }); + try { + await eventStream.open(); + await eventStream.add(badMessage); + } catch (error: any) { + expect(error.message).to.equal('descriptor type not defined'); + } + }); + + it('should throw an error when adding events to a closed stream', async () => { + const alice = await TestDataGenerator.generatePersona(); + const event = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + interface : DwnInterfaceName.Records, + method : DwnMethodName.Read, + messageTimestamp : '123', + }, + authorizationSigner: Jws.createSigner(alice), + }); + eventStream.close(); + // Attempt to add an event to a closed stream + try { + await eventStream.add(event); + } catch (error: any) { + expect(error.message).to.equal( + 'Event stream is not open. Cannot add to the stream.' + ); + } + }); + + it('should handle concurrent event sending', async () => { + const eventCount = 100; // Number of events to send concurrently + const eventPromises = []; + const alice = await TestDataGenerator.generatePersona(); + + let caughtMessages = 0; + const eventHandledPromise = new Promise((resolve, reject) => { + eventStream.on(async () => { + try { + caughtMessages += 1; + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + }); + }); + + // Create an array of events to send concurrently + const events = Array.from({ length: eventCount }, (_, i) => ({ + descriptor: { + type : EventType.Log, + messageTimestamp : `${i}`, + eventNumber : i + 1, // Just an example property + }, + })); + + const sendEvent = (event: EventMessage): Promise => { + return eventStream.add(event); + }; + + for (const event of events) { + const eMsg = await EventMessage.create({ + descriptor : event.descriptor, + authorizationSigner : Jws.createSigner(alice), + }); + const eventPromise = sendEvent(eMsg); + eventPromises.push(eventPromise); + } + + // Wait for all event sending promises to resolve + await Promise.all(eventPromises); + await eventHandledPromise; + + expect(caughtMessages).to.equal(eventCount); + }); + + it('test emitter chaining', async () => { + try { + let count = 0; + const alice = await TestDataGenerator.generatePersona(); + + const filterFunction = async (event: EventMessage): Promise => { + const e: InterfaceEventDescriptor = event.message.descriptor + .eventDescriptor as unknown as InterfaceEventDescriptor; + return e.method === DwnMethodName.Read; + }; + + const childStream = await eventStream.createChild(filterFunction); + await childStream.open(); + + const eventHandledPromise = new Promise((resolve, reject) => { + // Define the event handler function outside the setTimeout + const eventHandler = async (): Promise => { + try { + count += 1; // adding 1 if passes filter. + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + }; + childStream.on(eventHandler); + setTimeout(() => {}, 500); + }); + + const msg = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + interface : DwnInterfaceName.Records, + method : DwnMethodName.Read, + messageTimestamp : '123', + }, + authorizationSigner: Jws.createSigner(alice), + }); + + await eventStream.add(msg); // add message + await eventHandledPromise; + const msg2 = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + interface : DwnInterfaceName.Records, + method : DwnMethodName.Write, + messageTimestamp : '123', + }, + authorizationSigner: Jws.createSigner(alice), + }); + await eventStream.add(msg2); // add second message + const msg3 = await EventMessage.create({ + descriptor: { + type : EventType.Operation, + interface : DwnInterfaceName.Records, + method : DwnMethodName.Read, + messageTimestamp : '123', + }, + authorizationSigner: Jws.createSigner(alice), + }); + await eventStream.add(msg3); // add second message + await eventHandledPromise; + assert.equal(2, count, 'Wrong count. Should be 2 because of filters.'); + } catch (error) { + assert.fail(error, undefined, 'Test failed due to an error' + error); + } finally { + } + }); +}); diff --git a/tests/handlers/records-write.spec.ts b/tests/handlers/records-write.spec.ts index 3595cfc9e..f63285cc4 100644 --- a/tests/handlers/records-write.spec.ts +++ b/tests/handlers/records-write.spec.ts @@ -4183,4 +4183,4 @@ export function testRecordsWriteHandler(): void { await expect(handlerPromise).to.be.rejectedWith('an unknown error in messageStore.put()'); }); }); -} +} \ No newline at end of file diff --git a/tests/handlers/subscription-request.spec.ts b/tests/handlers/subscription-request.spec.ts new file mode 100644 index 000000000..cc9617a56 --- /dev/null +++ b/tests/handlers/subscription-request.spec.ts @@ -0,0 +1,319 @@ +import chaiAsPromised from 'chai-as-promised'; +import { Dwn } from '../../src/dwn.js'; +import type { EventMessage } from '../../src/interfaces/event-create.js'; +import type { EventStreamI } from '../../src/event-log/event-stream.js'; +import { EventType } from '../../src/types/event-types.js'; +import sinon from 'sinon'; +import { SubscriptionRequest } from '../../src/interfaces/subscription-request.js'; +import { TestDataGenerator } from '../utils/test-data-generator.js'; +import { TestStores } from '../test-stores.js'; + +import chai, { assert, expect } from 'chai'; +import type { DataStore, EventLog, MessageStore } from '../../src/index.js'; +import { + DidKeyResolver, + DidResolver, + DwnInterfaceName, + DwnMethodName, + Jws, + Message, +} from '../../src/index.js'; + +chai.use(chaiAsPromised); + +export function testSubscriptionRequestHandler(): void { + describe('SubscriptionRequest.handle()', () => { + let didResolver: DidResolver; + let messageStore: MessageStore; + let dataStore: DataStore; + let eventStream: EventStreamI; + let eventLog: EventLog; + let dwn: Dwn; + + describe('functional tests', () => { + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so that different test suites can reuse the same backend store for testing + before(async () => { + didResolver = new DidResolver([new DidKeyResolver()]); + + const stores = TestStores.get(); + messageStore = stores.messageStore; + dataStore = stores.dataStore; + eventLog = stores.eventLog; + eventStream = stores.eventStream as EventStreamI; + dwn = await Dwn.create({ + didResolver, + messageStore, + dataStore, + eventLog, + eventStream, + }); + }); + + beforeEach(async () => { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes + + // clean up before each test rather than after so that a test does not depend on other tests to do the clean up + await messageStore.clear(); + await dataStore.clear(); + await eventLog.clear(); + // await (eventStream as EventStreamI).clear(); + }); + + after(async () => { + await dwn.close(); + }); + + it('should allow tenant to subscribe their own event stream', async () => { + const alice = await DidKeyResolver.generate(); + + // testing Subscription Request + const subscriptionRequest = await SubscriptionRequest.create({ + signer: Jws.createSigner(alice), + }); + + const subscriptionReply = await dwn.handleSubscriptionRequest( + alice.did, + subscriptionRequest.message ); + expect(subscriptionReply.status.code).to.equal( + 200, + subscriptionReply.status.detail + ); + expect(subscriptionReply.subscription).to.exist; + // set up subscription... + try { + let messageReceived: EventMessage; + const eventHandledPromise = new Promise((resolve, reject) => { + subscriptionReply.subscription?.emitter?.on( + async (e: EventMessage) => { + try { + messageReceived = e; + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + } + ); + }); + const { message, dataStream } = + await TestDataGenerator.generateRecordsWrite({ author: alice }); + const writeReply = await dwn.processMessage( + alice.did, + message, + dataStream + ); + expect(writeReply.status.code).to.equal(202); + await eventHandledPromise; + expect(messageReceived!).to.be.not.undefined; + expect(messageReceived!.message.descriptor).to.not.be.undefined; + expect(message.descriptor.dataCid).to.deep.equal( + messageReceived!.message.descriptor.eventDescriptor.dataCid + ); + } catch (error) { + assert.fail(error, undefined, 'Test failed due to an error' + error); + } + }); + + it('should not allow non-tenant to subscribe their an event stream', async () => { + // const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + // testing Subscription Request + const subscriptionRequest = await SubscriptionRequest.create({ + filter: { + eventType: EventType.Operation, + }, + }); + const subscriptionReply = await dwn.handleSubscriptionRequest( + bob.did, subscriptionRequest.message); + expect(subscriptionReply.status.code).to.equal( + 401, + subscriptionReply.status.detail + ); + expect(subscriptionReply.subscription).to.not.exist; + }); + + it('should allow a non-tenant to read subscriptions stream access they are authorized to', async () => { + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + // Alice gives Bob a PermissionsGrant with scope RecordsRead + const permissionsGrant = + await TestDataGenerator.generatePermissionsGrant({ + author : alice, + grantedBy : alice.did, + grantedFor : alice.did, + grantedTo : bob.did, + scope : { + interface : DwnInterfaceName.Subscriptions, + method : DwnMethodName.Request, + }, + }); + + const permissionsGrantReply = await dwn.processMessage( + alice.did, + permissionsGrant.message + ); + expect(permissionsGrantReply.status.code).to.equal(202); + + // testing Subscription Request + const subscriptionRequest = await SubscriptionRequest.create({ + filter: { + eventType: EventType.Operation, + }, + signer : Jws.createSigner(bob), + permissionsGrantId : await Message.getCid(permissionsGrant.message), + }); + + const subscriptionReply = await dwn.handleSubscriptionRequest(alice.did, subscriptionRequest.message); + expect(subscriptionReply.status.code).to.equal( + 200, + subscriptionReply.status.detail + ); + assert.exists(subscriptionReply.subscription, 'subscription exists'); + + try { + let messageReceived: EventMessage; + const eventHandledPromise = new Promise((resolve, reject) => { + subscriptionReply.subscription?.emitter?.on( + async (e: EventMessage) => { + try { + messageReceived = e; + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + } + ); + }); + + const { message, dataStream } = + await TestDataGenerator.generateRecordsWrite({ author: alice }); + const writeReply = await dwn.processMessage( + alice.did, + message, + dataStream + ); + expect(writeReply.status.code).to.equal(202); + + await eventHandledPromise; + expect(messageReceived!).to.be.not.undefined; + expect(messageReceived!.message.descriptor).to.not.be.undefined; + expect(message.descriptor.dataCid).to.deep.equal( + messageReceived!.message.descriptor.eventDescriptor.dataCid + ); + } catch (error) { + assert.fail(error, undefined, 'Test failed due to an error'); + } + }); + + it('should now allow a non-tenant to read subscriptions stream access they are authorized to, and then revoke permissions. they should no longer have access', async () => { + const alice = await DidKeyResolver.generate(); + const bob = await DidKeyResolver.generate(); + + // Alice gives Bob a PermissionsGrant with scope RecordsRead + const permissionsGrant = + await TestDataGenerator.generatePermissionsGrant({ + author : alice, + grantedBy : alice.did, + grantedFor : alice.did, + grantedTo : bob.did, + scope : { + interface : DwnInterfaceName.Subscriptions, + method : DwnMethodName.Request, + }, + }); + + const permissionsGrantReply = await dwn.processMessage( + alice.did, + permissionsGrant.message + ); + expect(permissionsGrantReply.status.code).to.equal(202); + // testing Subscription Request + const subscriptionRequest = await SubscriptionRequest.create({ + signer : Jws.createSigner(bob), + permissionsGrantId : await Message.getCid(permissionsGrant.message), + }); + const subscriptionReply = await dwn.handleSubscriptionRequest(alice.did,subscriptionRequest.message); + expect(subscriptionReply.status.code).to.equal( + 200, + subscriptionReply.status.detail + ); + assert.exists(subscriptionReply.subscription, 'subscription exists'); + + // set up subscription... + try { + let messageReceived: EventMessage | undefined; + const eventHandledPromise = new Promise((resolve, reject) => { + subscriptionReply.subscription?.emitter?.on( + async (e: EventMessage) => { + try { + messageReceived = e; + resolve(); // Resolve the promise when the event is handled. + } catch (error) { + reject(error); + } + } + ); + }); + + let { message, dataStream } = + await TestDataGenerator.generateRecordsWrite({ author: alice }); + let writeReply = await dwn.processMessage( + alice.did, + message, + dataStream + ); + expect(writeReply.status.code).to.equal( + 202, + 'could not write event...' + ); + + await eventHandledPromise; + expect(messageReceived!).to.be.not.undefined; + expect(messageReceived!.message.descriptor).to.not.be.undefined; + expect(message.descriptor.dataCid).to.deep.equal( + messageReceived!.message.descriptor.eventDescriptor.dataCid + ); + messageReceived = undefined; + // Alice revokes the grant + const { permissionsRevoke } = + await TestDataGenerator.generatePermissionsRevoke({ + author : alice, + permissionsGrantId : await Message.getCid( + permissionsGrant.message + ), + }); + const permissionsRevokeReply = await dwn.processMessage( + alice.did, + permissionsRevoke.message + ); + expect(permissionsRevokeReply.status.code).to.eq(202); + // wait 100 ms to make sure it didn't propgate. + await new Promise((resolve) => { + setTimeout(resolve, 100); + }); + assert.isUndefined( + messageReceived, + 'message should be undefined on permission revoke...' + ); + messageReceived = undefined; + assert.isUndefined( + messageReceived, + 'message should be undefined on write...' + ); + ({ message, dataStream } = + await TestDataGenerator.generateRecordsWrite({ author: alice })); + writeReply = await dwn.processMessage(alice.did, message, dataStream); + expect(writeReply.status.code).to.equal(202); + await new Promise((resolve) => { + setTimeout(resolve, 100); + }); + } catch (error) { + assert.fail(error, undefined, 'Test failed due to an error'); + } + }); + }); + }); +} +testSubscriptionRequestHandler(); diff --git a/tests/interfaces/subscription-request.spec.ts b/tests/interfaces/subscription-request.spec.ts new file mode 100644 index 000000000..c6f509b83 --- /dev/null +++ b/tests/interfaces/subscription-request.spec.ts @@ -0,0 +1,62 @@ +import { Dwn } from '../../src/dwn.js'; +import type { EventStreamI } from '../../src/event-log/event-stream.js'; +import { expect } from 'chai'; +import sinon from 'sinon'; +import { SubscriptionRequest } from '../../src/interfaces/subscription-request.js'; +import { TestStores } from '../test-stores.js'; + +import type { DataStore, EventLog, MessageStore } from '../../src/index.js'; +import { DidKeyResolver, DidResolver, DwnInterfaceName, DwnMethodName, Jws } from '../../src/index.js'; + +export function testSubscriptionsRequestHandler(): void { + describe('SubscriptionRequest.handle()', () => { + describe('functional test', () => { + + let didResolver: DidResolver; + let messageStore: MessageStore; + let dataStore: DataStore; + let eventLog: EventLog; + let dwn: Dwn; + let eventStream: EventStreamI; + + // important to follow the `before` and `after` pattern to initialize and clean the stores in tests + // so that different test suites can reuse the same backend store for testing + before(async () => { + didResolver = new DidResolver([new DidKeyResolver()]); + + const stores = TestStores.get(); + messageStore = stores.messageStore; + dataStore = stores.dataStore; + eventLog = stores.eventLog; + + dwn = await Dwn.create({ didResolver, messageStore, dataStore, eventLog }); + }); + + beforeEach(async () => { + sinon.restore(); // wipe all previous stubs/spies/mocks/fakes + + // clean up before each test rather than after so that a test does not depend on other tests to do the clean up + await messageStore.clear(); + await dataStore.clear(); + await eventLog.clear(); + await eventStream.clear(); + }); + + after(async () => { + await dwn.close(); + }); + + it('test create', async () => { + + const alice = await DidKeyResolver.generate(); + const { message } = await SubscriptionRequest.create({ + signer: Jws.createSigner(alice) + }); + + expect(message.descriptor.scope).to.eql({ interface: DwnInterfaceName.Subscriptions, method: DwnMethodName.Request }); + }); + + }); + }); +} + diff --git a/tests/test-stores.ts b/tests/test-stores.ts index db4dfb074..c33534b7e 100644 --- a/tests/test-stores.ts +++ b/tests/test-stores.ts @@ -1,3 +1,5 @@ +import { EventStream } from '../src/event-log/event-stream.js'; +import type { EventStreamI } from '../src/event-log/event-stream.js'; import type { DataStore, EventLog, MessageStore } from '../src/index.js'; import { DataStoreLevel, EventLogLevel, MessageStoreLevel } from '../src/index.js'; @@ -12,21 +14,23 @@ export class TestStores { private static messageStore?: MessageStore; private static dataStore?: DataStore; private static eventLog?: EventLog; + private static eventStream?: EventStreamI; /** * Overrides test stores with given implementation. * If not given, default implementation will be used. */ - public static override(overrides?: { messageStore?: MessageStore, dataStore?: DataStore, eventLog?: EventLog }): void { + public static override(overrides?: { messageStore?: MessageStore, dataStore?: DataStore, eventLog?: EventLog, eventStream?: EventStreamI }): void { TestStores.messageStore = overrides?.messageStore; TestStores.dataStore = overrides?.dataStore; TestStores.eventLog = overrides?.eventLog; + TestStores.eventStream = overrides?.eventStream; } /** * Initializes and return the stores used for running the test suite. */ - public static get(): { messageStore: MessageStore, dataStore: DataStore, eventLog: EventLog } { + public static get(): { messageStore: MessageStore, dataStore: DataStore, eventLog: EventLog, eventStream?: EventStreamI} { TestStores.messageStore ??= new MessageStoreLevel({ blockstoreLocation : 'TEST-MESSAGESTORE', indexLocation : 'TEST-INDEX' @@ -40,10 +44,13 @@ export class TestStores { location: 'TEST-EVENTLOG' }); + TestStores.eventStream ??= new EventStream(); + return { messageStore : TestStores.messageStore, dataStore : TestStores.dataStore, - eventLog : TestStores.eventLog + eventLog : TestStores.eventLog, + eventStream : TestStores.eventStream }; } } \ No newline at end of file diff --git a/tests/test-suite.ts b/tests/test-suite.ts index 85d0fce04..ad8be4f7b 100644 --- a/tests/test-suite.ts +++ b/tests/test-suite.ts @@ -13,6 +13,8 @@ import { testRecordsDeleteHandler } from './handlers/records-delete.spec.js'; import { testRecordsQueryHandler } from './handlers/records-query.spec.js'; import { testRecordsReadHandler } from './handlers/records-read.spec.js'; import { testRecordsWriteHandler } from './handlers/records-write.spec.js'; +import { testSubscriptionRequestHandler } from './handlers/subscription-request.spec.js'; + import { TestStores } from './test-stores.js'; /** @@ -43,7 +45,7 @@ export class TestSuite { testRecordsQueryHandler(); testRecordsReadHandler(); testRecordsWriteHandler(); - + testSubscriptionRequestHandler(); testEndToEndScenarios(); } } \ No newline at end of file