Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventsSubscribe #658

Merged
merged 44 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
2eaa9c8
Squashed commit of record and event subscriptions base.
LiranCohen Dec 21, 2023
b5c904e
slight refactor
LiranCohen Dec 21, 2023
c5cf75c
add timeouts to tests
LiranCohen Dec 22, 2023
727dc73
test delegate grant
LiranCohen Dec 22, 2023
cc60ed6
signal when record is updated
LiranCohen Dec 23, 2023
e33e7cb
update notifier signature
LiranCohen Dec 29, 2023
2942d63
handler signature should just be the incoming message, ignoring previ…
LiranCohen Jan 3, 2024
8b39c8c
update tests and comments
LiranCohen Jan 4, 2024
51d0a12
update events filters
LiranCohen Jan 5, 2024
fd28d09
refactor EventStream interface, added new test scaffolding
LiranCohen Jan 9, 2024
a09504a
add error handler to records subscription
LiranCohen Jan 9, 2024
9d891cd
remove uneeded event subsribe testing until delegating eEventsQuery/G…
LiranCohen Jan 9, 2024
0a193e7
add error handling to general subscriptions
LiranCohen Jan 10, 2024
edff0d2
emit unknown error
LiranCohen Jan 10, 2024
e2cf628
fix circular deps
LiranCohen Jan 10, 2024
5c73cba
update interface naming and add comments
LiranCohen Jan 10, 2024
786586e
rip out records subscribe
LiranCohen Jan 10, 2024
ee71088
simplify the EventStream interface and logic
LiranCohen Jan 11, 2024
5eb7dbd
the subsciription message handler should come in through MessageOptions
LiranCohen Jan 13, 2024
c82c6e4
should add the latest write published status to the delete index
LiranCohen Jan 13, 2024
7cc34b6
revert an invisible change
LiranCohen Jan 13, 2024
256d89c
remove unecessary class
LiranCohen Jan 14, 2024
e04dc12
rename to EventEmitterStream
LiranCohen Jan 14, 2024
22cbbb4
scaffold testing
LiranCohen Jan 16, 2024
904e299
clean up handler functionality
LiranCohen Jan 16, 2024
3e4e024
event emitter stream tests
LiranCohen Jan 16, 2024
95cb72a
EventStream is optional
LiranCohen Jan 16, 2024
cbeca31
simplify events subscribe handler and add test coverage
LiranCohen Jan 16, 2024
a5a6cac
clean up interface and increase coverage
LiranCohen Jan 16, 2024
12c067e
add more filter tests
LiranCohen Jan 16, 2024
27c110f
remove console logs
LiranCohen Jan 16, 2024
de7a73c
update after rbase
LiranCohen Jan 17, 2024
fcdbf70
remove unecessary duplicate type/interface
LiranCohen Jan 17, 2024
bd6d341
review updates
LiranCohen Jan 17, 2024
50612f9
rename classes and properties suggested by review comments
LiranCohen Jan 17, 2024
9a8a0dc
fix isRecordsFilter method and conversion/normalization
LiranCohen Jan 18, 2024
4256e8d
review updates
LiranCohen Jan 18, 2024
d92f3ad
review suggestions, added more validation to parse
LiranCohen Jan 18, 2024
041df8a
add coverage
LiranCohen Jan 18, 2024
8e5ecb5
simplified more code, added comments, updated test
LiranCohen Jan 18, 2024
d21ab28
improve coverage
LiranCohen Jan 18, 2024
c29d4fa
review updates, added more test scenarios
LiranCohen Jan 18, 2024
b583140
scenario tests, review comment updates
LiranCohen Jan 19, 2024
19101e7
version bump
LiranCohen Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build/compile-validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Definitions from '../json-schemas/definitions.json' assert { type: 'json'
import EventsFilter from '../json-schemas/interface-methods/events-filter.json' assert { type: 'json' };
import EventsGet from '../json-schemas/interface-methods/events-get.json' assert { type: 'json' };
import EventsQuery from '../json-schemas/interface-methods/events-query.json' assert { type: 'json' };
import EventsSubscribe from '../json-schemas/interface-methods/events-subscribe.json' assert { type: 'json' };
import GeneralJwk from '../json-schemas/jwk/general-jwk.json' assert { type: 'json' };
import GeneralJws from '../json-schemas/general-jws.json' assert { type: 'json' };
import GenericSignaturePayload from '../json-schemas/signature-payloads/generic-signature-payload.json' assert { type: 'json' };
Expand Down Expand Up @@ -60,6 +61,7 @@ const schemas = {
EventsFilter,
EventsGet,
EventsQuery,
EventsSubscribe,
Definitions,
GeneralJwk,
GeneralJws,
Expand Down
48 changes: 48 additions & 0 deletions json-schemas/interface-methods/events-subscribe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://identity.foundation/dwn/json-schemas/events-subscribe.json",
"type": "object",
"additionalProperties": false,
"required": [
"descriptor",
"authorization"
],
"properties": {
"authorization": {
"$ref": "https://identity.foundation/dwn/json-schemas/authorization.json"
},
"descriptor": {
"type": "object",
"additionalProperties": false,
"required": [
"interface",
"method",
"messageTimestamp",
"filters"
],
"properties": {
"interface": {
"enum": [
"Events"
],
"type": "string"
},
"method": {
"enum": [
"Subscribe"
],
"type": "string"
},
"messageTimestamp": {
"type": "string"
},
"filters": {
"type": "array",
"items": {
"$ref": "https://identity.foundation/dwn/json-schemas/events-filter.json"
}
}
}
}
}
}
1 change: 1 addition & 0 deletions src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export enum DwnErrorCode {
DidNotValid = 'DidNotValid',
DidResolutionFailed = 'DidResolutionFailed',
Ed25519InvalidJwk = 'Ed25519InvalidJwk',
EventsSubscribeEventStreamUnimplemented = 'EventsSubscribeEventStreamUnimplemented',
GeneralJwsVerifierGetPublicKeyNotFound = 'GeneralJwsVerifierGetPublicKeyNotFound',
GeneralJwsVerifierInvalidSignature = 'GeneralJwsVerifierInvalidSignature',
GrantAuthorizationGrantExpired = 'GrantAuthorizationGrantExpired',
Expand Down
7 changes: 6 additions & 1 deletion src/core/message-reply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { PaginationCursor } from '../types/query-types.js';
import type { ProtocolsConfigureMessage } from '../types/protocols-types.js';
import type { Readable } from 'readable-stream';
import type { RecordsWriteMessage } from '../types/records-types.js';
import type { GenericMessageReply, QueryResultEntry } from '../types/message-types.js';
import type { GenericMessageReply, MessageSubscription, QueryResultEntry } from '../types/message-types.js';

export function messageReplyFromError(e: unknown, code: number): GenericMessageReply {

Expand Down Expand Up @@ -40,4 +40,9 @@ export type UnionMessageReply = GenericMessageReply & {
* Mutually exclusive with `record`.
*/
cursor?: PaginationCursor;

/**
* A subscription object if a subscription was requested.
*/
subscription?: MessageSubscription;
};
110 changes: 88 additions & 22 deletions src/dwn.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { DataStore } from './types/data-store.js';
import type { EventLog } from './types/event-log.js';
import type { EventStream } from './types/subscriptions.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply } from './types/event-types.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js';
import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js';
import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js';
import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js';
Expand All @@ -15,6 +16,7 @@ import { AllowAllTenantGate } from './core/tenant-gate.js';
import { DidResolver } from './did/did-resolver.js';
import { EventsGetHandler } from './handlers/events-get.js';
import { EventsQueryHandler } from './handlers/events-query.js';
import { EventsSubscribeHandler } from './handlers/events-subscribe.js';
import { Message } from './core/message.js';
import { messageReplyFromError } from './core/message-reply.js';
import { MessagesGetHandler } from './handlers/messages-get.js';
Expand All @@ -36,32 +38,87 @@ export class Dwn {
private dataStore: DataStore;
private eventLog: EventLog;
private tenantGate: TenantGate;
private eventStream?: EventStream;

private constructor(config: DwnConfig) {
this.didResolver = config.didResolver!;
this.tenantGate = config.tenantGate!;
this.messageStore = config.messageStore;
this.dataStore = config.dataStore;
this.eventLog = config.eventLog;
this.eventStream = config.eventStream;

this.methodHandlers = {
[DwnInterfaceName.Events + DwnMethodName.Get] : new EventsGetHandler(this.didResolver, this.eventLog),
[DwnInterfaceName.Events + DwnMethodName.Query] : new EventsQueryHandler(this.didResolver, this.eventLog),
[DwnInterfaceName.Messages + DwnMethodName.Get] : new MessagesGetHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Permissions + DwnMethodName.Grant] : new PermissionsGrantHandler(
this.didResolver, this.messageStore, this.eventLog),
[DwnInterfaceName.Events + DwnMethodName.Get]: new EventsGetHandler(
this.didResolver,
this.eventLog,
),
[DwnInterfaceName.Events + DwnMethodName.Query]: new EventsQueryHandler(
this.didResolver,
this.eventLog,
),
[DwnInterfaceName.Events+ DwnMethodName.Subscribe]: new EventsSubscribeHandler(
this.didResolver,
this.eventStream,
),
[DwnInterfaceName.Messages + DwnMethodName.Get]: new MessagesGetHandler(
this.didResolver,
this.messageStore,
this.dataStore,
),
[DwnInterfaceName.Permissions + DwnMethodName.Grant]: new PermissionsGrantHandler(
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Permissions + DwnMethodName.Request]: new PermissionsRequestHandler(
this.didResolver, this.messageStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Permissions + DwnMethodName.Revoke]: new PermissionsRevokeHandler(
this.didResolver, this.messageStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Protocols + DwnMethodName.Configure]: new ProtocolsConfigureHandler(
this.didResolver, this.messageStore, this.dataStore, this.eventLog),
[DwnInterfaceName.Protocols + DwnMethodName.Query] : new ProtocolsQueryHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Delete] : new RecordsDeleteHandler(
this.didResolver, this.messageStore, this.dataStore, this.eventLog),
[DwnInterfaceName.Records + DwnMethodName.Query] : new RecordsQueryHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Read] : new RecordsReadHandler(this.didResolver, this.messageStore, this.dataStore),
[DwnInterfaceName.Records + DwnMethodName.Write] : new RecordsWriteHandler(this.didResolver, this.messageStore, this.dataStore, this.eventLog),
this.didResolver,
this.messageStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Protocols + DwnMethodName.Query]: new ProtocolsQueryHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Delete]: new RecordsDeleteHandler(
this.didResolver,
this.messageStore,
this.dataStore,
this.eventLog,
this.eventStream
),
[DwnInterfaceName.Records + DwnMethodName.Query]: new RecordsQueryHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Read]: new RecordsReadHandler(
this.didResolver,
this.messageStore,
this.dataStore
),
[DwnInterfaceName.Records + DwnMethodName.Write]: new RecordsWriteHandler(
this.didResolver,
this.messageStore,
this.dataStore,
this.eventLog,
this.eventStream
)
};
}

