Skip to content

Commit

Permalink
EventsSubscribe (#658)
Browse files Browse the repository at this point in the history
This PR introduces an `EventStream` interface and an implementation based on `EventEmitter`  which emits events for any interfaces which are saved as a part of the DWN's message store. (ie. not Query/Read/Subscribe messages).

We also introduce an `EventsSubscribe` interface that follows the same authorization model as `EventsGet` and `EventsQuery`, which only allows access to the tenant owner.

In a subsequent PR we will introduce `RecordsSubscribe`, as well as some additional enhancements.

Co-authored-by: Liran Cohen <[email protected]>
Co-authored-by: Andor Kesselman <[email protected]>
  • Loading branch information
LiranCohen and andorsk authored Jan 19, 2024
1 parent 30ba7d5 commit a2f0dbe
Show file tree
Hide file tree
Showing 64 changed files with 2,195 additions and 283 deletions.
2 changes: 2 additions & 0 deletions build/compile-validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Definitions from '../json-schemas/definitions.json' assert { type: 'json'
import EventsFilter from '../json-schemas/interface-methods/events-filter.json' assert { type: 'json' };
import EventsGet from '../json-schemas/interface-methods/events-get.json' assert { type: 'json' };
import EventsQuery from '../json-schemas/interface-methods/events-query.json' assert { type: 'json' };
import EventsSubscribe from '../json-schemas/interface-methods/events-subscribe.json' assert { type: 'json' };
import GeneralJwk from '../json-schemas/jwk/general-jwk.json' assert { type: 'json' };
import GeneralJws from '../json-schemas/general-jws.json' assert { type: 'json' };
import GenericSignaturePayload from '../json-schemas/signature-payloads/generic-signature-payload.json' assert { type: 'json' };
Expand Down Expand Up @@ -60,6 +61,7 @@ const schemas = {
EventsFilter,
EventsGet,
EventsQuery,
EventsSubscribe,
Definitions,
GeneralJwk,
GeneralJws,
Expand Down
48 changes: 48 additions & 0 deletions json-schemas/interface-methods/events-subscribe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://identity.foundation/dwn/json-schemas/events-subscribe.json",
"type": "object",
"additionalProperties": false,
"required": [
"descriptor",
"authorization"
],
"properties": {
"authorization": {
"$ref": "https://identity.foundation/dwn/json-schemas/authorization.json"
},
"descriptor": {
"type": "object",
"additionalProperties": false,
"required": [
"interface",
"method",
"messageTimestamp",
"filters"
],
"properties": {
"interface": {
"enum": [
"Events"
],
"type": "string"
},
"method": {
"enum": [
"Subscribe"
],
"type": "string"
},
"messageTimestamp": {
"type": "string"
},
"filters": {
"type": "array",
"items": {
"$ref": "https://identity.foundation/dwn/json-schemas/events-filter.json"
}
}
}
}
}
}
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tbd54566975/dwn-sdk-js",
"version": "0.2.12",
"version": "0.2.13",
"description": "A reference implementation of https://identity.foundation/decentralized-web-node/spec/",
"repository": {
"type": "git",
Expand Down
1 change: 1 addition & 0 deletions src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export enum DwnErrorCode {
DidNotValid = 'DidNotValid',
DidResolutionFailed = 'DidResolutionFailed',
Ed25519InvalidJwk = 'Ed25519InvalidJwk',
EventsSubscribeEventStreamUnimplemented = 'EventsSubscribeEventStreamUnimplemented',
GeneralJwsVerifierGetPublicKeyNotFound = 'GeneralJwsVerifierGetPublicKeyNotFound',
GeneralJwsVerifierInvalidSignature = 'GeneralJwsVerifierInvalidSignature',
GrantAuthorizationGrantExpired = 'GrantAuthorizationGrantExpired',
Expand Down
7 changes: 6 additions & 1 deletion src/core/message-reply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { PaginationCursor } from '../types/query-types.js';
import type { ProtocolsConfigureMessage } from '../types/protocols-types.js';
import type { Readable } from 'readable-stream';
import type { RecordsWriteMessage } from '../types/records-types.js';
import type { GenericMessageReply, QueryResultEntry } from '../types/message-types.js';
import type { GenericMessageReply, MessageSubscription, QueryResultEntry } from '../types/message-types.js';

export function messageReplyFromError(e: unknown, code: number): GenericMessageReply {

Expand Down Expand Up @@ -40,4 +40,9 @@ export type UnionMessageReply = GenericMessageReply & {
* Mutually exclusive with `record`.
*/
cursor?: PaginationCursor;

/**
* A subscription object if a subscription was requested.
*/
subscription?: MessageSubscription;
};
110 changes: 88 additions & 22 deletions src/dwn.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DataStore } from './types/data-store.js';
import type { EventLog } from './types/event-log.js';
import type { EventStream } from './types/subscriptions.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply } from './types/event-types.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js';
import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js';
import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js';
import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js';
Expand All @@ -15,6 +16,7 @@ import { AllowAllTenantGate } from './core/tenant-gate.js';
import { DidResolver } from './did/did-resolver.js';
import { EventsGetHandler } from './handlers/events-get.js';
import { EventsQueryHandler } from './handlers/events-query.js';
import { EventsSubscribeHandler } from './handlers/events-subscribe.js';
import { Message } from './core/message.js';
import { messageReplyFromError } from './core/message-reply.js';
import { MessagesGetHandler } from './handlers/messages-get.js';
Expand All @@ -36,32 +38,87 @@ export class Dwn {
private dataStore: DataStore;
private eventLog: EventLog;
private tenantGate: TenantGate;
private eventStream?: EventStream;

private constructor(config: DwnConfig) {
this.didResolver = config.didResolver!;
this.tenantGate = config.tenantGate!;
this.messageStore = config.messageStore;
this.dataStore = config.dataStore;
this.eventLog = config.eventLog;
this.eventStream = config.eventStream;

this.methodHandlers = {
[DwnInterfaceName.Events + DwnMethodName.Get] : new EventsGetHandler(this.didResolver, this.eventLog),
[DwnInterfaceName.Events + DwnMethodName.Query] : new EventsQueryHandler(this.didResolver, this.eventLog),
[DwnInterfaceName.Messages + DwnMethodName.Get] : new MessagesGetHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Permissions + DwnMethodName.Grant] : new PermissionsGrantHandler(
this.didResolver, this.messageStore, this.eventLog),
[DwnInterfaceName.Events + DwnMethodName.Get]: new EventsGetHandler(
this.didResolver,
this.eventLog,
),
[DwnInterfaceName.Events + DwnMethodName.Query]: new EventsQueryHandler(
this.didResolver,
this.eventLog,
),
[DwnInterfaceName.Events+ DwnMethodName.Subscribe]: new EventsSubscribeHandler(
this.didResolver,
this.eventStream,
),
[DwnInterfaceName.Messages + DwnMethodName.Get]: new MessagesGetHandler(
this.didResolver,
this.messageStore,
this.dataStore,
),
[DwnInterfaceName.Permissions + DwnMethodName.Grant]: new PermissionsGrantHandler(
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Permissions + DwnMethodName.Request]: new PermissionsRequestHandler(
this.didResolver, this.messageStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Permissions + DwnMethodName.Revoke]: new PermissionsRevokeHandler(
this.didResolver, this.messageStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Protocols + DwnMethodName.Configure]: new ProtocolsConfigureHandler(
this.didResolver, this.messageStore, this.dataStore, this.eventLog),
[DwnInterfaceName.Protocols + DwnMethodName.Query] : new ProtocolsQueryHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Delete] : new RecordsDeleteHandler(
this.didResolver, this.messageStore, this.dataStore, this.eventLog),
[DwnInterfaceName.Records + DwnMethodName.Query] : new RecordsQueryHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Read] : new RecordsReadHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Write] : new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Protocols + DwnMethodName.Query]: new ProtocolsQueryHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Delete]: new RecordsDeleteHandler(
this.didResolver,
this.messageStore,
this.dataStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Records + DwnMethodName.Query]: new RecordsQueryHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Read]: new RecordsReadHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Write]: new RecordsWriteHandler(
this.didResolver,
this.messageStore,
this.dataStore,
this.eventLog,
this.eventStream
)
};
}

