Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP-assign continuous blocks to worker #1173

Merged
merged 1 commit into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/node/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface IConfig {
readonly proofOfIndex: boolean;
readonly mmrPath?: string;
readonly ipfs?: string;
readonly workers?: number;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> &
Expand Down Expand Up @@ -147,6 +148,10 @@ export class NodeConfig implements IConfig {
return this._config.dbSchema ?? this.subqueryName;
}

get workers(): number {
return this._config.workers;
}

merge(config: Partial<IConfig>): this {
assign(this._config, config);
return this;
Expand Down
47 changes: 25 additions & 22 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,31 @@ const { argv } = getYargsOption();
IndexerManager,
{
provide: 'IBlockDispatcher',
useFactory: (
apiService: ApiService,
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
indexerManager: IndexerManager,
) => {
if (argv.workers) {
return new WorkerBlockDispatcherService(
argv.workers,
nodeConfig.batchSize,
eventEmitter,
);
}

return new BlockDispatcherService(
apiService,
nodeConfig,
indexerManager,
eventEmitter,
);
},
inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager],
useClass: argv.workers
? WorkerBlockDispatcherService
: BlockDispatcherService,
// useFactory: (
// apiService: ApiService,
// nodeConfig: NodeConfig,
// eventEmitter: EventEmitter2,
// indexerManager: IndexerManager,
// ) => {
// if (argv.workers) {
// return new WorkerBlockDispatcherService(
// argv.workers,
// nodeConfig.batchSize,
// eventEmitter,
// );
// }
//
// return new BlockDispatcherService(
// apiService,
// nodeConfig,
// indexerManager,
// eventEmitter,
// );
// },
// inject: [ApiService, NodeConfig, EventEmitter2, IndexerManager],
},
{
provide: FetchService,
Expand Down
32 changes: 18 additions & 14 deletions packages/node/src/indexer/worker/block-dispatcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import assert from 'assert';
import os from 'os';
import path from 'path';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { SubstrateBlock } from '@subql/types';
import chalk from 'chalk';
Expand All @@ -24,6 +25,7 @@ import {
NumFetchedBlocks,
NumFetchingBlocks,
SetCurrentRuntimeVersion,
GetWorkerStatus,
} from './worker';
import { Worker } from './worker.builder';

Expand All @@ -33,6 +35,7 @@ type IIndexerWorker = {
numFetchedBlocks: NumFetchedBlocks;
numFetchingBlocks: NumFetchingBlocks;
setCurrentRuntimeVersion: SetCurrentRuntimeVersion;
getStatus: GetWorkerStatus;
};

type IInitIndexerWorker = IIndexerWorker & {
Expand All @@ -53,6 +56,7 @@ async function createIndexerWorker(): Promise<IndexerWorker> {
'numFetchedBlocks',
'numFetchingBlocks',
'setCurrentRuntimeVersion',
'getStatus',
],
);

Expand Down Expand Up @@ -244,17 +248,9 @@ export class WorkerBlockDispatcherService
private queue: AutoQueue<void>;
private _latestBufferedHeight: number;

/**
* @param numWorkers. The number of worker threads to run, this is capped at number of cpus
* @param workerQueueSize. The number of fetched blocks queued to be processed
*/
constructor(
numWorkers: number,
workerQueueSize: number,
private eventEmitter: EventEmitter2,
) {
this.numWorkers = getMaxWorkers(numWorkers);
this.queue = new AutoQueue(this.numWorkers * workerQueueSize);
constructor(nodeConfig: NodeConfig, private eventEmitter: EventEmitter2) {
this.numWorkers = getMaxWorkers(nodeConfig.workers);
this.queue = new AutoQueue(this.numWorkers * nodeConfig.batchSize * 2);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* 2 is important so worker can always prepare next batch

}

async init(
Expand Down Expand Up @@ -286,7 +282,8 @@ export class WorkerBlockDispatcherService
} blocks`,
);

heights.map((height) => this.enqueueBlock(height));
const workerIdx = this.getNextWorkerIndex();
heights.map((height) => this.enqueueBlock(height, workerIdx));

this.latestBufferedHeight = last(heights);
}
Expand All @@ -296,9 +293,8 @@ export class WorkerBlockDispatcherService
this.queue.flush();
}

private enqueueBlock(height: number) {
private enqueueBlock(height: number, workerIdx: number) {
if (this.isShutdown) return;
const workerIdx = this.getNextWorkerIndex();
const worker = this.workers[workerIdx];

assert(worker, `Worker ${workerIdx} not found`);
Expand Down Expand Up @@ -360,6 +356,14 @@ export class WorkerBlockDispatcherService
void this.queue.put(processBlock);
}

@Interval(15000)
async sampleWorkerStatus() {
for (const worker of this.workers) {
const status = await worker.getStatus();
logger.info(JSON.stringify(status));
}
}

get queueSize(): number {
return this.queue.size;
}
Expand Down
25 changes: 23 additions & 2 deletions packages/node/src/indexer/worker/worker.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Injectable } from '@nestjs/common';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { NodeConfig } from '../../configure/NodeConfig';
import { AutoQueue } from '../../utils/autoQueue';
import { fetchBlocksBatches } from '../../utils/substrate';
import { ApiService } from '../api.service';
Expand All @@ -17,18 +18,27 @@ export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
};

export type WorkerStatusResponse = {
threadId: number;
isIndexing: boolean;
fetchedBlocks: number;
toFetchBlocks: number;
};

@Injectable()
export class WorkerService {
private fetchedBlocks: Record<string, BlockContent> = {};
private currentRuntimeVersion: RuntimeVersion | undefined;
private _isIndexing = false;

private queue: AutoQueue<FetchBlockResponse>;

constructor(
private apiService: ApiService,
private indexerManager: IndexerManager,
private nodeConfig: NodeConfig,
) {
this.queue = new AutoQueue(undefined, 5);
this.queue = new AutoQueue(undefined, nodeConfig.batchSize);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improve fetch performance for worker (same as before)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this change made the biggest difference. Even on my branch this change increases the performance to match.

}

async fetchBlock(height: number): Promise<FetchBlockResponse> {
Expand Down Expand Up @@ -68,6 +78,7 @@ export class WorkerService {
}

async processBlock(height: number): Promise<ProcessBlockResponse> {
this._isIndexing = true;
const block = this.fetchedBlocks[height];

if (!block) {
Expand All @@ -76,7 +87,13 @@ export class WorkerService {

delete this.fetchedBlocks[height];

return this.indexerManager.indexBlock(block, this.currentRuntimeVersion);
const response = await this.indexerManager.indexBlock(
block,
this.currentRuntimeVersion,
);

this._isIndexing = false;
return response;
}

get numFetchedBlocks(): number {
Expand All @@ -86,4 +103,8 @@ export class WorkerService {
get numFetchingBlocks(): number {
return this.queue.size;
}

get isIndexing(): boolean {
return this._isIndexing;
}
}
13 changes: 13 additions & 0 deletions packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
FetchBlockResponse,
ProcessBlockResponse,
WorkerService,
WorkerStatusResponse,
} from './worker.service';

let app: INestApplication;
Expand Down Expand Up @@ -67,6 +68,16 @@ async function numFetchingBlocks(): Promise<number> {
return workerService.numFetchingBlocks;
}

// eslint-disable-next-line @typescript-eslint/require-await
async function getStatus(): Promise<WorkerStatusResponse> {
return {
threadId,
fetchedBlocks: workerService.numFetchedBlocks,
toFetchBlocks: workerService.numFetchingBlocks,
isIndexing: workerService.isIndexing,
};
}

// Register these functions to be exposed to worker host
registerWorker({
initWorker,
Expand All @@ -75,6 +86,7 @@ registerWorker({
numFetchedBlocks,
numFetchingBlocks,
setCurrentRuntimeVersion,
getStatus,
});

// Export types to be used on the parent
Expand All @@ -84,3 +96,4 @@ export type ProcessBlock = typeof processBlock;
export type NumFetchedBlocks = typeof numFetchedBlocks;
export type NumFetchingBlocks = typeof numFetchingBlocks;
export type SetCurrentRuntimeVersion = typeof setCurrentRuntimeVersion;
export type GetWorkerStatus = typeof getStatus;
6 changes: 4 additions & 2 deletions packages/node/src/utils/autoQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class AutoQueue<T> {
private pendingPromise = false;
private queue: Queue<Action<T>>;
private _abort = false;
private processingTasks = 0;

private eventEmitter = new EventEmitter2();

Expand All @@ -85,7 +86,7 @@ export class AutoQueue<T> {
}

get size(): number {
return this.queue.size;
return this.queue.size + this.processingTasks;
}

get capacity(): number {
Expand Down Expand Up @@ -135,6 +136,7 @@ export class AutoQueue<T> {
const actions = this.queue.takeMany(this.concurrency);

if (!actions.length) break;
this.processingTasks += actions.length;

this.eventEmitter.emit('size', this.queue.size);

Expand All @@ -144,7 +146,7 @@ export class AutoQueue<T> {
actions.map(async (action) => {
try {
const payload = await action.task();

this.processingTasks -= 1;
action.resolve(payload);
} catch (e) {
action.reject(e);
Expand Down