diff --git a/packages/node-core/src/db/db.module.ts b/packages/node-core/src/db/db.module.ts index 2cd13bbf81..4c580f5929 100644 --- a/packages/node-core/src/db/db.module.ts +++ b/packages/node-core/src/db/db.module.ts @@ -74,7 +74,7 @@ const buildSequelizeOptions = (nodeConfig: NodeConfig, option: DbOption): Sequel }, logging: nodeConfig.debug ? (sql: string, timing?: number) => { - // logger.debug(sql); + logger.debug(sql); } : false, }; diff --git a/packages/node-core/src/indexer/benchmark.service.ts b/packages/node-core/src/indexer/benchmark.service.ts index 698a9a8839..97e38385d8 100644 --- a/packages/node-core/src/indexer/benchmark.service.ts +++ b/packages/node-core/src/indexer/benchmark.service.ts @@ -32,7 +32,7 @@ export class BenchmarkService { @Interval(SAMPLING_TIME_VARIANCE * 1000) async benchmark(): Promise { try { - if (!this.currentProcessingHeight || !this.currentProcessingTimestamp || !this.currentProcessedBlockAmount) { + if (!this.currentProcessingHeight || !this.currentProcessingTimestamp) { await delay(10); } else { if (this.lastRegisteredHeight && this.lastRegisteredTimestamp) { diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index fc6f8bc939..d7b230c4db 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -30,7 +30,7 @@ export type ProcessBlockResponse = { }; export interface IBlockDispatcher { - enqueueBlocks(heights: number[], latestBufferHeight?: number): void; + enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise; queueSize: number; freeSize: number; @@ -69,7 +69,7 @@ export abstract class BaseBlockDispatcher implements IBlockDis protected dynamicDsService: DynamicDsService ) {} - abstract enqueueBlocks(heights: number[]): void; + abstract enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise; async init(onDynamicDsCreated: (height: number) => Promise): Promise { this.onDynamicDsCreated = onDynamicDsCreated; @@ -198,7 +198,21 @@ export abstract class BaseBlockDispatcher implements IBlockDis } } - private updateStoreMetadata(height: number): void { + // Used when dictionary results skip a large number of blocks + protected async jumpBufferedHeight(height: number): Promise { + this.updateStoreMetadata(height, false); + this.latestBufferedHeight = height; + + // We're not actually processing this block, we just want to update health/benchmark + this.eventEmitter.emit(IndexerEvent.BlockProcessing, { + height, + timestamp: Date.now(), + }); + + await this.storeCacheService.flushCache(true); + } + + private updateStoreMetadata(height: number, updateProcessed = true): void { const meta = this.storeCacheService.metadata; // Update store metadata meta.setBulk([ @@ -206,6 +220,8 @@ export abstract class BaseBlockDispatcher implements IBlockDis {key: 'lastProcessedTimestamp', value: Date.now()}, ]); // Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount - meta.setIncrement('processedBlockCount'); + if (updateProcessed) { + meta.setIncrement('processedBlockCount'); + } } } diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 995c5683c5..123c14f9ae 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -1,7 +1,7 @@ // Copyright 2020-2021 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 -import { getHeapStatistics } from 'v8'; +import {getHeapStatistics} from 'v8'; import {OnApplicationShutdown} from '@nestjs/common'; import {EventEmitter2} from '@nestjs/event-emitter'; import {profilerWrap} from '@subql/node-core/profiler'; @@ -12,7 +12,7 @@ import {getLogger} from '../../logger'; import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize} from '../../utils'; import {DynamicDsService} from '../dynamic-ds.service'; import {PoiService} from '../poi.service'; -import { SmartBatchService } from '../smartBatch.service'; +import {SmartBatchService} from '../smartBatch.service'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; import {IProjectNetworkConfig, IProjectService, ISubqueryProject} from '../types'; @@ -77,20 +77,19 @@ export abstract class BlockDispatcher this.processQueue.abort(); } - enqueueBlocks(cleanedBlocks: number[], latestBufferHeight?: number): void { - // // In the case where factors of batchSize is equal to bypassBlock or when cleanedBatchBlocks is [] - // // to ensure block is bypassed, latestBufferHeight needs to be manually set - // If cleanedBlocks = [] - if (!!latestBufferHeight && !cleanedBlocks.length) { - this.latestBufferedHeight = latestBufferHeight; + async enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise { + // In the case where factors of batchSize is equal to bypassBlock or when heights is [] + // to ensure block is bypassed, latestBufferHeight needs to be manually set + if (!!latestBufferHeight && !heights.length) { + await this.jumpBufferedHeight(latestBufferHeight); return; } - logger.info(`Enqueueing blocks ${cleanedBlocks[0]}...${last(cleanedBlocks)}, total ${cleanedBlocks.length} blocks`); + logger.info(`Enqueueing blocks ${heights[0]}...${last(heights)}, total ${heights.length} blocks`); - this.queue.putMany(cleanedBlocks); + this.queue.putMany(heights); - this.latestBufferedHeight = latestBufferHeight ?? last(cleanedBlocks); + this.latestBufferedHeight = latestBufferHeight ?? last(heights); void this.fetchBlocksFromQueue(); } @@ -100,10 +99,7 @@ export abstract class BlockDispatcher } private memoryleft(): number { - return ( - this.smartBatchService.heapMemoryLimit() - - getHeapStatistics().used_heap_size - ); + return this.smartBatchService.heapMemoryLimit() - getHeapStatistics().used_heap_size; } private async fetchBlocksFromQueue(): Promise { diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 828a473549..45db902d7f 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -12,7 +12,7 @@ import {getLogger} from '../../logger'; import {AutoQueue} from '../../utils'; import {DynamicDsService} from '../dynamic-ds.service'; import {PoiService} from '../poi.service'; -import { SmartBatchService } from '../smartBatch.service'; +import {SmartBatchService} from '../smartBatch.service'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; import {IProjectNetworkConfig, IProjectService, ISubqueryProject} from '../types'; @@ -89,8 +89,10 @@ export abstract class WorkerBlockDispatcher } async enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise { + // In the case where factors of batchSize is equal to bypassBlock or when heights is [] + // to ensure block is bypassed, latestBufferHeight needs to be manually set if (!!latestBufferHeight && !heights.length) { - this.latestBufferedHeight = latestBufferHeight; + await this.jumpBufferedHeight(latestBufferHeight); return; } @@ -101,14 +103,9 @@ export abstract class WorkerBlockDispatcher let startIndex = 0; while (startIndex < heights.length) { const workerIdx = await this.getNextWorkerIndex(); - const batchSize = Math.min( - heights.length - startIndex, - await this.maxBatchSize(workerIdx), - ); + const batchSize = Math.min(heights.length - startIndex, await this.maxBatchSize(workerIdx)); await Promise.all( - heights - .slice(startIndex, startIndex + batchSize) - .map((height) => this.enqueueBlock(height, workerIdx)), + heights.slice(startIndex, startIndex + batchSize).map((height) => this.enqueueBlock(height, workerIdx)) ); startIndex += batchSize; } @@ -189,9 +186,7 @@ export abstract class WorkerBlockDispatcher } private async getNextWorkerIndex(): Promise { - return Promise.all( - this.workers.map((worker) => worker.getMemoryLeft()), - ).then((memoryLeftValues) => { + return Promise.all(this.workers.map((worker) => worker.getMemoryLeft())).then((memoryLeftValues) => { return memoryLeftValues.indexOf(Math.max(...memoryLeftValues)); }); } diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index 113208a962..56d13e6862 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -96,7 +96,8 @@ export class FetchService implements OnApplicationShutdown { private apiService: ApiService, private nodeConfig: NodeConfig, @Inject('ISubqueryProject') private project: SubqueryProject, - @Inject('IBlockDispatcher') private blockDispatcher: ISubstrateBlockDispatcher, + @Inject('IBlockDispatcher') + private blockDispatcher: ISubstrateBlockDispatcher, private dictionaryService: DictionaryService, private dsProcessorService: DsProcessorService, private dynamicDsService: DynamicDsService, @@ -490,7 +491,6 @@ export class FetchService implements OnApplicationShutdown { .sort((a, b) => a - b); if (batchBlocks.length === 0) { // There we're no blocks in this query range, we can set a new height we're up to - // eslint-disable-next-line @typescript-eslint/await-thenable await this.blockDispatcher.enqueueBlocks( [], Math.min( @@ -506,7 +506,6 @@ export class FetchService implements OnApplicationShutdown { const enqueuingBlocks = batchBlocks.slice(0, maxBlockSize); const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); - // eslint-disable-next-line @typescript-eslint/await-thenable await this.blockDispatcher.enqueueBlocks( cleanedBatchBlocks, this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks), @@ -526,23 +525,16 @@ export class FetchService implements OnApplicationShutdown { scaledBatchSize, ); - if (handlers.length && this.getModulos().length === handlers.length) { - const enqueuingBlocks = this.getEnqueuedModuloBlocks(startBlockHeight); - const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); - // eslint-disable-next-line @typescript-eslint/await-thenable - await this.blockDispatcher.enqueueBlocks( - cleanedBatchBlocks, - this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks), - ); - } else { - const enqueuingBlocks = range(startBlockHeight, endHeight + 1); - const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); - // eslint-disable-next-line @typescript-eslint/await-thenable - await this.blockDispatcher.enqueueBlocks( - cleanedBatchBlocks, - this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks), - ); - } + const enqueuingBlocks = + handlers.length && this.getModulos().length === handlers.length + ? this.getEnqueuedModuloBlocks(startBlockHeight) + : range(startBlockHeight, endHeight + 1); + + const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); + await this.blockDispatcher.enqueueBlocks( + cleanedBatchBlocks, + this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks), + ); } } private getLatestBufferHeight(