Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventsSubscribe #658

Merged
merged 44 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
2eaa9c8
Squashed commit of record and event subscriptions base.
LiranCohen Dec 21, 2023
b5c904e
slight refactor
LiranCohen Dec 21, 2023
c5cf75c
add timeouts to tests
LiranCohen Dec 22, 2023
727dc73
test delegate grant
LiranCohen Dec 22, 2023
cc60ed6
signal when record is updated
LiranCohen Dec 23, 2023
e33e7cb
update notifier signature
LiranCohen Dec 29, 2023
2942d63
handler signature should just be the incoming message, ignoring previ…
LiranCohen Jan 3, 2024
8b39c8c
update tests and comments
LiranCohen Jan 4, 2024
51d0a12
update events filters
LiranCohen Jan 5, 2024
fd28d09
refactor EventStream interface, added new test scaffolding
LiranCohen Jan 9, 2024
a09504a
add error handler to records subscription
LiranCohen Jan 9, 2024
9d891cd
remove uneeded event subsribe testing until delegating eEventsQuery/G…
LiranCohen Jan 9, 2024
0a193e7
add error handling to general subscriptions
LiranCohen Jan 10, 2024
edff0d2
emit unknown error
LiranCohen Jan 10, 2024
e2cf628
fix circular deps
LiranCohen Jan 10, 2024
5c73cba
update interface naming and add comments
LiranCohen Jan 10, 2024
786586e
rip out records subscribe
LiranCohen Jan 10, 2024
ee71088
simplify the EventStream interface and logic
LiranCohen Jan 11, 2024
5eb7dbd
the subsciription message handler should come in through MessageOptions
LiranCohen Jan 13, 2024
c82c6e4
should add the latest write published status to the delete index
LiranCohen Jan 13, 2024
7cc34b6
revert an invisible change
LiranCohen Jan 13, 2024
256d89c
remove unecessary class
LiranCohen Jan 14, 2024
e04dc12
rename to EventEmitterStream
LiranCohen Jan 14, 2024
22cbbb4
scaffold testing
LiranCohen Jan 16, 2024
904e299
clean up handler functionality
LiranCohen Jan 16, 2024
3e4e024
event emitter stream tests
LiranCohen Jan 16, 2024
95cb72a
EventStream is optional
LiranCohen Jan 16, 2024
cbeca31
simplify events subscribe handler and add test coverage
LiranCohen Jan 16, 2024
a5a6cac
clean up interface and increase coverage
LiranCohen Jan 16, 2024
12c067e
add more filter tests
LiranCohen Jan 16, 2024
27c110f
remove console logs
LiranCohen Jan 16, 2024
de7a73c
update after rbase
LiranCohen Jan 17, 2024
fcdbf70
remove unecessary duplicate type/interface
LiranCohen Jan 17, 2024
bd6d341
review updates
LiranCohen Jan 17, 2024
50612f9
rename classes and properties suggested by review comments
LiranCohen Jan 17, 2024
9a8a0dc
fix isRecordsFilter method and conversion/normalization
LiranCohen Jan 18, 2024
4256e8d
review updates
LiranCohen Jan 18, 2024
d92f3ad
review suggestions, added more validation to parse
LiranCohen Jan 18, 2024
041df8a
add coverage
LiranCohen Jan 18, 2024
8e5ecb5
simplified more code, added comments, updated test
LiranCohen Jan 18, 2024
d21ab28
improve coverage
LiranCohen Jan 18, 2024
c29d4fa
review updates, added more test scenarios
LiranCohen Jan 18, 2024
b583140
scenario tests, review comment updates
LiranCohen Jan 19, 2024
19101e7
version bump
LiranCohen Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/event-log/event-emitter-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import { EventEmitter } from 'events';

const EVENTS_LISTENER_CHANNEL = 'events';

type EventStreamEmitterConfig = {
emitter?: EventEmitter;
};