Expand All @@ -82,12 +139,14 @@ export class Dwn {
await this.messageStore.open();
await this.dataStore.open();
await this.eventLog.open();
await this.eventStream?.open();
}

public async close(): Promise<void> {
this.messageStore.close();
this.dataStore.close();
this.eventLog.close();
await this.eventStream?.close();
await this.messageStore.close();
await this.dataStore.close();
await this.eventLog.close();
}

/**
Expand All @@ -96,6 +155,8 @@ export class Dwn {
*/
public async processMessage(tenant: string, rawMessage: EventsGetMessage): Promise<EventsGetReply>;
public async processMessage(tenant: string, rawMessage: EventsQueryMessage): Promise<EventsQueryReply>;
public async processMessage(
tenant: string, rawMessage: EventsSubscribeMessage, options?: EventsSubscribeMessageOptions): Promise<EventsSubscribeReply>;
public async processMessage(tenant: string, rawMessage: MessagesGetMessage): Promise<MessagesGetReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsConfigureMessage): Promise<GenericMessageReply>;
public async processMessage(tenant: string, rawMessage: ProtocolsQueryMessage): Promise<ProtocolsQueryReply>;
Expand All @@ -113,13 +174,14 @@ export class Dwn {
return errorMessageReply;
}

const { dataStream } = options;
const { dataStream, subscriptionHandler } = options;

