diff --git a/src/dwn.ts b/src/dwn.ts index fdf528c27..d9cdce737 100644 --- a/src/dwn.ts +++ b/src/dwn.ts @@ -5,7 +5,7 @@ import type { MessageStore } from './types/message-store.js'; import type { MethodHandler } from './types/method-handler.js'; import type { TenantGate } from './core/tenant-gate.js'; import type { UnionMessageReply } from './core/message-reply.js'; -import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeReply } from './types/events-types.js'; +import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js'; import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js'; import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js'; import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js'; @@ -157,7 +157,8 @@ export class Dwn { */ public async processMessage(tenant: string, rawMessage: EventsGetMessage): Promise; public async processMessage(tenant: string, rawMessage: EventsQueryMessage): Promise; - public async processMessage(tenant: string, rawMessage: EventsSubscribeMessage): Promise; + public async processMessage( + tenant: string, rawMessage: EventsSubscribeMessage, options?: EventsSubscribeMessageOptions): Promise; public async processMessage(tenant: string, rawMessage: MessagesGetMessage): Promise; public async processMessage(tenant: string, rawMessage: ProtocolsConfigureMessage): Promise; public async processMessage(tenant: string, rawMessage: ProtocolsQueryMessage): Promise; @@ -175,13 +176,14 @@ export class Dwn { return errorMessageReply; } - const { dataStream } = options; + const { dataStream, handler } = options; const handlerKey = rawMessage.descriptor.interface + rawMessage.descriptor.method; const methodHandlerReply = await this.methodHandlers[handlerKey].handle({ tenant, message: rawMessage as GenericMessage, - dataStream + dataStream, + handler }); return methodHandlerReply; diff --git a/src/handlers/events-subscribe.ts b/src/handlers/events-subscribe.ts index c0e046f66..3686b28b6 100644 --- a/src/handlers/events-subscribe.ts +++ b/src/handlers/events-subscribe.ts @@ -1,15 +1,17 @@ -import type { DidResolver } from '../did/did-resolver.js'; import EventEmitter from 'events'; -import type { EventStream } from '../types/subscriptions.js'; + +import type { DidResolver } from '../did/did-resolver.js'; import type { Filter } from '../types/query-types.js'; +import type { GenericMessageHandler } from '../types/message-types.js'; import type { MethodHandler } from '../types/method-handler.js'; -import type { EventsSubscribeMessage, EventsSubscribeReply } from '../types/events-types.js'; +import type { EventListener, EventStream } from '../types/subscriptions.js'; +import type { EventsSubscribeMessage, EventsSubscribeReply, EventsSubscription } from '../types/events-types.js'; import { Events } from '../utils/events.js'; import { EventsSubscribe } from '../interfaces/events-subscribe.js'; +import { FilterUtility } from '../utils/filter.js'; import { Message } from '../core/message.js'; import { messageReplyFromError } from '../core/message-reply.js'; -import { SubscriptionHandlerBase } from '../event-log/subscription.js'; import { authenticate, authorizeOwner } from '../core/auth.js'; export class EventsSubscribeHandler implements MethodHandler { @@ -21,10 +23,13 @@ export class EventsSubscribeHandler implements MethodHandler { public async handle({ tenant, message, + handler, }: { tenant: string; message: EventsSubscribeMessage; + handler: GenericMessageHandler; }): Promise { + let subscriptionRequest: EventsSubscribe; try { subscriptionRequest = await EventsSubscribe.parse(message); @@ -40,33 +45,51 @@ export class EventsSubscribeHandler implements MethodHandler { } try { - const { filters } = message.descriptor; - const eventEmitter = new EventEmitter(); const eventsFilters = Events.convertFilters(filters); - const subscription = await EventsSubscriptionHandler.create({ tenant, message, filters: eventsFilters, eventEmitter }); - this.eventStream.subscribe(subscription.id, subscription.listener); + const messageCid = await Message.getCid(message); + const subscription = await this.createEventSubscription(tenant, messageCid, handler, eventsFilters); const messageReply: EventsSubscribeReply = { status: { code: 200, detail: 'OK' }, subscription, }; + return messageReply; } catch (error) { return messageReplyFromError(error, 401); } } -} -export class EventsSubscriptionHandler extends SubscriptionHandlerBase { - public static async create(input: { + /** + * Creates an EventStream subscription and assigns the message handler to the listener. + * The listener checks that the incoming message matches the supplied filters, as well as is attributed to the tenant. + */ + private async createEventSubscription( tenant: string, - message: EventsSubscribeMessage, - filters: Filter[], - eventEmitter: EventEmitter, - }): Promise { - const id = await Message.getCid(input.message); - return new EventsSubscriptionHandler({ ...input, id }); - } -}; + messageCid: string, + handler: GenericMessageHandler, + filters: Filter[] + ): Promise { + + const eventEmitter = new EventEmitter(); + const eventChannel = `${tenant}_${messageCid}`; + + const listener: EventListener = (eventTenant, eventMessage, eventIndexes):void => { + if (tenant === eventTenant && FilterUtility.matchAnyFilter(eventIndexes, filters)) { + eventEmitter.emit(eventChannel, eventMessage); + } + }; + const eventsSubscription = await this.eventStream.subscribe(messageCid, listener); + eventEmitter.on(eventChannel, handler); + + return { + id : messageCid, + close : async (): Promise => { + await eventsSubscription.close(); + eventEmitter.off(eventChannel, handler); + }, + }; + } +} \ No newline at end of file diff --git a/src/types/events-types.ts b/src/types/events-types.ts index bbce47ef1..0f76b2091 100644 --- a/src/types/events-types.ts +++ b/src/types/events-types.ts @@ -1,6 +1,5 @@ -import type { DwnError } from '../core/dwn-error.js'; import type { ProtocolsQueryFilter } from './protocols-types.js'; -import type { AuthorizationModel, GenericMessage, GenericMessageReply } from './message-types.js'; +import type { AuthorizationModel, GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js'; import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; import type { RangeCriterion, RangeFilter } from './query-types.js'; @@ -42,6 +41,11 @@ export type EventsGetReply = GenericMessageReply & { entries?: string[]; }; + +export type EventsSubscribeMessageOptions = { + handler: GenericMessageHandler; +}; + export type EventsSubscribeMessage = { authorization?: AuthorizationModel; descriptor: EventsSubscribeDescriptor; @@ -51,8 +55,6 @@ export type EventsHandler = (message: GenericMessage) => void; export type EventsSubscription = { id: string; - on: (handler: EventsHandler) => { off: () => void }; - onError: (handler: (error: DwnError) => void) => void; close: () => Promise; }; diff --git a/src/types/message-types.ts b/src/types/message-types.ts index 1705cbc20..d2895b0f8 100644 --- a/src/types/message-types.ts +++ b/src/types/message-types.ts @@ -17,6 +17,7 @@ export type GenericMessage = { */ export type MessageOptions = { dataStream?: Readable; + handler?: GenericMessageHandler; }; /** diff --git a/src/types/method-handler.ts b/src/types/method-handler.ts index 7bc9bb07c..bbf273f33 100644 --- a/src/types/method-handler.ts +++ b/src/types/method-handler.ts @@ -1,5 +1,5 @@ import type { Readable } from 'readable-stream'; -import type { GenericMessage, GenericMessageReply } from './message-types.js'; +import type { GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js'; /** * Interface that defines a message handler of a specific method. @@ -12,5 +12,6 @@ export interface MethodHandler { tenant: string; message: GenericMessage; dataStream?: Readable + handler?: GenericMessageHandler; }): Promise; } \ No newline at end of file diff --git a/tests/handlers/events-subscribe.spec.ts b/tests/handlers/events-subscribe.spec.ts index 4f7b88077..e32b3d953 100644 --- a/tests/handlers/events-subscribe.spec.ts +++ b/tests/handlers/events-subscribe.spec.ts @@ -64,26 +64,26 @@ export function testEventsSubscribeHandler(): void { it('should allow tenant to subscribe their own event stream', async () => { const alice = await DidKeyResolver.generate(); + // set up a promise to read later that captures the emitted messageCid + let handler; + const messageSubscriptionPromise: Promise = new Promise((resolve) => { + handler = async (message: GenericMessage):Promise => { + const messageCid = await Message.getCid(message); + resolve(messageCid); + }; + }); + // testing Subscription Request const subscriptionRequest = await EventsSubscribe.create({ signer: Jws.createSigner(alice), }); + const subscriptionReply = await dwn.processMessage(alice.did, subscriptionRequest.message, { handler }); - const subscriptionReply = await dwn.processMessage(alice.did, subscriptionRequest.message); expect(subscriptionReply.status.code).to.equal(200); expect(subscriptionReply.subscription).to.not.be.undefined; - // set up a promise to read later that captures the emitted messageCid - const messageSubscriptionPromise: Promise = new Promise((resolve) => { - const process = async (message: GenericMessage):Promise => { - const messageCid = await Message.getCid(message); - resolve(messageCid); - }; - subscriptionReply.subscription!.on(process); - }); - const messageWrite = await TestDataGenerator.generateRecordsWrite({ author: alice }); - const writeReply = await dwn.processMessage(alice.did, messageWrite.message, messageWrite.dataStream); + const writeReply = await dwn.processMessage(alice.did, messageWrite.message, { dataStream: messageWrite.dataStream }); expect(writeReply.status.code).to.equal(202); const messageCid = await Message.getCid(messageWrite.message); diff --git a/tests/scenarios/subscriptions.spec.ts b/tests/scenarios/subscriptions.spec.ts index df8383b9e..10af7f94c 100644 --- a/tests/scenarios/subscriptions.spec.ts +++ b/tests/scenarios/subscriptions.spec.ts @@ -55,24 +55,23 @@ export function testSubscriptionScenarios(): void { it('all events', async () => { const alice = await DidKeyResolver.generate(); - // subscribe to all messages - const eventsSubscription = await TestDataGenerator.generateEventsSubscribe({ author: alice }); - const eventsSubscriptionReply = await dwn.processMessage(alice.did, eventsSubscription.message); - expect(eventsSubscriptionReply.status.code).to.equal(200); - expect(eventsSubscriptionReply.subscription?.id).to.equal(await Message.getCid(eventsSubscription.message)); - // create a handler that adds the messageCid of each message to an array. const messageCids: string[] = []; - const messageHandler = async (message: GenericMessage): Promise => { + const handler = async (message: GenericMessage): Promise => { const messageCid = await Message.getCid(message); messageCids.push(messageCid); }; - const handler = eventsSubscriptionReply.subscription!.on(messageHandler); + + // subscribe to all messages + const eventsSubscription = await TestDataGenerator.generateEventsSubscribe({ author: alice }); + const eventsSubscriptionReply = await dwn.processMessage(alice.did, eventsSubscription.message, { handler }); + expect(eventsSubscriptionReply.status.code).to.equal(200); + expect(eventsSubscriptionReply.subscription?.id).to.equal(await Message.getCid(eventsSubscription.message)); // generate various messages const write1 = await TestDataGenerator.generateRecordsWrite({ author: alice }); const write1MessageCid = await Message.getCid(write1.message); - const write1Reply = await dwn.processMessage(alice.did, write1.message, write1.dataStream); + const write1Reply = await dwn.processMessage(alice.did, write1.message, { dataStream: write1.dataStream }); expect(write1Reply.status.code).to.equal(202); const grant1 = await TestDataGenerator.generatePermissionsGrant({ author: alice }); @@ -91,11 +90,11 @@ export function testSubscriptionScenarios(): void { expect(deleteWrite1Reply.status.code).to.equal(202); // unregister the handler - handler.off(); + await eventsSubscriptionReply.subscription?.close(); // create a message after const write2 = await TestDataGenerator.generateRecordsWrite({ author: alice }); - const write2Reply = await dwn.processMessage(alice.did, write2.message, write2.dataStream); + const write2Reply = await dwn.processMessage(alice.did, write2.message, { dataStream: write2.dataStream }); expect(write2Reply.status.code).to.equal(202); await Time.minimalSleep(); @@ -138,44 +137,43 @@ export function testSubscriptionScenarios(): void { const proto1Messages:string[] = []; const proto2Messages:string[] = []; + // we add a handler to the subscription and add the messageCid to the appropriate array + const proto1Handler = async (message:GenericMessage):Promise => { + const messageCid = await Message.getCid(message); + proto1Messages.push(messageCid); + }; + // subscribe to proto1 messages const proto1Subscription = await TestDataGenerator.generateEventsSubscribe({ author: alice, filters: [{ protocol: proto1 }] }); - const proto1SubscriptionReply = await dwn.processMessage(alice.did, proto1Subscription.message); + const proto1SubscriptionReply = await dwn.processMessage(alice.did, proto1Subscription.message, { handler: proto1Handler }); expect(proto1SubscriptionReply.status.code).to.equal(200); expect(proto1SubscriptionReply.subscription?.id).to.equal(await Message.getCid(proto1Subscription.message)); // we add a handler to the subscription and add the messageCid to the appropriate array - const proto1Handler = async (message:GenericMessage):Promise => { + const proto2Handler = async (message:GenericMessage):Promise => { const messageCid = await Message.getCid(message); - proto1Messages.push(messageCid); + proto2Messages.push(messageCid); }; - const proto1Sub = proto1SubscriptionReply.subscription!.on(proto1Handler); // subscribe to proto2 messages const proto2Subscription = await TestDataGenerator.generateEventsSubscribe({ author: alice, filters: [{ protocol: proto2 }] }); - const proto2SubscriptionReply = await dwn.processMessage(alice.did, proto2Subscription.message); + const proto2SubscriptionReply = await dwn.processMessage(alice.did, proto2Subscription.message, { handler: proto2Handler }); expect(proto2SubscriptionReply.status.code).to.equal(200); expect(proto2SubscriptionReply.subscription?.id).to.equal(await Message.getCid(proto2Subscription.message)); - // we add a handler to the subscription and add the messageCid to the appropriate array - const proto2Handler = async (message:GenericMessage):Promise => { - const messageCid = await Message.getCid(message); - proto2Messages.push(messageCid); - }; - proto2SubscriptionReply.subscription!.on(proto2Handler); // create some random record, will not show up in records subscription const write1Random = await TestDataGenerator.generateRecordsWrite({ author: alice }); - const write1RandomResponse = await dwn.processMessage(alice.did, write1Random.message, write1Random.dataStream); + const write1RandomResponse = await dwn.processMessage(alice.did, write1Random.message, { dataStream: write1Random.dataStream }); expect(write1RandomResponse.status.code).to.equal(202); // create a record for proto1 const write1proto1 = await TestDataGenerator.generateRecordsWrite({ author: alice, protocol: proto1, ...postProperties }); - const write1Response = await dwn.processMessage(alice.did, write1proto1.message, write1proto1.dataStream); + const write1Response = await dwn.processMessage(alice.did, write1proto1.message, { dataStream: write1proto1.dataStream }); expect(write1Response.status.code).equals(202); // create a record for proto2 const write1proto2 = await TestDataGenerator.generateRecordsWrite({ author: alice, protocol: proto2, ...postProperties }); - const write1Proto2Response = await dwn.processMessage(alice.did, write1proto2.message, write1proto2.dataStream); + const write1Proto2Response = await dwn.processMessage(alice.did, write1proto2.message, { dataStream: write1proto2.dataStream }); expect(write1Proto2Response.status.code).equals(202); expect(proto1Messages.length).to.equal(1, 'proto1'); @@ -184,16 +182,16 @@ export function testSubscriptionScenarios(): void { expect(proto2Messages).to.include(await Message.getCid(write1proto2.message)); // remove listener for proto1 - proto1Sub.off(); + proto1SubscriptionReply.subscription?.close(); // create another record for proto1 const write2proto1 = await TestDataGenerator.generateRecordsWrite({ author: alice, protocol: proto1, ...postProperties }); - const write2Response = await dwn.processMessage(alice.did, write2proto1.message, write2proto1.dataStream); + const write2Response = await dwn.processMessage(alice.did, write2proto1.message, { dataStream: write2proto1.dataStream }); expect(write2Response.status.code).equals(202); // create another record for proto2 const write2proto2 = await TestDataGenerator.generateRecordsWrite({ author: alice, protocol: proto2, ...postProperties }); - const write2Proto2Response = await dwn.processMessage(alice.did, write2proto2.message, write2proto2.dataStream); + const write2Proto2Response = await dwn.processMessage(alice.did, write2proto2.message, { dataStream: write2proto2.dataStream }); expect(write2Proto2Response.status.code).equals(202); // proto1 messages from handler do not change. @@ -208,24 +206,23 @@ export function testSubscriptionScenarios(): void { it('unsubscribes', async () => { const alice = await DidKeyResolver.generate(); - // subscribe to all events - const eventsSubscription = await TestDataGenerator.generateEventsSubscribe({ author: alice }); - const eventsSubscriptionReply = await dwn.processMessage(alice.did, eventsSubscription.message); - expect(eventsSubscriptionReply.status.code).to.equal(200); - // messageCids of events const messageCids:string[] = []; - const eventsHandler = async (message: GenericMessage): Promise => { + const handler = async (message: GenericMessage): Promise => { const messageCid = await Message.getCid(message); messageCids.push(messageCid); }; - eventsSubscriptionReply.subscription!.on(eventsHandler); + + // subscribe to all events + const eventsSubscription = await TestDataGenerator.generateEventsSubscribe({ author: alice }); + const eventsSubscriptionReply = await dwn.processMessage(alice.did, eventsSubscription.message, { handler }); + expect(eventsSubscriptionReply.status.code).to.equal(200); expect(messageCids.length).to.equal(0); // no events exist yet const record1 = await TestDataGenerator.generateRecordsWrite({ author: alice }); - const record1Reply = await dwn.processMessage(alice.did, record1.message, record1.dataStream); + const record1Reply = await dwn.processMessage(alice.did, record1.message, { dataStream: record1.dataStream }); expect(record1Reply.status.code).to.equal(202); const record1MessageCid = await Message.getCid(record1.message); @@ -237,7 +234,7 @@ export function testSubscriptionScenarios(): void { // write another message. const record2 = await TestDataGenerator.generateRecordsWrite({ author: alice }); - const record2Reply = await dwn.processMessage(alice.did, record2.message, record2.dataStream); + const record2Reply = await dwn.processMessage(alice.did, record2.message, { dataStream: record2.dataStream }); expect(record2Reply.status.code).to.equal(202); // sleep to make sure events have some time to emit.