diff --git a/packages/node/src/configure/NodeConfig.ts b/packages/node/src/configure/NodeConfig.ts index 1214a9b98a..f15937a748 100644 --- a/packages/node/src/configure/NodeConfig.ts +++ b/packages/node/src/configure/NodeConfig.ts @@ -31,6 +31,7 @@ export interface IConfig { readonly proofOfIndex: boolean; readonly mmrPath?: string; readonly ipfs?: string; + readonly workers?: number; } export type MinConfig = Partial> & @@ -147,6 +148,10 @@ export class NodeConfig implements IConfig { return this._config.dbSchema ?? this.subqueryName; } + get workers(): number { + return this._config.workers; + } + merge(config: Partial): this { assign(this._config, config); return this; diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 30008fd7d6..deafcf4d52 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -48,28 +48,31 @@ const { argv } = getYargsOption(); IndexerManager, { provide: 'IBlockDispatcher', - useFactory: ( - apiService: ApiService, - nodeConfig: NodeConfig, - eventEmitter: EventEmitter2, - indexerManager: IndexerManager, - ) => { - if (argv.workers) { - return new WorkerBlockDispatcherService( - argv.workers, - nodeConfig.batchSize, - eventEmitter, - ); - } - - return new BlockDispatcherService( - apiService, - nodeConfig, - indexerManager, - eventEmitter, - ); - }, - inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager], + useClass: argv.workers + ? WorkerBlockDispatcherService + : BlockDispatcherService, + // useFactory: ( + // apiService: ApiService, + // nodeConfig: NodeConfig, + // eventEmitter: EventEmitter2, + // indexerManager: IndexerManager, + // ) => { + // if (argv.workers) { + // return new WorkerBlockDispatcherService( + // argv.workers, + // nodeConfig.batchSize, + // eventEmitter, + // ); + // } + // + // return new BlockDispatcherService( + // apiService, + // nodeConfig, + // indexerManager, + // eventEmitter, + // ); + // }, + // inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager], }, { provide: FetchService, diff --git a/packages/node/src/indexer/worker/block-dispatcher.service.ts b/packages/node/src/indexer/worker/block-dispatcher.service.ts index 3bb7502863..8eec4fab2c 100644 --- a/packages/node/src/indexer/worker/block-dispatcher.service.ts +++ b/packages/node/src/indexer/worker/block-dispatcher.service.ts @@ -5,6 +5,7 @@ import assert from 'assert'; import os from 'os'; import path from 'path'; import { Injectable, OnApplicationShutdown } from '@nestjs/common'; +import { Interval } from '@nestjs/schedule'; import { RuntimeVersion } from '@polkadot/types/interfaces'; import { SubstrateBlock } from '@subql/types'; import chalk from 'chalk'; @@ -24,6 +25,7 @@ import { NumFetchedBlocks, NumFetchingBlocks, SetCurrentRuntimeVersion, + GetWorkerStatus, } from './worker'; import { Worker } from './worker.builder'; @@ -33,6 +35,7 @@ type IIndexerWorker = { numFetchedBlocks: NumFetchedBlocks; numFetchingBlocks: NumFetchingBlocks; setCurrentRuntimeVersion: SetCurrentRuntimeVersion; + getStatus: GetWorkerStatus; }; type IInitIndexerWorker = IIndexerWorker & { @@ -53,6 +56,7 @@ async function createIndexerWorker(): Promise { 'numFetchedBlocks', 'numFetchingBlocks', 'setCurrentRuntimeVersion', + 'getStatus', ], ); @@ -244,17 +248,9 @@ export class WorkerBlockDispatcherService private queue: AutoQueue; private _latestBufferedHeight: number; - /** - * @param numWorkers. The number of worker threads to run, this is capped at number of cpus - * @param workerQueueSize. The number of fetched blocks queued to be processed - */ - constructor( - numWorkers: number, - workerQueueSize: number, - private eventEmitter: EventEmitter2, - ) { - this.numWorkers = getMaxWorkers(numWorkers); - this.queue = new AutoQueue(this.numWorkers * workerQueueSize); + constructor(nodeConfig: NodeConfig, private eventEmitter: EventEmitter2) { + this.numWorkers = getMaxWorkers(nodeConfig.workers); + this.queue = new AutoQueue(this.numWorkers * nodeConfig.batchSize * 2); } async init( @@ -286,7 +282,8 @@ export class WorkerBlockDispatcherService } blocks`, ); - heights.map((height) => this.enqueueBlock(height)); + const workerIdx = this.getNextWorkerIndex(); + heights.map((height) => this.enqueueBlock(height, workerIdx)); this.latestBufferedHeight = last(heights); } @@ -296,9 +293,8 @@ export class WorkerBlockDispatcherService this.queue.flush(); } - private enqueueBlock(height: number) { + private enqueueBlock(height: number, workerIdx: number) { if (this.isShutdown) return; - const workerIdx = this.getNextWorkerIndex(); const worker = this.workers[workerIdx]; assert(worker, `Worker ${workerIdx} not found`); @@ -360,6 +356,14 @@ export class WorkerBlockDispatcherService void this.queue.put(processBlock); } + @Interval(15000) + async sampleWorkerStatus() { + for (const worker of this.workers) { + const status = await worker.getStatus(); + logger.info(JSON.stringify(status)); + } + } + get queueSize(): number { return this.queue.size; } diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 3ecadc13e6..0da8cd43b2 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@nestjs/common'; import { RuntimeVersion } from '@polkadot/types/interfaces'; +import { NodeConfig } from '../../configure/NodeConfig'; import { AutoQueue } from '../../utils/autoQueue'; import { fetchBlocksBatches } from '../../utils/substrate'; import { ApiService } from '../api.service'; @@ -17,18 +18,27 @@ export type ProcessBlockResponse = { dynamicDsCreated: boolean; }; +export type WorkerStatusResponse = { + threadId: number; + isIndexing: boolean; + fetchedBlocks: number; + toFetchBlocks: number; +}; + @Injectable() export class WorkerService { private fetchedBlocks: Record = {}; private currentRuntimeVersion: RuntimeVersion | undefined; + private _isIndexing = false; private queue: AutoQueue; constructor( private apiService: ApiService, private indexerManager: IndexerManager, + private nodeConfig: NodeConfig, ) { - this.queue = new AutoQueue(undefined, 5); + this.queue = new AutoQueue(undefined, nodeConfig.batchSize); } async fetchBlock(height: number): Promise { @@ -68,6 +78,7 @@ export class WorkerService { } async processBlock(height: number): Promise { + this._isIndexing = true; const block = this.fetchedBlocks[height]; if (!block) { @@ -76,7 +87,13 @@ export class WorkerService { delete this.fetchedBlocks[height]; - return this.indexerManager.indexBlock(block, this.currentRuntimeVersion); + const response = await this.indexerManager.indexBlock( + block, + this.currentRuntimeVersion, + ); + + this._isIndexing = false; + return response; } get numFetchedBlocks(): number { @@ -86,4 +103,8 @@ export class WorkerService { get numFetchingBlocks(): number { return this.queue.size; } + + get isIndexing(): boolean { + return this._isIndexing; + } } diff --git a/packages/node/src/indexer/worker/worker.ts b/packages/node/src/indexer/worker/worker.ts index 5d6fc594a3..64039663f1 100644 --- a/packages/node/src/indexer/worker/worker.ts +++ b/packages/node/src/indexer/worker/worker.ts @@ -13,6 +13,7 @@ import { FetchBlockResponse, ProcessBlockResponse, WorkerService, + WorkerStatusResponse, } from './worker.service'; let app: INestApplication; @@ -67,6 +68,16 @@ async function numFetchingBlocks(): Promise { return workerService.numFetchingBlocks; } +// eslint-disable-next-line @typescript-eslint/require-await +async function getStatus(): Promise { + return { + threadId, + fetchedBlocks: workerService.numFetchedBlocks, + toFetchBlocks: workerService.numFetchingBlocks, + isIndexing: workerService.isIndexing, + }; +} + // Register these functions to be exposed to worker host registerWorker({ initWorker, @@ -75,6 +86,7 @@ registerWorker({ numFetchedBlocks, numFetchingBlocks, setCurrentRuntimeVersion, + getStatus, }); // Export types to be used on the parent @@ -84,3 +96,4 @@ export type ProcessBlock = typeof processBlock; export type NumFetchedBlocks = typeof numFetchedBlocks; export type NumFetchingBlocks = typeof numFetchingBlocks; export type SetCurrentRuntimeVersion = typeof setCurrentRuntimeVersion; +export type GetWorkerStatus = typeof getStatus; diff --git a/packages/node/src/utils/autoQueue.ts b/packages/node/src/utils/autoQueue.ts index 3249825604..fe0f42475b 100644 --- a/packages/node/src/utils/autoQueue.ts +++ b/packages/node/src/utils/autoQueue.ts @@ -77,6 +77,7 @@ export class AutoQueue { private pendingPromise = false; private queue: Queue>; private _abort = false; + private processingTasks = 0; private eventEmitter = new EventEmitter2(); @@ -85,7 +86,7 @@ export class AutoQueue { } get size(): number { - return this.queue.size; + return this.queue.size + this.processingTasks; } get capacity(): number { @@ -135,6 +136,7 @@ export class AutoQueue { const actions = this.queue.takeMany(this.concurrency); if (!actions.length) break; + this.processingTasks += actions.length; this.eventEmitter.emit('size', this.queue.size); @@ -144,7 +146,7 @@ export class AutoQueue { actions.map(async (action) => { try { const payload = await action.task(); - + this.processingTasks -= 1; action.resolve(payload); } catch (e) { action.reject(e);