Skip to content

Commit

Permalink
records subscribe handler
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Jan 13, 2024
1 parent 67ed273 commit 3d1e8ab
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 225 deletions.
3 changes: 2 additions & 1 deletion src/dwn.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { DataStore } from './types/data-store.js';
import type { EventLog } from './types/event-log.js';
import type { EventStream } from './types/subscriptions.js';
import type { MessageOptions } from './types/core-types.js';
import type { MessageStore } from './types/message-store.js';
import type { MethodHandler } from './types/method-handler.js';
import type { TenantGate } from './core/tenant-gate.js';
import type { UnionMessageReply } from './core/message-reply.js';
import type { EventsGetMessage, EventsGetReply, EventsQueryMessage, EventsQueryReply, EventsSubscribeMessage, EventsSubscribeMessageOptions, EventsSubscribeReply } from './types/events-types.js';
import type { GenericMessage, GenericMessageReply, MessageOptions } from './types/message-types.js';
import type { GenericMessage, GenericMessageReply } from './types/message-types.js';
import type { MessagesGetMessage, MessagesGetReply } from './types/messages-types.js';
import type { PermissionsGrantMessage, PermissionsRequestMessage, PermissionsRevokeMessage } from './types/permissions-types.js';
import type { ProtocolsConfigureMessage, ProtocolsQueryMessage, ProtocolsQueryReply } from './types/protocols-types.js';
Expand Down
79 changes: 0 additions & 79 deletions src/event-log/subscription.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/handlers/events-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import EventEmitter from 'events';

import type { DidResolver } from '../did/did-resolver.js';
import type { Filter } from '../types/query-types.js';
import type { GenericMessageHandler } from '../types/message-types.js';
import type { GenericMessageHandler } from '../types/core-types.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventListener, EventStream } from '../types/subscriptions.js';
import type { EventsSubscribeMessage, EventsSubscribeReply, EventsSubscription } from '../types/events-types.js';
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/records-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ export class RecordsDeleteHandler implements MethodHandler {
return messageReplyFromError(e, 401);
}

const recordsWrite = await RecordsWrite.getInitialWrite(existingMessages);
const indexes = recordsDelete.constructIndexes(recordsWrite);
// newestExistingMessage is always a write by this point.
const indexes = recordsDelete.constructIndexes(newestExistingMessage as RecordsWriteMessage);
const messageCid = await Message.getCid(message);
await this.messageStore.put(tenant, message, indexes);
await this.eventLog.append(tenant, messageCid, indexes);
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/records-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { Filter } from '../types/query-types.js';
import type { MessageStore } from '../types//message-store.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { EventListener, EventStream } from '../types/subscriptions.js';
import type { RecordsSubscribeMessage, RecordsSubscribeMessageHandler, RecordsSubscribeReply, RecordsSubscription } from '../types/records-types.js';
import type { RecordsHandler, RecordsSubscribeMessage, RecordsSubscribeReply, RecordsSubscription } from '../types/records-types.js';

