Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Update endpoint to use _ separator - Closes #6622 and #6666 #6760

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions framework/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@
* Removal or modification of this copyright notice is prohibited.
*/

export const INTERNAL_EVENTS = Object.freeze([
'registeredToBus',
'loading:started',
'loading:finished',
'unloading:started',
'unloading:finished',
'unloading:error',
]);

export const eventWithModuleNameReg = /^([^\d][\w]+)((?::[^\d][\w]+)+)$/;
export const eventWithModuleNameReg = /^([^\d][\w]+)((?:_[^\d][\w]+)+)$/;
export const moduleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*$/;
export const actionWithModuleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*:[a-zA-Z][a-zA-Z0-9]*$/;
export const actionWithModuleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*_[a-zA-Z][a-zA-Z0-9]*$/;
export const APP_IDENTIFIER = 'app';

export const APP_EVENT_READY = 'app:ready';
export const APP_EVENT_SHUTDOWN = 'app:shutdown';
export const APP_EVENT_READY = 'app_ready';
export const APP_EVENT_SHUTDOWN = 'app_shutdown';

export const RPC_MODES = {
IPC: 'ipc',
Expand Down
12 changes: 0 additions & 12 deletions framework/src/controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,6 @@ The controller defines a set of events, that each component can subscribe to:

The following events and actions are available for all enabled plugins and are at the same time accessible by all enabled plugins.

#### Events

| Event | Description |
| --------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| _plugin_:registeredToBus | Triggered when the plugin has completed registering its events and actions with the controller. So when this event is triggered, the subscriber of the event can be sure that the controller has whitelisted its requested events and actions. |
| _plugin_:loading:started | Triggered just before the controller calls the plugin’s `load` method. |
| _plugin_:loading:finished | Triggered just after the plugin’s `load` method has completed execution. |
| _plugin_:unloading:started | Triggered just before the controller calls the plugin’s `unload` method. |
| _plugin_:unloading:error | Triggered if any error occurred during the call of plugin’s `unload` method. |
| _plugin_:unloading:finished | Triggered just after the plugin’s `unload` method has completed execution. |
| app:ready | Triggered when the controller has finished initializing the plugins and each plugin has been successfully loaded. |

#### Actions

Most of the data flow will be handled through the propagation of such events.
Expand Down
9 changes: 5 additions & 4 deletions framework/src/controller/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import * as JSONRPC from './jsonrpc';
import { WSServer } from './ws/ws_server';
import { HTTPServer } from './http/http_server';
import { JSONRPCError } from './jsonrpc';
import { getEndpointPath } from '../endpoint';

interface BusConfiguration {
readonly httpServer?: HTTPServer;
Expand Down Expand Up @@ -230,19 +231,19 @@ export class Bus {
}

events.forEach(eventName => {
if (this._events[`${namespace}:${eventName}`] !== undefined) {
if (this._events[getEndpointPath(namespace, eventName)] !== undefined) {
throw new Error(`Event "${eventName}" already registered with bus.`);
}
this._events[`${namespace}:${eventName}`] = true;
this._events[getEndpointPath(namespace, eventName)] = true;
});
this._wsServer?.registerAllowedEvent([...this.getEvents()]);

for (const methodName of Object.keys(endpointInfo)) {
if (this._endpointInfos[`${namespace}:${methodName}`] !== undefined) {
if (this._endpointInfos[getEndpointPath(namespace, methodName)] !== undefined) {
throw new Error(`Endpoint "${methodName}" already registered with bus.`);
}

this._endpointInfos[`${namespace}:${methodName}`] = endpointInfo[methodName];
this._endpointInfos[getEndpointPath(namespace, methodName)] = endpointInfo[methodName];
}

if (options.type === ChannelType.ChildProcess && options.socketPath) {
Expand Down
18 changes: 5 additions & 13 deletions framework/src/controller/channels/base_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@
*/

import { EventCallback } from '../event';
import { eventWithModuleNameReg, INTERNAL_EVENTS } from '../../constants';
import { eventWithModuleNameReg } from '../../constants';
import { EndpointHandlers } from '../../types';
import { Logger } from '../../logger';

export interface BaseChannelOptions {
[key: string]: unknown;
readonly skipInternalEvents?: boolean;
}

export abstract class BaseChannel {
public readonly eventsList: ReadonlyArray<string>;
public readonly endpointsList: ReadonlyArray<string>;
public readonly namespace: string;

protected readonly endpointHandlers: EndpointHandlers;
protected readonly options: Record<string, unknown>;
protected readonly _logger: Logger;
private _requestId: number;

Expand All @@ -37,13 +31,11 @@ export abstract class BaseChannel {
namespace: string,
events: ReadonlyArray<string>,
endpoints: EndpointHandlers,
options: BaseChannelOptions = {},
) {
this._logger = logger;
this.namespace = namespace;
this.options = options;

this.eventsList = options.skipInternalEvents ? events : [...events, ...INTERNAL_EVENTS];
this.eventsList = events;

this.endpointHandlers = {};
this._requestId = 0;
Expand Down Expand Up @@ -79,18 +71,18 @@ export abstract class BaseChannel {
}

// Listen to any event happening in the application
// Specified as moduleName:eventName
// Specified as moduleName_eventName
// If its related to your own moduleName specify as :eventName
abstract subscribe(eventName: string, cb: EventCallback): void;
abstract unsubscribe(eventName: string, cb: EventCallback): void;

// Publish the event on the channel
// Specified as moduleName:eventName
// Specified as moduleName_eventName
// If its related to your own moduleName specify as :eventName
abstract publish(eventName: string, data?: Record<string, unknown>): void;

// Call action of any moduleName through controller
// Specified as moduleName:actionName
// Specified as moduleName_actionName
abstract invoke<T>(actionName: string, params?: Record<string, unknown>): Promise<T>;

abstract registerToBus(arg: unknown): Promise<void>;
Expand Down
5 changes: 2 additions & 3 deletions framework/src/controller/channels/in_memory_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { StateStore } from '@liskhq/lisk-chain';
import { KVStore } from '@liskhq/lisk-db';
import { Event, EventCallback } from '../event';
import { Request } from '../request';
import { BaseChannel, BaseChannelOptions } from './base_channel';
import { BaseChannel } from './base_channel';
import { Bus } from '../bus';
import * as JSONRPC from '../jsonrpc/types';
import { ChannelType, EndpointHandlers } from '../../types';
Expand All @@ -33,9 +33,8 @@ export class InMemoryChannel extends BaseChannel {
namespace: string,
events: ReadonlyArray<string>,
endpoints: EndpointHandlers,
options: BaseChannelOptions = {},
) {
super(logger, namespace, events, endpoints, options);
super(logger, namespace, events, endpoints);
this._db = db;
}

Expand Down
6 changes: 3 additions & 3 deletions framework/src/controller/channels/ipc_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import { EventEmitter2, ListenerFn } from 'eventemitter2';
import { join } from 'path';
import { Request } from '../request';
import { Event } from '../event';
import { BaseChannel, BaseChannelOptions } from './base_channel';
import { BaseChannel } from './base_channel';
import { IPCClient } from '../ipc/ipc_client';
import { EndpointInfo, ChannelType, EndpointHandlers } from '../../types';
import * as JSONRPC from '../jsonrpc';
import { IPC_EVENTS } from '../constants';
import { Logger } from '../../logger';

interface ChildProcessOptions extends BaseChannelOptions {
interface ChildProcessOptions {
socketsPath: string;
}

Expand All @@ -40,7 +40,7 @@ export class IPCChannel extends BaseChannel {
endpoints: EndpointHandlers,
options: ChildProcessOptions,
) {
super(logger, namespace, events, endpoints, options);
super(logger, namespace, events, endpoints);

this._ipcClient = new IPCClient({
socketsDir: options.socketsPath,
Expand Down
27 changes: 18 additions & 9 deletions framework/src/controller/child_process_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// Parameters passed by `child_process.fork(_, parameters)`

import { join } from 'path';
import { createLogger } from '../logger';
import { createLogger, Logger } from '../logger';
import { getEndpointHandlers } from '../endpoint';
import { BasePlugin } from '../plugins/base_plugin';
import { systemDirs } from '../system_dirs';
Expand All @@ -29,6 +29,7 @@ const moduleExportName: string = process.argv[3];
const Klass: InstantiablePlugin = require(modulePath)[moduleExportName];
let channel: IPCChannel;
let plugin: BasePlugin;
let logger: Logger;

const _loadPlugin = async (
config: Record<string, unknown>,
Expand All @@ -42,11 +43,11 @@ const _loadPlugin = async (
const pluginName = plugin.name;

const dirs = systemDirs(appConfig.label, appConfig.rootPath);
const logger = createLogger({
logger = createLogger({
consoleLogLevel: appConfig.logger.consoleLogLevel,
fileLogLevel: appConfig.logger.fileLogLevel,
logFilePath: join(dirs.logs, `plugin-${pluginName}.log`),
module: `plugin:${pluginName}`,
module: `plugin_${pluginName}`,
});

channel = new IPCChannel(
Expand All @@ -61,27 +62,35 @@ const _loadPlugin = async (

await channel.registerToBus();

channel.publish(`${pluginName}:registeredToBus`);
channel.publish(`${pluginName}:loading:started`);
logger.debug({ plugin: pluginName }, 'Plugin is registered to bus');

await plugin.init({ appConfig, channel, config, logger });
await plugin.load(channel);

channel.publish(`${pluginName}:loading:finished`);
logger.debug({ plugin: pluginName }, 'Plugin is successfully loaded');
if (process.send) {
process.send({ action: 'loaded' });
}
};

const _unloadPlugin = async (code = 0) => {
const pluginName = plugin.name;

channel.publish(`${pluginName}:unloading:started`);
logger.debug({ plugin: pluginName }, 'Unloading plugin');
try {
await plugin.unload();
channel.publish(`${pluginName}:unloading:finished`);
logger.debug({ plugin: pluginName }, 'Successfully unloaded plugin');
channel.cleanup();
if (process.send) {
process.send({ action: 'unloaded' });
}
process.exit(code);
} catch (error) {
channel.publish(`${pluginName}:unloading:error`, error);
logger.debug({ plugin: pluginName, err: error as Error }, 'Fail to unload plugin');
channel.cleanup();
if (process.send) {
process.send({ action: 'unloadedWithError', err: error as Error });
}
process.exit(1);
}
};
Expand Down
54 changes: 28 additions & 26 deletions framework/src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export class Controller {
APP_IDENTIFIER,
arg.events,
arg.endpoints,
{ skipInternalEvents: true },
);
}

Expand Down Expand Up @@ -287,17 +286,14 @@ export class Controller {
plugin.endpoint ? getEndpointHandlers(plugin.endpoint) : {},
);
await channel.registerToBus(this._bus);
channel.publish(`${name}:registeredToBus`);
channel.publish(`${name}:loading:started`);
this._logger.debug({ plugin: name }, 'Plugin is registered to bus');

await plugin.init({ config, channel, appConfig, logger: this._logger });
await plugin.load(channel);

channel.publish(`${name}:loading:finished`);
this._logger.debug({ plugin: name }, 'Plugin is successfully loaded');

this._inMemoryPlugins[name] = { plugin, channel };

this._logger.info(name, 'Loaded in-memory plugin');
}

private async _loadChildProcessPlugin(
Expand Down Expand Up @@ -347,9 +343,11 @@ export class Controller {

await Promise.race([
new Promise<void>(resolve => {
this.channel.once(`${name}:loading:finished`, () => {
this._logger.info({ name }, 'Loaded child-process plugin');
resolve();
child.on('message', ({ action }: { action: string }) => {
if (action === 'loaded') {
this._logger.info({ name }, 'Loaded child-process plugin');
resolve();
}
});
}),
new Promise((_, reject) => {
Expand All @@ -361,12 +359,12 @@ export class Controller {
}

private async _unloadInMemoryPlugin(name: string): Promise<void> {
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:started`);
this._logger.debug({ plugin: name }, 'Unloading plugin');
try {
await this._inMemoryPlugins[name].plugin.unload();
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:finished`);
this._logger.debug({ plugin: name }, 'Successfully unloaded plugin');
} catch (error) {
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:error`, error);
this._logger.debug({ plugin: name, err: error as Error }, 'Fail to unload plugin');
} finally {
delete this._inMemoryPlugins[name];
}
Expand All @@ -384,20 +382,24 @@ export class Controller {
});

await Promise.race([
new Promise<void>(resolve => {
this.channel.once(`${name}:unloading:finished`, () => {
this._logger.info(`Child process plugin "${name}" unloaded`);
delete this._childProcesses[name];
resolve();
});
}),
new Promise((_, reject) => {
this.channel.once(`${name}:unloading:error`, data => {
this._logger.info(`Child process plugin "${name}" unloaded with error`);
this._logger.error(data ?? {}, 'Unloading plugin error.');
delete this._childProcesses[name];
reject(data);
});
new Promise<void>((resolve, reject) => {
this._childProcesses[name].on(
'message',
({ action, err }: { action: string; err?: Error }) => {
if (action !== 'unloaded' && action !== 'unloadedWithError') {
return;
}
delete this._childProcesses[name];
if (action === 'unloaded') {
this._logger.info(`Child process plugin "${name}" unloaded`);
resolve();
} else {
this._logger.info(`Child process plugin "${name}" unloaded with error`);
this._logger.error({ err }, 'Unloading plugin error.');
reject(err);
}
},
);
}),
new Promise((_, reject) => {
setTimeout(() => {
Expand Down
9 changes: 5 additions & 4 deletions framework/src/controller/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import { strict as assert } from 'assert';
import { eventWithModuleNameReg } from '../constants';
import { getEndpointPath } from '../endpoint';
import { NotificationRequest, VERSION } from './jsonrpc';

export type EventCallback = (data?: Record<string, unknown>) => void | Promise<void>;
Expand All @@ -31,9 +32,9 @@ export class Event {
`Event name "${name}" must be a valid name with module name and event name.`,
);

const [moduleName, ...eventName] = name.split(':');
const [moduleName, ...eventName] = name.split('_');
this.module = moduleName;
this.name = eventName.join(':');
this.name = eventName.join('_');
this.data = data;
}

Expand All @@ -47,12 +48,12 @@ export class Event {
public toJSONRPCNotification(): NotificationRequest {
return {
jsonrpc: VERSION,
method: `${this.module}:${this.name}`,
method: getEndpointPath(this.module, this.name),
params: this.data,
};
}

public key(): string {
return `${this.module}:${this.name}`;
return getEndpointPath(this.module, this.name);
}
}
Loading