Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6760 from LiskHQ/6622-update_rpc_endpoint_naming
Browse files Browse the repository at this point in the history
Update endpoint to use `_` separator - Closes #6622 and #6666
  • Loading branch information
shuse2 authored Sep 10, 2021
2 parents 5b7e0eb + a5f9273 commit 244e9ef
Show file tree
Hide file tree
Showing 39 changed files with 223 additions and 331 deletions.
17 changes: 4 additions & 13 deletions framework/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,13 @@
* Removal or modification of this copyright notice is prohibited.
*/

export const INTERNAL_EVENTS = Object.freeze([
'registeredToBus',
'loading:started',
'loading:finished',
'unloading:started',
'unloading:finished',
'unloading:error',
]);

export const eventWithModuleNameReg = /^([^\d][\w]+)((?::[^\d][\w]+)+)$/;
export const eventWithModuleNameReg = /^([^\d][\w]+)((?:_[^\d][\w]+)+)$/;
export const moduleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*$/;
export const actionWithModuleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*:[a-zA-Z][a-zA-Z0-9]*$/;
export const actionWithModuleNameReg = /^[a-zA-Z][a-zA-Z0-9_]*_[a-zA-Z][a-zA-Z0-9]*$/;
export const APP_IDENTIFIER = 'app';

export const APP_EVENT_READY = 'app:ready';
export const APP_EVENT_SHUTDOWN = 'app:shutdown';
export const APP_EVENT_READY = 'app_ready';
export const APP_EVENT_SHUTDOWN = 'app_shutdown';

