Skip to content

Commit

Permalink
the subsciription message handler should come in through MessageOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Jan 13, 2024
1 parent 1ceeb1a commit ba9c986
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 76 deletions.
10 changes: 6 additions & 4 deletions src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeReply } from './types/events-types.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js';
import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js';
import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js';
import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js';
Expand Down Expand Up @@ -157,7 +157,8 @@ export class Dwn {
*/
public async processMessage(tenant: string, rawMessage: EventsGetMessage): Promise<EventsGetReply>;
public async processMessage(tenant: string, rawMessage: EventsQueryMessage): Promise<EventsQueryReply>;
public async processMessage(tenant: string, rawMessage: EventsSubscribeMessage): Promise<EventsSubscribeReply>;
public async processMessage(
tenant: string, rawMessage: EventsSubscribeMessage, options?: EventsSubscribeMessageOptions): Promise<EventsSubscribeReply>;
public async processMessage(tenant: string, rawMessage: MessagesGetMessage): Promise<MessagesGetReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsConfigureMessage): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsQueryMessage): Promise<ProtocolsQueryReply>;
Expand All @@ -175,13 +176,14 @@ export class Dwn {
return errorMessageReply;
}

const { dataStream } = options;
const { dataStream, handler } = options;

const handlerKey = rawMessage.descriptor.interface + rawMessage.descriptor.method;
const methodHandlerReply = await this.methodHandlers[handlerKey].handle({
tenant,
message: rawMessage as GenericMessage,
dataStream
dataStream,
handler
});

return methodHandlerReply;
Expand Down
61 changes: 42 additions & 19 deletions src/handlers/events-subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import type { DidResolver } from '../did/did-resolver.js';
import EventEmitter from 'events';
import type { EventStream } from '../types/subscriptions.js';

import type { DidResolver } from '../did/did-resolver.js';
import type { Filter } from '../types/query-types.js';
import type { GenericMessageHandler } from '../types/message-types.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventsSubscribeMessage, EventsSubscribeReply } from '../types/events-types.js';
import type { EventListener, EventStream } from '../types/subscriptions.js';
import type { EventsSubscribeMessage, EventsSubscribeReply, EventsSubscription } from '../types/events-types.js';

import { Events } from '../utils/events.js';
import { EventsSubscribe } from '../interfaces/events-subscribe.js';
import { FilterUtility } from '../utils/filter.js';
import { Message } from '../core/message.js';
import { messageReplyFromError } from '../core/message-reply.js';
import { SubscriptionHandlerBase } from '../event-log/subscription.js';
import { authenticate, authorizeOwner } from '../core/auth.js';

