Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7358 from LiskHQ/7301-update-event-queue-wrapper
Browse files Browse the repository at this point in the history
Update the event queue wrapper - Closes #7301
  • Loading branch information
shuse2 authored Aug 3, 2022
2 parents 5b1877d + f3a1c3e commit ef942e7
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 53 deletions.
17 changes: 0 additions & 17 deletions framework/src/state_machine/api_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* Removal or modification of this copyright notice is prohibited.
*/

import { EVENT_STANDARD_TYPE_ID } from '@liskhq/lisk-chain';
import { EventQueue } from './event_queue';
import { PrefixedStateReadWriter, StateDBReadWriter } from './prefixed_state_read_writer';
import { SubStore, ImmutableSubStore, ImmutableAPIContext, EventQueueAdder } from './types';
Expand All @@ -38,22 +37,6 @@ export const createImmutableAPIContext = (
immutableSubstoreGetter.getStore(moduleID, storePrefix),
});

export const wrapEventQueue = (eventQueue: EventQueue, topic: Buffer): EventQueueAdder => ({
add: (
moduleID: Buffer,
typeID: Buffer,
data: Buffer,
topics?: Buffer[],
noRevert?: boolean,
): void => {
if (typeID.equals(EVENT_STANDARD_TYPE_ID)) {
throw new Error('Event type ID 0 is reserved for standard event.');
}
const topicsWithDefault = [topic, ...(topics ?? [])];
eventQueue.add(moduleID, typeID, data, topicsWithDefault, noRevert);
},
});