Expand All @@ -82,12 +139,14 @@ export class Dwn {
await this.messageStore.open();
await this.dataStore.open();
await this.eventLog.open();
await this.eventStream?.open();
}

public async close(): Promise<void> {
this.messageStore.close();
this.dataStore.close();
this.eventLog.close();
await this.eventStream?.close();
await this.messageStore.close();
await this.dataStore.close();
await this.eventLog.close();
}

/**
Expand All @@ -96,6 +155,8 @@ export class Dwn {
*/
public async processMessage(tenant: string, rawMessage: EventsGetMessage): Promise<EventsGetReply>;
public async processMessage(tenant: string, rawMessage: EventsQueryMessage): Promise<EventsQueryReply>;
public async processMessage(
tenant: string, rawMessage: EventsSubscribeMessage, options?: EventsSubscribeMessageOptions): Promise<EventsSubscribeReply>;
public async processMessage(tenant: string, rawMessage: MessagesGetMessage): Promise<MessagesGetReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsConfigureMessage): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsQueryMessage): Promise<ProtocolsQueryReply>;
Expand All @@ -113,13 +174,14 @@ export class Dwn {
return errorMessageReply;
}

const { dataStream } = options;
const { dataStream, subscriptionHandler } = options;

const handlerKey = rawMessage.descriptor.interface + rawMessage.descriptor.method;
const methodHandlerReply = await this.methodHandlers[handlerKey].handle({
tenant,
message: rawMessage as GenericMessage,
dataStream
dataStream,
subscriptionHandler
});