export const RPC_MODES = {
IPC: 'ipc',
Expand Down
12 changes: 0 additions & 12 deletions framework/src/controller/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,6 @@ The controller defines a set of events, that each component can subscribe to:

The following events and actions are available for all enabled plugins and are at the same time accessible by all enabled plugins.

#### Events

| Event | Description |
| --------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| _plugin_:registeredToBus | Triggered when the plugin has completed registering its events and actions with the controller. So when this event is triggered, the subscriber of the event can be sure that the controller has whitelisted its requested events and actions. |
| _plugin_:loading:started | Triggered just before the controller calls the plugin’s `load` method. |
| _plugin_:loading:finished | Triggered just after the plugin’s `load` method has completed execution. |
| _plugin_:unloading:started | Triggered just before the controller calls the plugin’s `unload` method. |
| _plugin_:unloading:error | Triggered if any error occurred during the call of plugin’s `unload` method. |
| _plugin_:unloading:finished | Triggered just after the plugin’s `unload` method has completed execution. |
| app:ready | Triggered when the controller has finished initializing the plugins and each plugin has been successfully loaded. |

#### Actions

Most of the data flow will be handled through the propagation of such events.
Expand Down
9 changes: 5 additions & 4 deletions framework/src/controller/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import * as JSONRPC from './jsonrpc';
import { WSServer } from './ws/ws_server';
import { HTTPServer } from './http/http_server';
import { JSONRPCError } from './jsonrpc';
import { getEndpointPath } from '../endpoint';

interface BusConfiguration {
readonly httpServer?: HTTPServer;
Expand Down Expand Up @@ -230,19 +231,19 @@ export class Bus {
}

events.forEach(eventName => {
if (this._events[`${namespace}:${eventName}`] !== undefined) {
if (this._events[getEndpointPath(namespace, eventName)] !== undefined) {
throw new Error(`Event "${eventName}" already registered with bus.`);
}
this._events[`${namespace}:${eventName}`] = true;
this._events[getEndpointPath(namespace, eventName)] = true;
});
this._wsServer?.registerAllowedEvent([...this.getEvents()]);

for (const methodName of Object.keys(endpointInfo)) {
if (this._endpointInfos[`${namespace}:${methodName}`] !== undefined) {
if (this._endpointInfos[getEndpointPath(namespace, methodName)] !== undefined) {
throw new Error(`Endpoint "${methodName}" already registered with bus.`);
}

this._endpointInfos[`${namespace}:${methodName}`] = endpointInfo[methodName];
this._endpointInfos[getEndpointPath(namespace, methodName)] = endpointInfo[methodName];
}

if (options.type === ChannelType.ChildProcess && options.socketPath) {
Expand Down
18 changes: 5 additions & 13 deletions framework/src/controller/channels/base_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,16 @@
*/

import { EventCallback } from '../event';
import { eventWithModuleNameReg, INTERNAL_EVENTS } from '../../constants';
import { eventWithModuleNameReg } from '../../constants';
import { EndpointHandlers } from '../../types';
import { Logger } from '../../logger';

export interface BaseChannelOptions {
[key: string]: unknown;
readonly skipInternalEvents?: boolean;
}

export abstract class BaseChannel {
public readonly eventsList: ReadonlyArray<string>;
public readonly endpointsList: ReadonlyArray<string>;
public readonly namespace: string;

protected readonly endpointHandlers: EndpointHandlers;
protected readonly options: Record<string, unknown>;
protected readonly _logger: Logger;
private _requestId: number;

Expand All @@ -37,13 +31,11 @@ export abstract class BaseChannel {
namespace: string,
events: ReadonlyArray<string>,
endpoints: EndpointHandlers,
options: BaseChannelOptions = {},
) {
this._logger = logger;
this.namespace = namespace;
this.options = options;

this.eventsList = options.skipInternalEvents ? events : [...events, ...INTERNAL_EVENTS];
this.eventsList = events;

this.endpointHandlers = {};
this._requestId = 0;
Expand Down Expand Up @@ -79,18 +71,18 @@ export abstract class BaseChannel {
}

// Listen to any event happening in the application
// Specified as moduleName:eventName
// Specified as moduleName_eventName
// If its related to your own moduleName specify as :eventName
abstract subscribe(eventName: string, cb: EventCallback): void;
abstract unsubscribe(eventName: string, cb: EventCallback): void;

// Publish the event on the channel
// Specified as moduleName:eventName
// Specified as moduleName_eventName
// If its related to your own moduleName specify as :eventName
abstract publish(eventName: string, data?: Record<string, unknown>): void;

// Call action of any moduleName through controller
// Specified as moduleName:actionName
// Specified as moduleName_actionName
abstract invoke<T>(actionName: string, params?: Record<string, unknown>): Promise<T>;

abstract registerToBus(arg: unknown): Promise<void>;
Expand Down
5 changes: 2 additions & 3 deletions framework/src/controller/channels/in_memory_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { StateStore } from '@liskhq/lisk-chain';
import { KVStore } from '@liskhq/lisk-db';
import { Event, EventCallback } from '../event';
import { Request } from '../request';
import { BaseChannel, BaseChannelOptions } from './base_channel';
import { BaseChannel } from './base_channel';
import { Bus } from '../bus';
import * as JSONRPC from '../jsonrpc/types';
import { ChannelType, EndpointHandlers } from '../../types';
Expand All @@ -33,9 +33,8 @@ export class InMemoryChannel extends BaseChannel {
namespace: string,
events: ReadonlyArray<string>,
endpoints: EndpointHandlers,
options: BaseChannelOptions = {},
) {
super(logger, namespace, events, endpoints, options);
super(logger, namespace, events, endpoints);
this._db = db;
}

Expand Down
6 changes: 3 additions & 3 deletions framework/src/controller/channels/ipc_channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import { EventEmitter2, ListenerFn } from 'eventemitter2';
import { join } from 'path';
import { Request } from '../request';
import { Event } from '../event';
import { BaseChannel, BaseChannelOptions } from './base_channel';
import { BaseChannel } from './base_channel';
import { IPCClient } from '../ipc/ipc_client';
import { EndpointInfo, ChannelType, EndpointHandlers } from '../../types';
import * as JSONRPC from '../jsonrpc';
import { IPC_EVENTS } from '../constants';
import { Logger } from '../../logger';

interface ChildProcessOptions extends BaseChannelOptions {
interface ChildProcessOptions {
socketsPath: string;
}

Expand All @@ -40,7 +40,7 @@ export class IPCChannel extends BaseChannel {
endpoints: EndpointHandlers,
options: ChildProcessOptions,
) {
super(logger, namespace, events, endpoints, options);
super(logger, namespace, events, endpoints);

this._ipcClient = new IPCClient({
socketsDir: options.socketsPath,
Expand Down
27 changes: 18 additions & 9 deletions framework/src/controller/child_process_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// Parameters passed by `child_process.fork(_, parameters)`

import { join } from 'path';
import { createLogger } from '../logger';
import { createLogger, Logger } from '../logger';
import { getEndpointHandlers } from '../endpoint';
import { BasePlugin } from '../plugins/base_plugin';
import { systemDirs } from '../system_dirs';
Expand All @@ -29,6 +29,7 @@ const moduleExportName: string = process.argv[3];
const Klass: InstantiablePlugin = require(modulePath)[moduleExportName];
let channel: IPCChannel;
let plugin: BasePlugin;
let logger: Logger;

const _loadPlugin = async (
config: Record<string, unknown>,
Expand All @@ -42,11 +43,11 @@ const _loadPlugin = async (
const pluginName = plugin.name;

const dirs = systemDirs(appConfig.label, appConfig.rootPath);
const logger = createLogger({
logger = createLogger({
consoleLogLevel: appConfig.logger.consoleLogLevel,
fileLogLevel: appConfig.logger.fileLogLevel,
logFilePath: join(dirs.logs, `plugin-${pluginName}.log`),
module: `plugin:${pluginName}`,
module: `plugin_${pluginName}`,
});

channel = new IPCChannel(
Expand All @@ -61,27 +62,35 @@ const _loadPlugin = async (

await channel.registerToBus();

channel.publish(`${pluginName}:registeredToBus`);
channel.publish(`${pluginName}:loading:started`);
logger.debug({ plugin: pluginName }, 'Plugin is registered to bus');

await plugin.init({ appConfig, channel, config, logger });
await plugin.load(channel);

channel.publish(`${pluginName}:loading:finished`);
logger.debug({ plugin: pluginName }, 'Plugin is successfully loaded');
if (process.send) {
process.send({ action: 'loaded' });
}
};

const _unloadPlugin = async (code = 0) => {
const pluginName = plugin.name;

channel.publish(`${pluginName}:unloading:started`);
logger.debug({ plugin: pluginName }, 'Unloading plugin');
try {
await plugin.unload();
channel.publish(`${pluginName}:unloading:finished`);
logger.debug({ plugin: pluginName }, 'Successfully unloaded plugin');
channel.cleanup();
if (process.send) {
process.send({ action: 'unloaded' });
}
process.exit(code);
} catch (error) {
channel.publish(`${pluginName}:unloading:error`, error);
logger.debug({ plugin: pluginName, err: error as Error }, 'Fail to unload plugin');
channel.cleanup();
if (process.send) {
process.send({ action: 'unloadedWithError', err: error as Error });
}
process.exit(1);
}
};
Expand Down
54 changes: 28 additions & 26 deletions framework/src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export class Controller {
APP_IDENTIFIER,
arg.events,
arg.endpoints,
{ skipInternalEvents: true },
);
}

Expand Down Expand Up @@ -287,17 +286,14 @@ export class Controller {
plugin.endpoint ? getEndpointHandlers(plugin.endpoint) : {},
);
await channel.registerToBus(this._bus);
channel.publish(`${name}:registeredToBus`);
channel.publish(`${name}:loading:started`);
this._logger.debug({ plugin: name }, 'Plugin is registered to bus');

await plugin.init({ config, channel, appConfig, logger: this._logger });
await plugin.load(channel);

channel.publish(`${name}:loading:finished`);
this._logger.debug({ plugin: name }, 'Plugin is successfully loaded');

this._inMemoryPlugins[name] = { plugin, channel };

this._logger.info(name, 'Loaded in-memory plugin');
}

private async _loadChildProcessPlugin(
Expand Down Expand Up @@ -347,9 +343,11 @@ export class Controller {

await Promise.race([
new Promise<void>(resolve => {
this.channel.once(`${name}:loading:finished`, () => {
this._logger.info({ name }, 'Loaded child-process plugin');
resolve();
child.on('message', ({ action }: { action: string }) => {
if (action === 'loaded') {
this._logger.info({ name }, 'Loaded child-process plugin');
resolve();
}
});
}),
new Promise((_, reject) => {
Expand All @@ -361,12 +359,12 @@ export class Controller {
}

private async _unloadInMemoryPlugin(name: string): Promise<void> {
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:started`);
this._logger.debug({ plugin: name }, 'Unloading plugin');
try {
await this._inMemoryPlugins[name].plugin.unload();
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:finished`);
this._logger.debug({ plugin: name }, 'Successfully unloaded plugin');
} catch (error) {
this._inMemoryPlugins[name].channel.publish(`${name}:unloading:error`, error);
this._logger.debug({ plugin: name, err: error as Error }, 'Fail to unload plugin');
} finally {
delete this._inMemoryPlugins[name];
}
Expand All @@ -384,20 +382,24 @@ export class Controller {
});

await Promise.race([
new Promise<void>(resolve => {
this.channel.once(`${name}:unloading:finished`, () => {
this._logger.info(`Child process plugin "${name}" unloaded`);
delete this._childProcesses[name];
resolve();
});
}),
new Promise((_, reject) => {
this.channel.once(`${name}:unloading:error`, data => {
this._logger.info(`Child process plugin "${name}" unloaded with error`);
this._logger.error(data ?? {}, 'Unloading plugin error.');
delete this._childProcesses[name];
reject(data);
});
new Promise<void>((resolve, reject) => {
this._childProcesses[name].on(
'message',
({ action, err }: { action: string; err?: Error }) => {
if (action !== 'unloaded' && action !== 'unloadedWithError') {
return;
}
delete this._childProcesses[name];
if (action === 'unloaded') {
this._logger.info(`Child process plugin "${name}" unloaded`);
resolve();
} else {
this._logger.info(`Child process plugin "${name}" unloaded with error`);
this._logger.error({ err }, 'Unloading plugin error.');
reject(err);
}
},
);
}),
new Promise((_, reject) => {
setTimeout(() => {
Expand Down
9 changes: 5 additions & 4 deletions framework/src/controller/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import { strict as assert } from 'assert';
import { eventWithModuleNameReg } from '../constants';
import { getEndpointPath } from '../endpoint';
import { NotificationRequest, VERSION } from './jsonrpc';

export type EventCallback = (data?: Record<string, unknown>) => void | Promise<void>;
Expand All @@ -31,9 +32,9 @@ export class Event {
`Event name "${name}" must be a valid name with module name and event name.`,
);

const [moduleName, ...eventName] = name.split(':');
const [moduleName, ...eventName] = name.split('_');
this.module = moduleName;
this.name = eventName.join(':');
this.name = eventName.join('_');
this.data = data;
}

Expand All @@ -47,12 +48,12 @@ export class Event {
public toJSONRPCNotification(): NotificationRequest {
return {
jsonrpc: VERSION,
method: `${this.module}:${this.name}`,
method: getEndpointPath(this.module, this.name),
params: this.data,
};
}

public key(): string {
return `${this.module}:${this.name}`;
return getEndpointPath(this.module, this.name);
}
}
Loading

0 comments on commit 244e9ef

Please sign in to comment.