export class APIContext {
private readonly _stateStore: PrefixedStateReadWriter;
private readonly _eventQueue: EventQueueAdder;
Expand Down
14 changes: 7 additions & 7 deletions framework/src/state_machine/block_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import { Transaction } from '@liskhq/lisk-chain';
import { Logger } from '../logger';
import { createAPIContext, createImmutableAPIContext, wrapEventQueue } from './api_context';
import { createAPIContext, createImmutableAPIContext } from './api_context';
import { EVENT_INDEX_AFTER_TRANSACTIONS, EVENT_INDEX_BEFORE_TRANSACTIONS } from './constants';
import { EventQueue } from './event_queue';
import { PrefixedStateReadWriter } from './prefixed_state_read_writer';
Expand Down Expand Up @@ -98,13 +98,13 @@ export class BlockContext {
}

public getBlockExecuteContext(): BlockExecuteContext {
const wrappedEventQueue = wrapEventQueue(this._eventQueue, EVENT_INDEX_BEFORE_TRANSACTIONS);
const childQueue = this._eventQueue.getChildQueue(EVENT_INDEX_BEFORE_TRANSACTIONS);
return {
logger: this._logger,
networkIdentifier: this._networkIdentifier,
eventQueue: wrappedEventQueue,
eventQueue: childQueue,
getAPIContext: () =>
createAPIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
createAPIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand All @@ -120,13 +120,13 @@ export class BlockContext {
if (!this._transactions) {
throw new Error('Cannot create block after execute context without transactions');
}
const wrappedEventQueue = wrapEventQueue(this._eventQueue, EVENT_INDEX_AFTER_TRANSACTIONS);
const childQueue = this._eventQueue.getChildQueue(EVENT_INDEX_AFTER_TRANSACTIONS);
return {
logger: this._logger,
networkIdentifier: this._networkIdentifier,
eventQueue: wrappedEventQueue,
eventQueue: childQueue,
getAPIContext: () =>
createAPIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
createAPIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand Down
43 changes: 35 additions & 8 deletions framework/src/state_machine/event_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
* Removal or modification of this copyright notice is prohibited.
*/

import { Event, EVENT_MAX_EVENT_SIZE_BYTES, EVENT_MAX_TOPICS_PER_EVENT } from '@liskhq/lisk-chain';
import {
Event,
EVENT_MAX_EVENT_SIZE_BYTES,
EVENT_MAX_TOPICS_PER_EVENT,
EVENT_STANDARD_TYPE_ID,
} from '@liskhq/lisk-chain';

interface RevertibleEvent {
event: Event;
Expand All @@ -21,43 +26,65 @@ interface RevertibleEvent {

export class EventQueue {
private readonly _events: RevertibleEvent[];
private readonly _defaultTopics: Buffer[];

public constructor() {
this._events = [];
public constructor(events?: RevertibleEvent[], defaultTopics?: Buffer[]) {
this._events = events ?? [];
this._defaultTopics = defaultTopics ?? [];
}

public add(
moduleID: Buffer,
typeID: Buffer,
data: Buffer,
topics: Buffer[],
topics?: Buffer[],
noRevert?: boolean,
): void {
const allTopics = [...this._defaultTopics, ...(topics ?? [])];
if (data.length > EVENT_MAX_EVENT_SIZE_BYTES) {
throw new Error(
`Max size of event data is ${EVENT_MAX_EVENT_SIZE_BYTES} but received ${data.length}`,
);
}
if (!topics.length) {
if (!allTopics.length) {
throw new Error('Topics must have at least one element.');
}
if (topics.length > EVENT_MAX_TOPICS_PER_EVENT) {
if (allTopics.length > EVENT_MAX_TOPICS_PER_EVENT) {
throw new Error(
`Max topics per event is ${EVENT_MAX_TOPICS_PER_EVENT} but received ${topics.length}`,
`Max topics per event is ${EVENT_MAX_TOPICS_PER_EVENT} but received ${allTopics.length}`,
);
}
if (typeID.equals(EVENT_STANDARD_TYPE_ID)) {
throw new Error('Event type ID 0 is reserved for standard event.');
}
this.unsafeAdd(moduleID, typeID, data, topics, noRevert);
}

public unsafeAdd(
moduleID: Buffer,
typeID: Buffer,
data: Buffer,
topics?: Buffer[],
noRevert?: boolean,
): void {
const allTopics = [...this._defaultTopics, ...(topics ?? [])];
this._events.push({
event: new Event({
moduleID,
typeID,
index: this._events.length,
data,
topics,
topics: allTopics,
}),
noRevert: noRevert ?? false,
});
}

public getChildQueue(topicID: Buffer): EventQueue {
const allTopics = [...this._defaultTopics, topicID];
return new EventQueue(this._events, allTopics);
}

public createSnapshot(): number {
return this._events.length;
}
Expand Down
16 changes: 7 additions & 9 deletions framework/src/state_machine/genesis_block_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
*/

import { Logger } from '../logger';
import { APIContext, wrapEventQueue } from './api_context';
import { APIContext } from './api_context';
import { EVENT_INDEX_FINALIZE_GENESIS_STATE, EVENT_INDEX_INIT_GENESIS_STATE } from './constants';
import { EventQueue } from './event_queue';
import { PrefixedStateReadWriter } from './prefixed_state_read_writer';
Expand Down Expand Up @@ -48,11 +48,10 @@ export class GenesisBlockContext {
}

public createInitGenesisStateContext(): GenesisBlockExecuteContext {
const wrappedEventQueue = wrapEventQueue(this._eventQueue, EVENT_INDEX_INIT_GENESIS_STATE);
const childQueue = this._eventQueue.getChildQueue(EVENT_INDEX_INIT_GENESIS_STATE);
return {
eventQueue: wrappedEventQueue,
getAPIContext: () =>
new APIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
eventQueue: childQueue,
getAPIContext: () => new APIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand All @@ -76,11 +75,10 @@ export class GenesisBlockContext {
}

public createFinalizeGenesisStateContext(): GenesisBlockExecuteContext {
const wrappedEventQueue = wrapEventQueue(this._eventQueue, EVENT_INDEX_FINALIZE_GENESIS_STATE);
const childQueue = this._eventQueue.getChildQueue(EVENT_INDEX_FINALIZE_GENESIS_STATE);
return {
eventQueue: wrappedEventQueue,
getAPIContext: () =>
new APIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
eventQueue: childQueue,
getAPIContext: () => new APIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand Down
4 changes: 2 additions & 2 deletions framework/src/state_machine/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ export class StateMachine {
const commandContext = ctx.createCommandExecuteContext(command.schema);
try {
await command.execute(commandContext);
ctx.eventQueue.add(
ctx.eventQueue.unsafeAdd(
ctx.transaction.moduleID,
EVENT_STANDARD_TYPE_ID,
codec.encode(standardEventDataSchema, { success: true }),
Expand All @@ -183,7 +183,7 @@ export class StateMachine {
} catch (error) {
ctx.eventQueue.restoreSnapshot(commandEventQueueSnapshotID);
ctx.stateStore.restoreSnapshot(commandStateStoreSnapshotID);
ctx.eventQueue.add(
ctx.eventQueue.unsafeAdd(
ctx.transaction.moduleID,
EVENT_STANDARD_TYPE_ID,
codec.encode(standardEventDataSchema, { success: false }),
Expand Down
12 changes: 6 additions & 6 deletions framework/src/state_machine/transaction_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import { Transaction } from '@liskhq/lisk-chain';
import { codec, Schema } from '@liskhq/lisk-codec';
import { Logger } from '../logger';
import { createAPIContext, createImmutableAPIContext, wrapEventQueue } from './api_context';
import { createAPIContext, createImmutableAPIContext } from './api_context';
import { EventQueue } from './event_queue';
import { PrefixedStateReadWriter } from './prefixed_state_read_writer';
import {
Expand Down Expand Up @@ -87,13 +87,13 @@ export class TransactionContext {
if (!this._assets) {
throw new Error('Transaction Execution requires block assets in the context.');
}
const wrappedEventQueue = wrapEventQueue(this._eventQueue, this._transaction.id);
const childQueue = this._eventQueue.getChildQueue(this._transaction.id);
return {
logger: this._logger,
networkIdentifier: this._networkIdentifier,
eventQueue: wrappedEventQueue,
eventQueue: childQueue,
getAPIContext: () =>
createAPIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
createAPIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand Down Expand Up @@ -130,14 +130,14 @@ export class TransactionContext {
if (!this._assets) {
throw new Error('Transaction Execution requires block assets in the context.');
}
const wrappedEventQueue = wrapEventQueue(this._eventQueue, this._transaction.id);
const childQueue = this._eventQueue.getChildQueue(this._transaction.id);
return {
logger: this._logger,
networkIdentifier: this._networkIdentifier,
// TODO: Need to pass wrapper of eventQueue with possibility to create/restore snapshot https://github.com/LiskHQ/lisk-sdk/issues/7211
eventQueue: this.eventQueue,
getAPIContext: () =>
createAPIContext({ stateStore: this._stateStore, eventQueue: wrappedEventQueue }),
createAPIContext({ stateStore: this._stateStore, eventQueue: childQueue }),
getStore: (moduleID: Buffer, storePrefix: number) =>
this._stateStore.getStore(moduleID, storePrefix),
header: this._header,
Expand Down
15 changes: 12 additions & 3 deletions framework/test/unit/state_machine/event_queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ describe('EventQueue', () => {

it('should be able to add events to queue', () => {
// Act
events.map(e => eventQueue.add(e.moduleID, e.typeID, e.data, e.topics));
events.map(e => eventQueue.unsafeAdd(e.moduleID, e.typeID, e.data, e.topics));
const addedEvents = eventQueue.getEvents();

// Asset
Expand All @@ -104,8 +104,17 @@ describe('EventQueue', () => {
});
});

it('should be able to get events from child queue', () => {
for (const e of events) {
eventQueue.unsafeAdd(e.moduleID, e.typeID, e.data, e.topics);
}
const childQueue = eventQueue.getChildQueue(events[0].topics[0]);

expect(childQueue.getEvents()).toHaveLength(events.length);
});

it('should return original set of events when create and restore snapshot', () => {
events.map(e => eventQueue.add(e.moduleID, e.typeID, e.data, e.topics));
events.map(e => eventQueue.unsafeAdd(e.moduleID, e.typeID, e.data, e.topics));
expect(eventQueue.getEvents()).toHaveLength(events.length);

const snapshotID = eventQueue.createSnapshot();
Expand All @@ -125,7 +134,7 @@ describe('EventQueue', () => {
});

it('should maintain new nonRevertible events when restoring the snapshot', () => {
events.map(e => eventQueue.add(e.moduleID, e.typeID, e.data, e.topics));
events.map(e => eventQueue.unsafeAdd(e.moduleID, e.typeID, e.data, e.topics));
expect(eventQueue.getEvents()).toHaveLength(events.length);

const snapshotID = eventQueue.createSnapshot();
Expand Down
2 changes: 1 addition & 1 deletion framework/test/unit/state_machine/state_machine.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ describe('state_machine', () => {
},
];
for (const e of events) {
eventQueue.add(e.moduleID, e.typeID, e.data, e.topics);
eventQueue.unsafeAdd(e.moduleID, e.typeID, e.data, e.topics);
}

mod.beforeCommandExecute.mockImplementation(() => {
Expand Down

0 comments on commit ef942e7

Please sign in to comment.