Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6741 from LiskHQ/6707-refactor_controller
Browse files Browse the repository at this point in the history
Refactor controller - Closes #6707
  • Loading branch information
shuse2 authored Sep 10, 2021
2 parents b1dec88 + 5dfc776 commit 5b7e0eb
Show file tree
Hide file tree
Showing 15 changed files with 439 additions and 782 deletions.
145 changes: 43 additions & 102 deletions framework/src/application.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import { Block } from '@liskhq/lisk-chain';
import { KVStore } from '@liskhq/lisk-db';
import { validator, LiskValidationError } from '@liskhq/lisk-validator';
import { objects, jobHandlers } from '@liskhq/lisk-utils';
import { APP_EVENT_SHUTDOWN, APP_EVENT_READY, APP_IDENTIFIER } from './constants';
import { APP_EVENT_SHUTDOWN, APP_EVENT_READY } from './constants';
import {
RPCConfig,
ApplicationConfig,
PluginConfig,
RegisteredSchema,
RegisteredModule,
PartialApplicationConfig,
ApplicationConfigForPlugin,
EndpointHandlers,
PluginEndpointContext,
} from './types';

import { BasePlugin, getPluginExportPath, validatePluginSpec } from './plugins/base_plugin';
import { BasePlugin } from './plugins/base_plugin';
import { systemDirs } from './system_dirs';
import { Controller, InMemoryChannel } from './controller';
import { applicationConfigSchema } from './schema';
Expand All @@ -42,7 +40,7 @@ import { Logger, createLogger } from './logger';

import { DuplicateAppInstanceError } from './errors';
import { BaseModule } from './modules/base_module';
import { mergeEndpointHandlers } from './endpoint';
import { getEndpointHandlers, mergeEndpointHandlers } from './endpoint';

const MINIMUM_EXTERNAL_MODULE_ID = 1000;

Expand Down Expand Up @@ -99,9 +97,7 @@ export class Application {
public logger!: Logger;

private readonly _node: Node;
private _controller!: Controller;
private _plugins: { [key: string]: BasePlugin };
private _channel!: InMemoryChannel;
private readonly _controller: Controller;

private _genesisBlock!: Record<string, unknown> | undefined;
private _blockchainDB!: KVStore;
Expand All @@ -126,19 +122,28 @@ export class Application {
}
this.config = mergedConfig;

// Private members
this._plugins = {};
// Initialize node
const { plugins, ...rootConfigs } = this.config;
this._node = new Node({
options: rootConfigs,
});
this._controller = new Controller({
appConfig: rootConfigs,
pluginConfigs: plugins,
});
}

public get networkIdentifier(): Buffer {
return this._node.networkIdentifier;
}

public get channel(): InMemoryChannel {
if (!this._controller.channel) {
throw new Error('Controller is not initialized yet.');
}
return this._controller.channel;
}

public static getDefaultModules(): BaseModule[] {
return [];
}
Expand All @@ -159,34 +164,7 @@ export class Application {
plugin: BasePlugin,
options: PluginConfig = { loadAsChildProcess: false },
): void {
const pluginName = plugin.name;

assert(
!Object.keys(this._plugins).includes(pluginName),
`A plugin with name "${pluginName}" already registered.`,
);

if (options.loadAsChildProcess) {
if (!getPluginExportPath(plugin)) {
throw new Error(
`Unable to register plugin "${pluginName}" to load as child process. Package name or __filename must be specified in nodeModulePath.`,
);
}
}

this.config.plugins[pluginName] = Object.assign(this.config.plugins[pluginName] ?? {}, options);

validatePluginSpec(plugin);

this._plugins[pluginName] = plugin;
}

public updatePluginConfig(name: string, options?: PluginConfig): void {
assert(Object.keys(this._plugins).includes(name), `No plugin ${name} is registered`);
this.config.plugins[name] = {
...this.config.plugins[name],
...options,
};
this._controller.registerPlugin(plugin, options);
}