return methodHandlerReply;
Expand Down Expand Up @@ -154,6 +216,7 @@ export class Dwn {
// Verify interface and method
const dwnInterface = rawMessage?.descriptor?.interface;
const dwnMethod = rawMessage?.descriptor?.method;

if (dwnInterface === undefined || dwnMethod === undefined) {
return {
status: { code: 400, detail: `Both interface and method must be present, interface: ${dwnInterface}, method: ${dwnMethod}` }
Expand All @@ -174,10 +237,13 @@ export class Dwn {
* DWN configuration.
*/
export type DwnConfig = {
didResolver?: DidResolver,
didResolver?: DidResolver;
tenantGate?: TenantGate;

// event stream is optional if a DWN does not wish to provide subscription services.
eventStream?: EventStream;

messageStore: MessageStore;
dataStore: DataStore;
eventLog: EventLog
eventLog: EventLog;
};
3 changes: 2 additions & 1 deletion src/enums/dwn-interface-method.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ export enum DwnMethodName {
Request = 'Request',
Revoke = 'Revoke',
Write = 'Write',
Delete = 'Delete'
Delete = 'Delete',
Subscribe = 'Subscribe'
}
49 changes: 49 additions & 0 deletions src/event-log/event-emitter-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import type { GenericMessage } from '../types/message-types.js';
import type { KeyValues } from '../types/query-types.js';
import type { EventListener, EventStream, EventSubscription } from '../types/subscriptions.js';

import { EventEmitter } from 'events';

const EVENTS_LISTENER_CHANNEL = 'events';

export class EventEmitterStream implements EventStream {
private eventEmitter: EventEmitter;
private isOpen: boolean = false;

constructor() {
// we capture the rejections and currently just log the errors that are produced
this.eventEmitter = new EventEmitter({ captureRejections: true });
this.eventEmitter.on('error', this.eventError);
}

// we subscribe to the general `EventEmitter` error events with this handler.
// this handler is also called when there is a caught error upon emitting an event from a handler.
private eventError(error: any): void {
console.error('event emitter error', error);
};

async subscribe(id: string, listener: EventListener): Promise<EventSubscription> {
this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener);
return {
id,
close: async (): Promise<void> => { this.eventEmitter.off(EVENTS_LISTENER_CHANNEL, listener); }
};
}

async open(): Promise<void> {
this.isOpen = true;
}

async close(): Promise<void> {
this.isOpen = false;
this.eventEmitter.removeAllListeners();
}

emit(tenant: string, message: GenericMessage, indexes: KeyValues): void {
if (!this.isOpen) {
console.error('message emitted when EventEmitterStream is closed', tenant, message, indexes);
return;
}
this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, message, indexes);
}
}
2 changes: 2 additions & 0 deletions src/event-log/event-log-level.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EventLog } from '../types/event-log.js';
import type { EventStream } from '../types/subscriptions.js';
import type { ULIDFactory } from 'ulidx';
import type { Filter, KeyValues, PaginationCursor } from '../types/query-types.js';

Expand All @@ -14,6 +15,7 @@ type EventLogLevelConfig = {
*/
location?: string,
createLevelDatabase?: typeof createLevelDatabase,
eventStream?: EventStream,
};

export class EventLogLevel implements EventLog {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/events-get.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { DidResolver } from '../did/did-resolver.js';
import type { EventLog } from '../types/event-log.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventsGetMessage, EventsGetReply } from '../types/event-types.js';
import type { EventsGetMessage, EventsGetReply } from '../types/events-types.js';

import { EventsGet } from '../interfaces/events-get.js';
import { messageReplyFromError } from '../core/message-reply.js';
Expand Down
Loading

0 comments on commit a2f0dbe

Please sign in to comment.