import { authenticate } from '../core/auth.js';
import { FilterUtility } from '../utils/filter.js';
Expand All @@ -27,7 +27,7 @@ export class RecordsSubscribeHandler implements MethodHandler {
}: {
tenant: string,
message: RecordsSubscribeMessage,
handler: RecordsSubscribeMessageHandler,
handler: RecordsHandler,
}): Promise<RecordsSubscribeReply> {
let recordsSubscribe: RecordsSubscribe;
try {
Expand Down Expand Up @@ -74,7 +74,7 @@ export class RecordsSubscribeHandler implements MethodHandler {
private async createEventSubscription(
tenant: string,
messageCid: string,
handler: RecordsSubscribeMessageHandler,
handler: RecordsHandler,
filters: Filter[]
): Promise<RecordsSubscription> {

Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
export type { DwnConfig } from './dwn.js';
export type { DidMethodResolver, DwnServiceEndpoint, ServiceEndpoint, DidDocument, DidResolutionResult, DidResolutionMetadata, DidDocumentMetadata, VerificationMethod } from './types/did-types.js';
export type { EventLog, GetEventsOptions } from './types/event-log.js';
export type { EventStream, SubscriptionReply } from './types/subscriptions.js';
export type { EventStream } from './types/subscriptions.js';
export type { EventsGetMessage, EventsGetReply, EventsSubscribeDescriptor, EventsSubscribeMessage, EventsSubscribeReply, EventsSubscription } from './types/events-types.js';
export type { Filter } from './types/query-types.js';
export type { GenericMessage, GenericMessageReply, MessageSort, Pagination, QueryResultEntry } from './types/message-types.js';
Expand Down
13 changes: 7 additions & 6 deletions src/interfaces/records-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,28 @@ export class RecordsDelete extends AbstractMessage<RecordsDeleteMessage> {
}

/**
* Authorizes the delegate who signed this message.
* Authorizes the delegate who signed this message.
* Indexed properties needed for MessageStore indexing.
*/
public constructIndexes(
initialWrite: RecordsWriteMessage
latestWrite: RecordsWriteMessage
): KeyValues {
const message = this.message;
const descriptor = { ...message.descriptor };

// we add the immutable properties from the initial RecordsWrite message in order to use them when querying relevant deletes.
const { protocol, protocolPath, recipient, schema, parentId, dataFormat, dateCreated } = initialWrite.descriptor;
// we add the immutable properties from the latest RecordsWrite message in order to use them when querying relevant deletes.
const { protocol, protocolPath, recipient, schema, parentId, dataFormat, dateCreated } = latestWrite.descriptor;

// NOTE: the "trick" not may not be apparent on how a query is able to omit deleted records:
// we intentionally not add index for `isLatestBaseState` at all, this means that upon a successful delete,
// no messages with the record ID will match any query because queries by design filter by `isLatestBaseState = true`,
// `isLatestBaseState` for the initial delete would have been toggled to `false`
const indexes: { [key:string]: string | undefined } = {
const indexes: { [key:string]: string | boolean | undefined } = {
// isLatestBaseState : "true", // intentionally showing that this index is omitted
protocol, protocolPath, recipient, schema, parentId, dataFormat, dateCreated,
contextId : initialWrite.contextId,
contextId : latestWrite.contextId,
author : this.author!,
published : !!latestWrite.descriptor.published,
...descriptor
};
removeUndefinedProperties(indexes);
Expand Down
13 changes: 13 additions & 0 deletions src/types/core-types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { EventsHandler } from './events-types.js';
import type { Readable } from 'readable-stream';
import type { RecordsHandler } from './records-types.js';

/**
* MessageOptions that are used when processing a message.
*/
export type MessageOptions = {
dataStream?: Readable;
handler?: GenericMessageHandler;
};

export type GenericMessageHandler = EventsHandler | RecordsHandler;
11 changes: 5 additions & 6 deletions src/types/events-types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { ProtocolsQueryFilter } from './protocols-types.js';
import type { AuthorizationModel, GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js';
import type { AuthorizationModel, GenericMessage, GenericMessageReply } from './message-types.js';
import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import type { RangeCriterion, RangeFilter } from './query-types.js';

Expand Down Expand Up @@ -41,18 +41,17 @@ export type EventsGetReply = GenericMessageReply & {
entries?: string[];
};


export type EventsSubscribeMessageOptions = {
handler: GenericMessageHandler;
};

export type EventsSubscribeMessage = {
authorization?: AuthorizationModel;
descriptor: EventsSubscribeDescriptor;
};

export type EventsHandler = (message: GenericMessage) => void;

export type EventsSubscribeMessageOptions = {
handler: EventsHandler;
};

export type EventsSubscription = {
id: string;
close: () => Promise<void>;
Expand Down
15 changes: 3 additions & 12 deletions src/types/message-types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import type { DelegatedGrantMessage } from '../types/delegated-grant-message.js';
import type { EventsHandler } from './events-types.js';
import type { GeneralJws } from './jws-types.js';
import type { Readable } from 'readable-stream';
import type { RecordsSubscribeMessageHandler } from './records-types.js';
import type { SortDirection } from './query-types.js';

/**
Expand All @@ -13,14 +10,6 @@ export type GenericMessage = {
authorization?: AuthorizationModel;
};

/**
* MessageOptions that are used when processing a message.
*/
export type MessageOptions = {
dataStream?: Readable;
handler?: GenericMessageHandler;
};

/**
* The data model for the `authorization` property in a DWN message.
*/
Expand Down Expand Up @@ -78,7 +67,9 @@ export type QueryResultEntry = GenericMessage & {
encodedData?: string;
};

export type GenericMessageHandler = EventsHandler | RecordsSubscribeMessageHandler;
export type SubscriptionReply = GenericMessageReply & {
subscription?: GenericMessageSubscription;
};

export type GenericMessageSubscription = {
id: string;
Expand Down
3 changes: 2 additions & 1 deletion src/types/method-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { GenericMessageHandler } from './core-types.js';
import type { Readable } from 'readable-stream';
import type { GenericMessage, GenericMessageHandler, GenericMessageReply } from './message-types.js';
import type { GenericMessage, GenericMessageReply } from './message-types.js';

/**
* Interface that defines a message handler of a specific method.
Expand Down
6 changes: 3 additions & 3 deletions src/types/records-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ export type RecordsQueryReply = GenericMessageReply & {
cursor?: string;
};

export type RecordsSubscribeMessageHandler = (message: RecordsWriteMessage | RecordsDeleteMessage) => void;
export type RecordsHandler = (message: RecordsWriteMessage | RecordsDeleteMessage) => void;

export type RecordsSubscribeMessgeOptions = {
handler: RecordsSubscribeMessageHandler;
export type RecordsSubscribeMessageOptions = {
handler: RecordsHandler;
};

export type RecordsSubscribeMessage = GenericMessage & {
Expand Down
9 changes: 2 additions & 7 deletions src/types/subscriptions.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { GenericMessageReply } from '../types/message-types.js';
import type { GenericMessage } from './message-types.js';
import type { KeyValues } from './query-types.js';
import type { GenericMessage, GenericMessageSubscription } from './message-types.js';

export type EventListener = (tenant: string, message: GenericMessage, indexes: KeyValues) => void;

Expand All @@ -17,8 +16,4 @@ export interface EventStream {
export interface EventSubscription {
id: string;
close: () => Promise<void>;
}

export type SubscriptionReply = GenericMessageReply & {
subscription?: GenericMessageSubscription;
};
}
2 changes: 0 additions & 2 deletions tests/handlers/records-subscribe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,6 @@ export function testRecordsSubscribeHandler(): void {
const deleteChatReply = await dwn.processMessage(alice.did, deleteChatForBob.message);
expect(deleteChatReply.status.code).to.equal(202);

await Time.minimalSleep();

expect(messageCids.length).to.equal(2, 'after delete');
expect(messageCids[1]).to.equal(await Message.getCid(deleteChatForBob.message));
});
Expand Down
Loading

0 comments on commit 3d1e8ab

Please sign in to comment.