public registerModule(Module: BaseModule): void {
Expand Down Expand Up @@ -230,11 +208,12 @@ export class Application {

await this._mutex.runExclusive<void>(async () => {
// Initialize all objects
this._channel = this._initChannel();

this._controller = this._initController();

await this._controller.load();
this._controller.init({
logger: this.logger,
blockchainDB: this._blockchainDB,
endpoints: this._rootEndpoints(),
events: this._rootEvents(),
});

if (!this._genesisBlock) {
throw new Error('Genesis block must exist.');
Expand All @@ -243,20 +222,20 @@ export class Application {
const genesisBlock = Block.fromJSON(this._genesisBlock);

await this._node.init({
channel: this._channel,
channel: this.channel,
genesisBlock,
forgerDB: this._forgerDB,
blockchainDB: this._blockchainDB,
nodeDB: this._nodeDB,
logger: this.logger,
});

await this._loadPlugins();
await this._controller.start();
await this._node.start();
this.logger.debug(this._controller.bus.getEvents(), 'Application listening to events');
this.logger.debug(this._controller.bus.getEndpoints(), 'Application ready for actions');
this.logger.debug(this._controller.getEvents(), 'Application listening to events');
this.logger.debug(this._controller.getEndpoints(), 'Application ready for actions');

this._channel.publish(APP_EVENT_READY);
this.channel.publish(APP_EVENT_READY);
// TODO: Update genesis block to be provided in this function
// For now, the memory should be free up
delete this._genesisBlock;
Expand All @@ -269,9 +248,9 @@ export class Application {
const release = await this._mutex.acquire();

try {
this._channel.publish(APP_EVENT_SHUTDOWN);
this.channel.publish(APP_EVENT_SHUTDOWN);
await this._node.stop();
await this._controller.cleanup(errorCode, message);
await this._controller.stop(errorCode, message);
await this._blockchainDB.close();
await this._forgerDB.close();
await this._nodeDB.close();
Expand All @@ -297,21 +276,13 @@ export class Application {

private _registerModule(mod: BaseModule, validateModuleID = false): void {
assert(mod, 'Module implementation is required');
if (Application.getDefaultModules().includes(mod)) {
this._node.registerModule(mod);
} else if (validateModuleID && mod.id < MINIMUM_EXTERNAL_MODULE_ID) {
if (validateModuleID && mod.id < MINIMUM_EXTERNAL_MODULE_ID) {
throw new Error(
`Custom module must have id greater than or equal to ${MINIMUM_EXTERNAL_MODULE_ID}`,
);
} else {
this._node.registerModule(mod);
}
}

private async _loadPlugins(): Promise<void> {
const { plugins: pluginsConfig, ...rest } = this.config;
const appConfigForPlugin: ApplicationConfigForPlugin = rest;
await this._controller.loadPlugins(this._plugins, pluginsConfig, appConfigForPlugin);
this._node.registerModule(mod);
this._controller.registerEndpoint(mod.name, getEndpointHandlers(mod.endpoint));
}

private _initLogger(): Logger {
Expand All @@ -323,54 +294,24 @@ export class Application {
});
}

private _initChannel(): InMemoryChannel {
private _rootEndpoints(): EndpointHandlers {
const nodeEndpoint = this._node.getEndpoints();
const nodeEvents = this._node.getEvents();
const applicationEndpoint: EndpointHandlers = {
// eslint-disable-next-line @typescript-eslint/require-await
getRegisteredActions: async (_: PluginEndpointContext) => this._controller.bus.getEndpoints(),
getRegisteredActions: async (_: PluginEndpointContext) => this._controller.getEndpoints(),
// eslint-disable-next-line @typescript-eslint/require-await
getRegisteredEvents: async (_: PluginEndpointContext) => this._controller.bus.getEvents(),
getRegisteredEvents: async (_: PluginEndpointContext) => this._controller.getEvents(),
};
return new InMemoryChannel(
this.logger,
this._blockchainDB,
APP_IDENTIFIER,
[APP_EVENT_READY.replace('app:', ''), APP_EVENT_SHUTDOWN.replace('app:', ''), ...nodeEvents],
mergeEndpointHandlers(applicationEndpoint, nodeEndpoint),
{ skipInternalEvents: true },
);
return mergeEndpointHandlers(applicationEndpoint, nodeEndpoint);
}

private _initController(): Controller {
const moduleEndpoints = this._node.getModuleEndpoints();
const moduleChannels = this._node
.getRegisteredModules()
.map(
mod =>
new InMemoryChannel(
this.logger,
this._blockchainDB,
mod.name,
[],
moduleEndpoints[mod.name],
{ skipInternalEvents: true },
),
);
return new Controller({
appLabel: this.config.label,
blockchainDB: this._blockchainDB,
config: {
rootPath: this.config.rootPath,
rpc: objects.mergeDeep(
{ ipc: { path: systemDirs(this.config.label, this.config.rootPath).sockets } },
this.config.rpc,
) as RPCConfig,
},
logger: this.logger,
channel: this._channel,
moduleChannels,
});
private _rootEvents(): string[] {
const nodeEvents = this._node.getEvents();
return [
APP_EVENT_READY.replace('app:', ''),
APP_EVENT_SHUTDOWN.replace('app:', ''),
...nodeEvents,
];
}

private async _setupDirectories(): Promise<void> {
Expand Down
56 changes: 29 additions & 27 deletions framework/src/controller/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ const parseError = (id: JSONRPC.ID, err: Error | JSONRPC.JSONRPCError): JSONRPC.
};

export class Bus {
private readonly _logger: Logger;
private readonly _endpointInfos: {
[key: string]: EndpointInfo;
};
Expand All @@ -79,7 +78,7 @@ export class Bus {
[key: string]: ChannelInfo;
};
private readonly _rpcClients: { [key: string]: Dealer };
private readonly _internalIPCServer?: IPCServer;
private readonly _internalIPCServer: IPCServer;
private readonly _externalIPCServer?: IPCServer;
private readonly _rpcRequestIds: Set<string>;
private readonly _emitter: EventEmitter2;
Expand All @@ -91,9 +90,9 @@ export class Bus {
private readonly _handleExternalRPC: (rpcServer: Router) => Promise<void>;
private readonly _handleEvents: (subSocket: Subscriber) => Promise<void>;

public constructor(logger: Logger, config: BusConfiguration) {
this._logger = logger;
private _logger!: Logger;

public constructor(config: BusConfiguration) {
this._emitter = new EventEmitter2({
wildcard: true,
delimiter: ':',
Expand Down Expand Up @@ -210,21 +209,12 @@ export class Bus {
};
}

public async init(): Promise<boolean> {
if (this._internalIPCServer) {
await this._setupIPCInternalServer();
}
if (this._externalIPCServer) {
await this._setupIPCExternalServer();
}
if (this._wsServer) {
await this._setupWSServer();
}
if (this._httpServer) {
await this._setupHTTPServer();
}

return true;
public async start(logger: Logger): Promise<void> {
this._logger = logger;
await this._setupIPCInternalServer();
await this._setupIPCExternalServer();
await this._setupWSServer();
await this._setupHTTPServer();
}

// eslint-disable-next-line @typescript-eslint/require-await
Expand Down Expand Up @@ -509,32 +499,41 @@ export class Bus {
}

private async _setupIPCInternalServer(): Promise<void> {
await this._internalIPCServer?.start();
if (!this._internalIPCServer) {
return;
}
await this._internalIPCServer.start();

this._handleEvents((this._internalIPCServer as IPCServer).subSocket).catch(err => {
this._handleEvents(this._internalIPCServer.subSocket).catch(err => {
this._logger.debug(err, 'Error occured while listening to events on subscriber.');
});

this._handleRPC((this._internalIPCServer as IPCServer).rpcServer).catch(err => {
this._handleRPC(this._internalIPCServer.rpcServer).catch(err => {
this._logger.debug(err, 'Error occured while listening to RPCs on RPC router.');
});
}

private async _setupIPCExternalServer(): Promise<void> {
await this._externalIPCServer?.start();
if (!this._externalIPCServer) {
return;
}
await this._externalIPCServer.start();

this._handleEvents((this._externalIPCServer as IPCServer).subSocket).catch(err => {
this._handleEvents(this._externalIPCServer.subSocket).catch(err => {
this._logger.debug(err, 'Error occured while listening to events on subscriber.');
});

this._handleExternalRPC((this._externalIPCServer as IPCServer).rpcServer).catch(err => {
this._handleExternalRPC(this._externalIPCServer.rpcServer).catch(err => {
this._logger.debug(err, 'Error occured while listening to RPCs on RPC router.');
});
}

// eslint-disable-next-line @typescript-eslint/require-await
private async _setupWSServer(): Promise<void> {
this._wsServer?.start((socket, message) => {
if (!this._wsServer) {
return;
}
this._wsServer.start(this._logger, (socket, message) => {
this.invoke(message)
.then(data => {
socket.send(JSON.stringify(data as JSONRPC.ResponseObjectWithResult));
Expand All @@ -547,7 +546,10 @@ export class Bus {

// eslint-disable-next-line @typescript-eslint/require-await
private async _setupHTTPServer(): Promise<void> {
this._httpServer?.start((_req, res, message) => {
if (!this._httpServer) {
return;
}
this._httpServer.start(this._logger, (_req, res, message) => {
this.invoke(message)
.then(data => {
res.end(JSON.stringify(data as JSONRPC.ResponseObjectWithResult));
Expand Down
Loading

0 comments on commit 5b7e0eb

Please sign in to comment.