export class EventEmitterStream implements EventStream {
private eventEmitter: EventEmitter;
private isOpen: boolean = false;

constructor(config?: EventStreamEmitterConfig) {
constructor() {
// we capture the rejections and currently just log the errors that are produced
this.eventEmitter = config?.emitter || new EventEmitter({ captureRejections: true });
this.eventEmitter = new EventEmitter({ captureRejections: true });
this.eventEmitter.on('error', this.eventError);
}

Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/records-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ export class RecordsDelete extends AbstractMessage<RecordsDeleteMessage> {
}

/**
* Indexed properties needed for MessageStore indexing.
*/
* Indexed properties needed for MessageStore indexing.
*/
public constructIndexes(
initialWrite: RecordsWriteMessage,
): KeyValues {
Expand Down
3 changes: 3 additions & 0 deletions src/types/events-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { AuthorizationModel, GenericMessage, GenericMessageReply, MessageSu
import type { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js';
import type { PaginationCursor, RangeCriterion, RangeFilter } from './query-types.js';

// filters used when filtering for any type of Message across interfaces
export type EventsMessageFilter = {
interface?: string;
method?: string;
Expand All @@ -23,6 +24,8 @@ export type EventsRecordsFilter = {
dateCreated?: RangeCriterion;
};

// a union type of the different types of filters a user can use when issuing an EventsQuery or EventsSubscribe
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
//TODO: simplify the EventsFilters to only the necessary in order to reduce complexity https://github.com/TBD54566975/dwn-sdk-js/issues/663
export type EventsFilter = EventsMessageFilter | EventsRecordsFilter | ProtocolsQueryFilter;
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved

export type EventsGetDescriptor = {
Expand Down
11 changes: 5 additions & 6 deletions tests/event-log/event-emitter-stream.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { EventEmitter } from 'events';

import type { EventEmitter } from 'events';
import type { MessageStore } from '../../src/index.js';

import { EventEmitterStream } from '../../src/event-log/event-emitter-stream.js';
Expand Down Expand Up @@ -37,8 +36,8 @@ describe('EventEmitterStream', () => {
});

it('should remove listeners when `close` method is used', async () => {
const emitter = new EventEmitter();
eventStream = new EventEmitterStream({ emitter });
eventStream = new EventEmitterStream();
const emitter = (eventStream as any).eventEmitter as EventEmitter;
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved

// count the `events` listeners, which represents all listeners
expect(emitter.listenerCount('events')).to.equal(0);
Expand All @@ -53,8 +52,8 @@ describe('EventEmitterStream', () => {

it('logs message when the emitter experiences an error', async () => {
const eventErrorSpy = sinon.spy(EventEmitterStream.prototype as any, 'eventError');
const emitter = new EventEmitter({ captureRejections: true });
eventStream = new EventEmitterStream({ emitter });
eventStream = new EventEmitterStream();
const emitter = (eventStream as any).eventEmitter as EventEmitter;
emitter.emit('error', new Error('random error'));
expect(eventErrorSpy.callCount).to.equal(1);
});
Expand Down
22 changes: 16 additions & 6 deletions tests/event-log/event-stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ describe('EventStream', () => {
});

it('emits all messages to each subscriptions', async () => {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
const messageCids: string[] = [];
const handler = async (_tenant: string, message: GenericMessage, _indexes: KeyValues): Promise<void> => {
const messageCids1: string[] = [];
const handler1 = async (_tenant: string, message: GenericMessage, _indexes: KeyValues): Promise<void> => {
const messageCid = await Message.getCid(message);
messageCids.push(messageCid);
messageCids1.push(messageCid);
};
const subcription = await eventStream.subscribe('sub-1', handler);

const messageCids2: string[] = [];
const handler2 = async (_tenant: string, message: GenericMessage, _indexes: KeyValues): Promise<void> => {
const messageCid = await Message.getCid(message);
messageCids2.push(messageCid);
};

const subscription1 = await eventStream.subscribe('sub-1', handler1);
const subscription2 = await eventStream.subscribe('sub-2', handler2);

const message1 = await TestDataGenerator.generateRecordsWrite({});
const message1Cid = await Message.getCid(message1.message);
Expand All @@ -40,11 +48,13 @@ describe('EventStream', () => {
const message3Cid = await Message.getCid(message3.message);
eventStream.emit('did:alice', message3.message, {});

await subcription.close();
await subscription1.close();
await subscription2.close();

await Time.minimalSleep();

expect(messageCids).to.have.members([ message1Cid, message2Cid, message3Cid ]);
expect(messageCids1).to.have.members([ message1Cid, message2Cid, message3Cid ]);
expect(messageCids2).to.have.members([ message1Cid, message2Cid, message3Cid ]);
});

it('does not emit messages if subscription is closed', async () => {
Expand Down
7 changes: 6 additions & 1 deletion tests/interfaces/events-subscribe.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { authorizeOwner } from '../../src/core/auth.js';
import { EventsSubscribe } from '../../src/interfaces/events-subscribe.js';
import { DidKeyResolver, DwnInterfaceName, DwnMethodName, Jws, Time } from '../../src/index.js';

Expand All @@ -8,15 +9,19 @@ describe('EventsSubscribe', () => {
it('should be able to create and authorize EventsSubscribe', async () => {
thehenrytsai marked this conversation as resolved.
Show resolved Hide resolved
const alice = await DidKeyResolver.generate();
const timestamp = Time.getCurrentTimestamp();
const { message } = await EventsSubscribe.create({
const eventsSubscribe = await EventsSubscribe.create({
signer : Jws.createSigner(alice),
messageTimestamp : timestamp,
});

const message = eventsSubscribe.message;
expect(message.descriptor.interface).to.eql(DwnInterfaceName.Events);
expect(message.descriptor.method).to.eql(DwnMethodName.Subscribe);
expect(message.authorization).to.exist;
expect(message.descriptor.messageTimestamp).to.equal(timestamp);

// EventsSubscribe authorizes against owner
await authorizeOwner(alice.did, eventsSubscribe);
});
});
});
4 changes: 2 additions & 2 deletions tests/scenarios/subscriptions.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,11 @@ export function testSubscriptionScenarios(): void {
expect(write2Proto2Response.status.code).equals(202);

// proto1 messages from handler do not change.
expect(proto1Messages.length).to.equal(1, 'proto1 after subscription.off()');
expect(proto1Messages.length).to.equal(1, 'proto1 after subscription.close()');
expect(proto1Messages).to.include(await Message.getCid(write1proto1.message));

//proto2 messages from handler have the new message.
expect(proto2Messages.length).to.equal(2, 'proto2 after subscription.off()');
expect(proto2Messages.length).to.equal(2, 'proto2 after subscription.close()');
expect(proto2Messages).to.have.members([await Message.getCid(write1proto2.message), await Message.getCid(write2proto2.message)]);
});

Expand Down
10 changes: 5 additions & 5 deletions tests/test-event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,24 @@ import type { EventStream } from '../src/index.js';
import { EventEmitterStream } from '../src/index.js';

/**
* Class that manages store implementations for testing.
* Class that manages the EventStream implementation for testing.
* This is intended to be extended as the single point of configuration
* that allows different store implementations to be swapped in
* to test compatibility with default/built-in store implementations.
* that allows different EventStream implementations to be swapped in
* to test compatibility with default/built-in implementation.
*/
export class TestEventStream {
private static eventStream?: EventStream;

/**
* Overrides test stores with given implementation.
* Overrides the event stream with a given implementation.
* If not given, default implementation will be used.
*/
public static override(overrides?: { eventStream?: EventStream }): void {
TestEventStream.eventStream = overrides?.eventStream;
}

/**
* Initializes and return the stores used for running the test suite.
* Initializes and returns the event stream used for running the test suite.
*/
public static get(): EventStream {
TestEventStream.eventStream ??= new EventEmitterStream();
Expand Down