Skip to content

Commit

Permalink
assign continuous blocks to worker (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianhe8x authored and stwiname committed Jul 10, 2022
1 parent b32c2c8 commit 1332854
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 40 deletions.
5 changes: 5 additions & 0 deletions packages/node/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface IConfig {
readonly proofOfIndex: boolean;
readonly mmrPath?: string;
readonly ipfs?: string;
readonly workers?: number;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> &
Expand Down Expand Up @@ -147,6 +148,10 @@ export class NodeConfig implements IConfig {
return this._config.dbSchema ?? this.subqueryName;
}

get workers(): number {
return this._config.workers;
}

merge(config: Partial<IConfig>): this {
assign(this._config, config);
return this;
Expand Down
47 changes: 25 additions & 22 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,31 @@ const { argv } = getYargsOption();
IndexerManager,
{
provide: 'IBlockDispatcher',
useFactory: (
apiService: ApiService,
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
indexerManager: IndexerManager,
) => {
if (argv.workers) {
return new WorkerBlockDispatcherService(
argv.workers,
nodeConfig.batchSize,
eventEmitter,
);
}

return new BlockDispatcherService(
apiService,
nodeConfig,
indexerManager,
eventEmitter,
);
},
inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager],
useClass: argv.workers
? WorkerBlockDispatcherService
: BlockDispatcherService,
// useFactory: (
// apiService: ApiService,
// nodeConfig: NodeConfig,
// eventEmitter: EventEmitter2,
// indexerManager: IndexerManager,
// ) => {
// if (argv.workers) {
// return new WorkerBlockDispatcherService(
// argv.workers,
// nodeConfig.batchSize,
// eventEmitter,
// );
// }
//
// return new BlockDispatcherService(
// apiService,
// nodeConfig,
// indexerManager,
// eventEmitter,
// );
// },
// inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager],
},
{
provide: FetchService,
Expand Down
32 changes: 18 additions & 14 deletions packages/node/src/indexer/worker/block-dispatcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import assert from 'assert';
import os from 'os';
import path from 'path';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { SubstrateBlock } from '@subql/types';
import chalk from 'chalk';
Expand All @@ -24,6 +25,7 @@ import {
NumFetchedBlocks,
NumFetchingBlocks,
SetCurrentRuntimeVersion,
GetWorkerStatus,
} from './worker';
import { Worker } from './worker.builder';

Expand All @@ -33,6 +35,7 @@ type IIndexerWorker = {
numFetchedBlocks: NumFetchedBlocks;
numFetchingBlocks: NumFetchingBlocks;
setCurrentRuntimeVersion: SetCurrentRuntimeVersion;
getStatus: GetWorkerStatus;
};

type IInitIndexerWorker = IIndexerWorker & {
Expand All @@ -53,6 +56,7 @@ async function createIndexerWorker(): Promise<IndexerWorker> {
'numFetchedBlocks',
'numFetchingBlocks',
'setCurrentRuntimeVersion',
'getStatus',
],
);

Expand Down Expand Up @@ -244,17 +248,9 @@ export class WorkerBlockDispatcherService
private queue: AutoQueue<void>;
private _latestBufferedHeight: number;

/**
* @param numWorkers. The number of worker threads to run, this is capped at number of cpus
* @param workerQueueSize. The number of fetched blocks queued to be processed
*/
constructor(
numWorkers: number,
workerQueueSize: number,
private eventEmitter: EventEmitter2,
) {
this.numWorkers = getMaxWorkers(numWorkers);
this.queue = new AutoQueue(this.numWorkers * workerQueueSize);
constructor(nodeConfig: NodeConfig, private eventEmitter: EventEmitter2) {
this.numWorkers = getMaxWorkers(nodeConfig.workers);
this.queue = new AutoQueue(this.numWorkers * nodeConfig.batchSize * 2);
}

async init(
Expand Down Expand Up @@ -286,7 +282,8 @@ export class WorkerBlockDispatcherService
} blocks`,
);

heights.map((height) => this.enqueueBlock(height));
const workerIdx = this.getNextWorkerIndex();
heights.map((height) => this.enqueueBlock(height, workerIdx));

this.latestBufferedHeight = last(heights);
}
Expand All @@ -296,9 +293,8 @@ export class WorkerBlockDispatcherService
this.queue.flush();
}

private enqueueBlock(height: number) {
private enqueueBlock(height: number, workerIdx: number) {
if (this.isShutdown) return;
const workerIdx = this.getNextWorkerIndex();
const worker = this.workers[workerIdx];

assert(worker, `Worker ${workerIdx} not found`);
Expand Down Expand Up @@ -360,6 +356,14 @@ export class WorkerBlockDispatcherService
void this.queue.put(processBlock);
}

@Interval(15000)
async sampleWorkerStatus() {
for (const worker of this.workers) {
const status = await worker.getStatus();
logger.info(JSON.stringify(status));
}
}

get queueSize(): number {
return this.queue.size;
}
Expand Down
25 changes: 23 additions & 2 deletions packages/node/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Injectable } from '@nestjs/common';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { NodeConfig } from '../../configure/NodeConfig';
import { AutoQueue } from '../../utils/autoQueue';
import { fetchBlocksBatches } from '../../utils/substrate';
import { ApiService } from '../api.service';
Expand All @@ -17,18 +18,27 @@ export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
};

export type WorkerStatusResponse = {
threadId: number;
isIndexing: boolean;
fetchedBlocks: number;
toFetchBlocks: number;
};

@Injectable()
export class WorkerService {
private fetchedBlocks: Record<string, BlockContent> = {};
private currentRuntimeVersion: RuntimeVersion | undefined;
private _isIndexing = false;

private queue: AutoQueue<FetchBlockResponse>;

constructor(
private apiService: ApiService,
private indexerManager: IndexerManager,
private nodeConfig: NodeConfig,
) {
this.queue = new AutoQueue(undefined, 5);
this.queue = new AutoQueue(undefined, nodeConfig.batchSize);
}

async fetchBlock(height: number): Promise<FetchBlockResponse> {
Expand Down Expand Up @@ -68,6 +78,7 @@ export class WorkerService {
}

async processBlock(height: number): Promise<ProcessBlockResponse> {
this._isIndexing = true;
const block = this.fetchedBlocks[height];

if (!block) {
Expand All @@ -76,7 +87,13 @@ export class WorkerService {

delete this.fetchedBlocks[height];

return this.indexerManager.indexBlock(block, this.currentRuntimeVersion);
const response = await this.indexerManager.indexBlock(
block,
this.currentRuntimeVersion,
);

this._isIndexing = false;
return response;
}

get numFetchedBlocks(): number {
Expand All @@ -86,4 +103,8 @@ export class WorkerService {
get numFetchingBlocks(): number {
return this.queue.size;
}

get isIndexing(): boolean {
return this._isIndexing;
}
}
13 changes: 13 additions & 0 deletions packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
FetchBlockResponse,
ProcessBlockResponse,
WorkerService,
WorkerStatusResponse,
} from './worker.service';

let app: INestApplication;
Expand Down Expand Up @@ -67,6 +68,16 @@ async function numFetchingBlocks(): Promise<number> {
return workerService.numFetchingBlocks;
}

// eslint-disable-next-line @typescript-eslint/require-await
async function getStatus(): Promise<WorkerStatusResponse> {
return {
threadId,
fetchedBlocks: workerService.numFetchedBlocks,
toFetchBlocks: workerService.numFetchingBlocks,
isIndexing: workerService.isIndexing,
};
}

// Register these functions to be exposed to worker host
registerWorker({
initWorker,
Expand All @@ -75,6 +86,7 @@ registerWorker({
numFetchedBlocks,
numFetchingBlocks,
setCurrentRuntimeVersion,
getStatus,
});

// Export types to be used on the parent
Expand All @@ -84,3 +96,4 @@ export type ProcessBlock = typeof processBlock;
export type NumFetchedBlocks = typeof numFetchedBlocks;
export type NumFetchingBlocks = typeof numFetchingBlocks;
export type SetCurrentRuntimeVersion = typeof setCurrentRuntimeVersion;
export type GetWorkerStatus = typeof getStatus;
6 changes: 4 additions & 2 deletions packages/node/src/utils/autoQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class AutoQueue<T> {
private pendingPromise = false;
private queue: Queue<Action<T>>;
private _abort = false;
private processingTasks = 0;

private eventEmitter = new EventEmitter2();

Expand All @@ -85,7 +86,7 @@ export class AutoQueue<T> {
}

get size(): number {
return this.queue.size;
return this.queue.size + this.processingTasks;
}

get capacity(): number {
Expand Down Expand Up @@ -135,6 +136,7 @@ export class AutoQueue<T> {
const actions = this.queue.takeMany(this.concurrency);

if (!actions.length) break;
this.processingTasks += actions.length;

this.eventEmitter.emit('size', this.queue.size);

Expand All @@ -144,7 +146,7 @@ export class AutoQueue<T> {
actions.map(async (action) => {
try {
const payload = await action.task();

this.processingTasks -= 1;
action.resolve(payload);
} catch (e) {
action.reject(e);
Expand Down

0 comments on commit 1332854

Please sign in to comment.