export class EventsSubscribeHandler implements MethodHandler {
Expand All @@ -21,10 +23,13 @@ export class EventsSubscribeHandler implements MethodHandler {
public async handle({
tenant,
message,
handler,
}: {
tenant: string;
message: EventsSubscribeMessage;
handler: GenericMessageHandler;
}): Promise<EventsSubscribeReply> {

let subscriptionRequest: EventsSubscribe;
try {
subscriptionRequest = await EventsSubscribe.parse(message);
Expand All @@ -40,33 +45,51 @@ export class EventsSubscribeHandler implements MethodHandler {
}

try {

const { filters } = message.descriptor;
const eventEmitter = new EventEmitter();
const eventsFilters = Events.convertFilters(filters);
const subscription = await EventsSubscriptionHandler.create({ tenant, message, filters: eventsFilters, eventEmitter });
this.eventStream.subscribe(subscription.id, subscription.listener);
const messageCid = await Message.getCid(message);
const subscription = await this.createEventSubscription(tenant, messageCid, handler, eventsFilters);

const messageReply: EventsSubscribeReply = {
status: { code: 200, detail: 'OK' },
subscription,
};

return messageReply;
} catch (error) {
return messageReplyFromError(error, 401);
}
}
}

export class EventsSubscriptionHandler extends SubscriptionHandlerBase {
public static async create(input: {
/**
* Creates an EventStream subscription and assigns the message handler to the listener.
* The listener checks that the incoming message matches the supplied filters, as well as is attributed to the tenant.
*/
private async createEventSubscription(
tenant: string,
message: EventsSubscribeMessage,
filters: Filter[],
eventEmitter: EventEmitter,
}): Promise<EventsSubscriptionHandler> {
const id = await Message.getCid(input.message);
return new EventsSubscriptionHandler({ ...input, id });
}
};
messageCid: string,
handler: GenericMessageHandler,
filters: Filter[]
): Promise<EventsSubscription> {

const eventEmitter = new EventEmitter();
const eventChannel = `${tenant}_${messageCid}`;

const listener: EventListener = (eventTenant, eventMessage, eventIndexes):void => {
if (tenant === eventTenant && FilterUtility.matchAnyFilter(eventIndexes, filters)) {
eventEmitter.emit(eventChannel, eventMessage);
}
};

const eventsSubscription = await this.eventStream.subscribe(messageCid, listener);
eventEmitter.on(eventChannel, handler);

return {
id : messageCid,
close : async (): Promise<void> => {
await eventsSubscription.close();
eventEmitter.off(eventChannel, handler);
},
};
}
}
10 changes: 6 additions & 4 deletions src/types/events-types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { DwnError } from '../core/dwn-error.js';
import type { ProtocolsQueryFilter } from './protocols-types.js';
import type { AuthorizationModel, GenericMessage, GenericMessageReply } from './message-types.js';
import type { AuthorizationModel, GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js';
import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import type { RangeCriterion, RangeFilter } from './query-types.js';

Expand Down Expand Up @@ -42,6 +41,11 @@ export type EventsGetReply = GenericMessageReply & {
entries?: string[];
};


export type EventsSubscribeMessageOptions = {
handler: GenericMessageHandler;
};

export type EventsSubscribeMessage = {
authorization?: AuthorizationModel;
descriptor: EventsSubscribeDescriptor;
Expand All @@ -51,8 +55,6 @@ export type EventsHandler = (message: GenericMessage) => void;

export type EventsSubscription = {
id: string;
on: (handler: EventsHandler) => { off: () => void };
onError: (handler: (error: DwnError) => void) => void;
close: () => Promise<void>;
};

Expand Down
1 change: 1 addition & 0 deletions src/types/message-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type GenericMessage = {
*/
export type MessageOptions = {
dataStream?: Readable;
handler?: GenericMessageHandler;
};

/**
Expand Down
3 changes: 2 additions & 1 deletion src/types/method-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Readable } from 'readable-stream';
import type { GenericMessage, GenericMessageReply } from './message-types.js';
import type { GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js';

/**
* Interface that defines a message handler of a specific method.
Expand All @@ -12,5 +12,6 @@ export interface MethodHandler {
tenant: string;
message: GenericMessage;
dataStream?: Readable
handler?: GenericMessageHandler;
}): Promise<GenericMessageReply>;
}
22 changes: 11 additions & 11 deletions tests/handlers/events-subscribe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,26 @@ export function testEventsSubscribeHandler(): void {
it('should allow tenant to subscribe their own event stream', async () => {
const alice = await DidKeyResolver.generate();

// set up a promise to read later that captures the emitted messageCid
let handler;
const messageSubscriptionPromise: Promise<string> = new Promise((resolve) => {
handler = async (message: GenericMessage):Promise<void> => {
const messageCid = await Message.getCid(message);
resolve(messageCid);
};
});

// testing Subscription Request
const subscriptionRequest = await EventsSubscribe.create({
signer: Jws.createSigner(alice),
});
const subscriptionReply = await dwn.processMessage(alice.did, subscriptionRequest.message, { handler });

const subscriptionReply = await dwn.processMessage(alice.did, subscriptionRequest.message);
expect(subscriptionReply.status.code).to.equal(200);
expect(subscriptionReply.subscription).to.not.be.undefined;

// set up a promise to read later that captures the emitted messageCid
const messageSubscriptionPromise: Promise<string> = new Promise((resolve) => {
const process = async (message: GenericMessage):Promise<void> => {
const messageCid = await Message.getCid(message);
resolve(messageCid);
};
subscriptionReply.subscription!.on(process);
});

const messageWrite = await TestDataGenerator.generateRecordsWrite({ author: alice });
const writeReply = await dwn.processMessage(alice.did, messageWrite.message, messageWrite.dataStream);
const writeReply = await dwn.processMessage(alice.did, messageWrite.message, { dataStream: messageWrite.dataStream });
expect(writeReply.status.code).to.equal(202);
const messageCid = await Message.getCid(messageWrite.message);

Expand Down
Loading

0 comments on commit ba9c986

Please sign in to comment.