Skip to content

Commit

Permalink
Update metadata when dictionary skips large number of blocks (#1577)
Browse files Browse the repository at this point in the history
* Update metadata when dictionary skips large number of blocks

* Bring back option to log sql
  • Loading branch information
stwiname committed Mar 30, 2023
1 parent ea4655f commit 524b691
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 53 deletions.
2 changes: 1 addition & 1 deletion packages/node-core/src/db/db.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const buildSequelizeOptions = (nodeConfig: NodeConfig, option: DbOption): Sequel
},
logging: nodeConfig.debug
? (sql: string, timing?: number) => {
// logger.debug(sql);
logger.debug(sql);
}
: false,
};
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/benchmark.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class BenchmarkService {
@Interval(SAMPLING_TIME_VARIANCE * 1000)
async benchmark(): Promise<void> {
try {
if (!this.currentProcessingHeight || !this.currentProcessingTimestamp || !this.currentProcessedBlockAmount) {
if (!this.currentProcessingHeight || !this.currentProcessingTimestamp) {
await delay(10);
} else {
if (this.lastRegisteredHeight && this.lastRegisteredTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export type ProcessBlockResponse = {
};

export interface IBlockDispatcher {
enqueueBlocks(heights: number[], latestBufferHeight?: number): void;
enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void>;

queueSize: number;
freeSize: number;
Expand Down Expand Up @@ -69,7 +69,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue> implements IBlockDis
protected dynamicDsService: DynamicDsService<any>
) {}

abstract enqueueBlocks(heights: number[]): void;
abstract enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void>;

async init(onDynamicDsCreated: (height: number) => Promise<void>): Promise<void> {
this.onDynamicDsCreated = onDynamicDsCreated;
Expand Down Expand Up @@ -198,14 +198,30 @@ export abstract class BaseBlockDispatcher<Q extends IQueue> implements IBlockDis
}
}

private updateStoreMetadata(height: number): void {
// Used when dictionary results skip a large number of blocks
protected async jumpBufferedHeight(height: number): Promise<void> {
this.updateStoreMetadata(height, false);
this.latestBufferedHeight = height;

// We're not actually processing this block, we just want to update health/benchmark
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});

await this.storeCacheService.flushCache(true);
}

private updateStoreMetadata(height: number, updateProcessed = true): void {
const meta = this.storeCacheService.metadata;
// Update store metadata
meta.setBulk([
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
]);
// Db Metadata increase BlockCount, in memory ref to block-dispatcher _processedBlockCount
meta.setIncrement('processedBlockCount');
if (updateProcessed) {
meta.setIncrement('processedBlockCount');
}
}
}
26 changes: 11 additions & 15 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { getHeapStatistics } from 'v8';
import {getHeapStatistics} from 'v8';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {profilerWrap} from '@subql/node-core/profiler';
Expand All @@ -12,7 +12,7 @@ import {getLogger} from '../../logger';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import { SmartBatchService } from '../smartBatch.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectNetworkConfig, IProjectService, ISubqueryProject} from '../types';
Expand Down Expand Up @@ -77,20 +77,19 @@ export abstract class BlockDispatcher<B, DS>
this.processQueue.abort();
}

enqueueBlocks(cleanedBlocks: number[], latestBufferHeight?: number): void {
// // In the case where factors of batchSize is equal to bypassBlock or when cleanedBatchBlocks is []
// // to ensure block is bypassed, latestBufferHeight needs to be manually set
// If cleanedBlocks = []
if (!!latestBufferHeight && !cleanedBlocks.length) {
this.latestBufferedHeight = latestBufferHeight;
async enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void> {
// In the case where factors of batchSize is equal to bypassBlock or when heights is []
// to ensure block is bypassed, latestBufferHeight needs to be manually set
if (!!latestBufferHeight && !heights.length) {
await this.jumpBufferedHeight(latestBufferHeight);
return;
}

logger.info(`Enqueueing blocks ${cleanedBlocks[0]}...${last(cleanedBlocks)}, total ${cleanedBlocks.length} blocks`);
logger.info(`Enqueueing blocks ${heights[0]}...${last(heights)}, total ${heights.length} blocks`);

this.queue.putMany(cleanedBlocks);
this.queue.putMany(heights);

this.latestBufferedHeight = latestBufferHeight ?? last(cleanedBlocks);
this.latestBufferedHeight = latestBufferHeight ?? last(heights);
void this.fetchBlocksFromQueue();
}

Expand All @@ -100,10 +99,7 @@ export abstract class BlockDispatcher<B, DS>
}

private memoryleft(): number {
return (
this.smartBatchService.heapMemoryLimit() -
getHeapStatistics().used_heap_size
);
return this.smartBatchService.heapMemoryLimit() - getHeapStatistics().used_heap_size;
}

private async fetchBlocksFromQueue(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {getLogger} from '../../logger';
import {AutoQueue} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import { SmartBatchService } from '../smartBatch.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
import {IProjectNetworkConfig, IProjectService, ISubqueryProject} from '../types';
Expand Down Expand Up @@ -89,8 +89,10 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker>
}

async enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise<void> {
// In the case where factors of batchSize is equal to bypassBlock or when heights is []
// to ensure block is bypassed, latestBufferHeight needs to be manually set
if (!!latestBufferHeight && !heights.length) {
this.latestBufferedHeight = latestBufferHeight;
await this.jumpBufferedHeight(latestBufferHeight);
return;
}

Expand All @@ -101,14 +103,9 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker>
let startIndex = 0;
while (startIndex < heights.length) {
const workerIdx = await this.getNextWorkerIndex();
const batchSize = Math.min(
heights.length - startIndex,
await this.maxBatchSize(workerIdx),
);
const batchSize = Math.min(heights.length - startIndex, await this.maxBatchSize(workerIdx));
await Promise.all(
heights
.slice(startIndex, startIndex + batchSize)
.map((height) => this.enqueueBlock(height, workerIdx)),
heights.slice(startIndex, startIndex + batchSize).map((height) => this.enqueueBlock(height, workerIdx))
);
startIndex += batchSize;
}
Expand Down Expand Up @@ -189,9 +186,7 @@ export abstract class WorkerBlockDispatcher<DS, W extends Worker>
}

private async getNextWorkerIndex(): Promise<number> {
return Promise.all(
this.workers.map((worker) => worker.getMemoryLeft()),
).then((memoryLeftValues) => {
return Promise.all(this.workers.map((worker) => worker.getMemoryLeft())).then((memoryLeftValues) => {
return memoryLeftValues.indexOf(Math.max(...memoryLeftValues));
});
}
Expand Down
32 changes: 12 additions & 20 deletions packages/node/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ export class FetchService implements OnApplicationShutdown {
private apiService: ApiService,
private nodeConfig: NodeConfig,
@Inject('ISubqueryProject') private project: SubqueryProject,
@Inject('IBlockDispatcher') private blockDispatcher: ISubstrateBlockDispatcher,
@Inject('IBlockDispatcher')
private blockDispatcher: ISubstrateBlockDispatcher,
private dictionaryService: DictionaryService,
private dsProcessorService: DsProcessorService,
private dynamicDsService: DynamicDsService,
Expand Down Expand Up @@ -490,7 +491,6 @@ export class FetchService implements OnApplicationShutdown {
.sort((a, b) => a - b);
if (batchBlocks.length === 0) {
// There we're no blocks in this query range, we can set a new height we're up to
// eslint-disable-next-line @typescript-eslint/await-thenable
await this.blockDispatcher.enqueueBlocks(
[],
Math.min(
Expand All @@ -506,7 +506,6 @@ export class FetchService implements OnApplicationShutdown {
const enqueuingBlocks = batchBlocks.slice(0, maxBlockSize);
const cleanedBatchBlocks =
this.filteredBlockBatch(enqueuingBlocks);
// eslint-disable-next-line @typescript-eslint/await-thenable
await this.blockDispatcher.enqueueBlocks(
cleanedBatchBlocks,
this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks),
Expand All @@ -526,23 +525,16 @@ export class FetchService implements OnApplicationShutdown {
scaledBatchSize,
);

if (handlers.length && this.getModulos().length === handlers.length) {
const enqueuingBlocks = this.getEnqueuedModuloBlocks(startBlockHeight);
const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks);
// eslint-disable-next-line @typescript-eslint/await-thenable
await this.blockDispatcher.enqueueBlocks(
cleanedBatchBlocks,
this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks),
);
} else {
const enqueuingBlocks = range(startBlockHeight, endHeight + 1);
const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks);
// eslint-disable-next-line @typescript-eslint/await-thenable
await this.blockDispatcher.enqueueBlocks(
cleanedBatchBlocks,
this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks),
);
}
const enqueuingBlocks =
handlers.length && this.getModulos().length === handlers.length
? this.getEnqueuedModuloBlocks(startBlockHeight)
: range(startBlockHeight, endHeight + 1);

const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks);
await this.blockDispatcher.enqueueBlocks(
cleanedBatchBlocks,
this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks),
);
}
}
private getLatestBufferHeight(
Expand Down

0 comments on commit 524b691

Please sign in to comment.