Skip to content

Commit

Permalink
update interface naming and add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Jan 10, 2024
1 parent 4da1a3c commit 2b23061
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 9 deletions.
4 changes: 2 additions & 2 deletions src/event-log/event-stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { MessageStore } from '../types/message-store.js';
import type { EventsSubscribeMessage, EventsSubscription } from '../types/events-types.js';
import type { EventStream, Subscription } from '../types/subscriptions.js';
import type { EventStream, SubscriptionHandler } from '../types/subscriptions.js';
import type { Filter, KeyValues } from '../types/query-types.js';
import type { GenericMessage, GenericMessageSubscription } from '../types/message-types.js';
import type { RecordsSubscribeMessage, RecordsSubscription } from '../types/records-types.js';
Expand All @@ -25,7 +25,7 @@ export class EventStreamEmitter implements EventStream {
private reauthorizationTTL: number;

private isOpen: boolean = false;
private subscriptions: Map<string, Subscription> = new Map();
private subscriptions: Map<string, SubscriptionHandler> = new Map();

constructor(config?: EventStreamConfig) {
this.reauthorizationTTL = config?.reauthorizationTTL || 0; // if set to zero it does not reauthorize
Expand Down
9 changes: 7 additions & 2 deletions src/event-log/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import type { DwnError } from '../core/dwn-error.js';
import type { EventEmitter } from 'events';
import type { MessageStore } from '../types/message-store.js';
import type { EmitFunction, Subscription } from '../types/subscriptions.js';
import type { EmitFunction, SubscriptionHandler } from '../types/subscriptions.js';
import type { Filter, KeyValues } from '../types/query-types.js';
import type { GenericMessage, GenericMessageHandler } from '../types/message-types.js';

import { FilterUtility } from '../utils/filter.js';

export class SubscriptionBase implements Subscription {
/**
* Base class to extend default subscription behavior.
*
* ie. `RecordsSubscriptionHandler` has different rules for authorization and only matches specific message types.
*/
export class SubscriptionHandlerBase implements SubscriptionHandler {
protected eventEmitter: EventEmitter;
protected messageStore: MessageStore;
protected filters: Filter[];
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/events-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Events } from '../utils/events.js';
import { EventsSubscribe } from '../interfaces/events-subscribe.js';
import { Message } from '../core/message.js';
import { messageReplyFromError } from '../core/message-reply.js';
import { SubscriptionBase } from '../event-log/subscription.js';
import { SubscriptionHandlerBase } from '../event-log/subscription.js';
import { authenticate, authorizeOwner } from '../core/auth.js';

export class EventsSubscribeHandler implements MethodHandler {
Expand Down Expand Up @@ -57,7 +57,7 @@ export class EventsSubscribeHandler implements MethodHandler {
}
}

export class EventsSubscriptionHandler extends SubscriptionBase {
export class EventsSubscriptionHandler extends SubscriptionHandlerBase {
public static async create(input: {
tenant: string,
message: EventsSubscribeMessage,
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/records-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { Records } from '../utils/records.js';
import { RecordsDelete } from '../interfaces/records-delete.js';
import { RecordsSubscribe } from '../interfaces/records-subscribe.js';
import { RecordsWrite } from '../interfaces/records-write.js';
import { SubscriptionBase } from '../event-log/subscription.js';
import { SubscriptionHandlerBase } from '../event-log/subscription.js';
import { Time } from '../utils/time.js';
import { DwnError, DwnErrorCode } from '../core/dwn-error.js';
import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
Expand Down Expand Up @@ -231,7 +231,7 @@ export class RecordsSubscribeHandler implements MethodHandler {
}
}

export class RecordsSubscriptionHandler extends SubscriptionBase {
export class RecordsSubscriptionHandler extends SubscriptionHandlerBase {
private recordsSubscribe: RecordsSubscribe;

private reauthorizationTTL: number;
Expand Down
10 changes: 9 additions & 1 deletion src/types/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import type { RecordsSubscribeMessage, RecordsSubscription } from './records-typ

export type EmitFunction = (tenant: string, message: GenericMessage, indexes: KeyValues) => void;

/**
* The EventStream interface implements a pub/sub system based on Message filters.
*/
export interface EventStream {
subscribe(tenant: string, message: EventsSubscribeMessage, filters: Filter[], messageStore: MessageStore): Promise<EventsSubscription>;
subscribe(tenant: string, message: RecordsSubscribeMessage, filters: Filter[], messageStore: MessageStore): Promise<RecordsSubscription>;
Expand All @@ -17,7 +20,12 @@ export interface EventStream {
close(): Promise<void>;
}

export interface Subscription {
/**
* The SubscriptionHandler interface is implemented by specific types of Subscription Handlers.
*
* ie. `RecordsSubscriptionHandler` has behavior to re-authorize subscriptions.
*/
export interface SubscriptionHandler {
id: string;
listener: EmitFunction;
on: (handler: GenericMessageHandler) => { off: () => void };
Expand Down

0 comments on commit 2b23061

Please sign in to comment.