const handlerKey = rawMessage.descriptor.interface + rawMessage.descriptor.method;
const methodHandlerReply = await this.methodHandlers[handlerKey].handle({
tenant,
message: rawMessage as GenericMessage,
dataStream
dataStream,
subscriptionHandler
});

return methodHandlerReply;
Expand Down Expand Up @@ -154,6 +216,7 @@ export class Dwn {
// Verify interface and method
const dwnInterface = rawMessage?.descriptor?.interface;
const dwnMethod = rawMessage?.descriptor?.method;

if (dwnInterface === undefined || dwnMethod === undefined) {
return {
status: { code: 400, detail: `Both interface and method must be present, interface: ${dwnInterface}, method: ${dwnMethod}` }
Expand All @@ -174,10 +237,13 @@ export class Dwn {
* DWN configuration.
*/
export type DwnConfig = {
didResolver?: DidResolver,
didResolver?: DidResolver;
tenantGate?: TenantGate;

// event stream is optional if a DWN does not wish to provide subscription services.
eventStream?: EventStream;

messageStore: MessageStore;
dataStore: DataStore;
eventLog: EventLog
eventLog: EventLog;
};
3 changes: 2 additions & 1 deletion src/enums/dwn-interface-method.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ export enum DwnMethodName {
Request = 'Request',
Revoke = 'Revoke',
Write = 'Write',
Delete = 'Delete'
Delete = 'Delete',
Subscribe = 'Subscribe'
}
53 changes: 53 additions & 0 deletions src/event-log/event-emitter-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import type { GenericMessage } from '../types/message-types.js';
import type { KeyValues } from '../types/query-types.js';
import type { EventListener, EventStream, EventSubscription } from '../types/subscriptions.js';

import { EventEmitter } from 'events';

const EVENTS_LISTENER_CHANNEL = 'events';

type EventStreamEmitterConfig = {
emitter?: EventEmitter;
};

export class EventEmitterStream implements EventStream {
private eventEmitter: EventEmitter;
private isOpen: boolean = false;

constructor(config?: EventStreamEmitterConfig) {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
// we capture the rejections and currently just log the errors that are produced
this.eventEmitter = config?.emitter || new EventEmitter({ captureRejections: true });
this.eventEmitter.on('error', this.eventError);
}

// we subscribe to the general `EventEmitter` error events with this handler.
// this handler is also called when there is a caught error upon emitting an event from a handler.
private eventError(error: any): void {
console.error('event emitter error', error);
};

async subscribe(id: string, listener: EventListener): Promise<EventSubscription> {
this.eventEmitter.on(EVENTS_LISTENER_CHANNEL, listener);
return {
id,
close: async (): Promise<void> => { this.eventEmitter.off(EVENTS_LISTENER_CHANNEL, listener); }
};
}

async open(): Promise<void> {
this.isOpen = true;
}

async close(): Promise<void> {
this.isOpen = false;
this.eventEmitter.removeAllListeners();
}

emit(tenant: string, message: GenericMessage, indexes: KeyValues): void {
if (!this.isOpen) {
// silently ignore
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
return;
}
this.eventEmitter.emit(EVENTS_LISTENER_CHANNEL, tenant, message, indexes);
}
}
2 changes: 2 additions & 0 deletions src/event-log/event-log-level.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { EventLog } from '../types/event-log.js';
import type { EventStream } from '../types/subscriptions.js';
import type { ULIDFactory } from 'ulidx';
import type { Filter, KeyValues, PaginationCursor } from '../types/query-types.js';

Expand All @@ -14,6 +15,7 @@ type EventLogLevelConfig = {
*/
location?: string,
createLevelDatabase?: typeof createLevelDatabase,
eventStream?: EventStream,
};

export class EventLogLevel implements EventLog {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/events-get.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { DidResolver } from '../did/did-resolver.js';
import type { EventLog } from '../types/event-log.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventsGetMessage, EventsGetReply } from '../types/event-types.js';
import type { EventsGetMessage, EventsGetReply } from '../types/events-types.js';

import { EventsGet } from '../interfaces/events-get.js';
import { messageReplyFromError } from '../core/message-reply.js';
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/events-query.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { DidResolver } from '../did/did-resolver.js';
import type { EventLog } from '../types/event-log.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventsQueryMessage, EventsQueryReply } from '../types/event-types.js';
import type { EventsQueryMessage, EventsQueryReply } from '../types/events-types.js';

import { Events } from '../utils/events.js';
import { EventsQuery } from '../interfaces/events-query.js';
import { messageReplyFromError } from '../core/message-reply.js';
import { authenticate, authorizeOwner } from '../core/auth.js';
Expand Down Expand Up @@ -31,7 +32,7 @@ export class EventsQueryHandler implements MethodHandler {
return messageReplyFromError(e, 401);
}

const logFilters = EventsQuery.convertFilters(message.descriptor.filters);
const logFilters = Events.convertFilters(message.descriptor.filters);
const { events, cursor } = await this.eventLog.queryEvents(tenant, logFilters, message.descriptor.cursor);

return {
Expand Down
Loading