From 9f77229065317735aaa2dc79e167dc84a21e54b9 Mon Sep 17 00:00:00 2001 From: Jay Ji Date: Tue, 9 May 2023 11:42:01 +1200 Subject: [PATCH] Regenerate mmr (#1664) * preparation Unsafe mode and draft bulkUpsert with poi mmr move service to node-core * tidy up, fix tests * Fix gap mmr node missing issue, poiCache use plainPoiModel --- packages/common/src/constants.ts | 6 + .../blockDispatcher/base-block-dispatcher.ts | 3 +- .../blockDispatcher/block-dispatcher.ts | 2 +- .../worker-block-dispatcher.ts | 2 +- packages/node-core/src/indexer/index.ts | 4 +- packages/node-core/src/indexer/mmr.service.ts | 133 +++++++++--- .../src/indexer/mmrRegenerate.service.ts | 192 ++++++++++++++++++ .../src/indexer/{ => poi}/PoiBlock.ts | 2 +- packages/node-core/src/indexer/poi/index.ts | 6 + .../src/indexer/{ => poi}/poi.service.ts | 4 +- .../node-core/src/indexer/poi/poiModel.ts | 92 +++++++++ .../node-core/src/indexer/project.service.ts | 4 +- .../node-core/src/indexer/store.service.ts | 2 +- .../src/indexer/storeCache/cachePoi.spec.ts | 36 ++-- .../src/indexer/storeCache/cachePoi.ts | 47 ++--- .../indexer/storeCache/storeCache.service.ts | 4 +- packages/node/src/main.ts | 9 +- .../src/subcommands/mmrRegenerate.init.ts | 29 +++ .../src/subcommands/mmrRegenerate.module.ts | 35 ++++ packages/node/src/yargs.ts | 79 +++++++ 20 files changed, 606 insertions(+), 85 deletions(-) create mode 100644 packages/node-core/src/indexer/mmrRegenerate.service.ts rename packages/node-core/src/indexer/{ => poi}/PoiBlock.ts (97%) create mode 100644 packages/node-core/src/indexer/poi/index.ts rename packages/node-core/src/indexer/{ => poi}/poi.service.ts (94%) create mode 100644 packages/node-core/src/indexer/poi/poiModel.ts create mode 100644 packages/node/src/subcommands/mmrRegenerate.init.ts create mode 100644 packages/node/src/subcommands/mmrRegenerate.module.ts diff --git a/packages/common/src/constants.ts b/packages/common/src/constants.ts index 1e1c3216b3..b9b386b660 100644 --- a/packages/common/src/constants.ts +++ b/packages/common/src/constants.ts @@ -48,3 +48,9 @@ export enum SUPPORT_DB { // DATABASE ERROR REGEX export const CONNECTION_SSL_ERROR_REGEX = 'not support SSL'; + +// BLOCK BATCH SYNC between POI MMR <-> Filebased/Postgres MMR +export const RESET_MMR_BLOCK_BATCH = 1000; + +// Default Model fetch range +export const DEFAULT_FETCH_RANGE = 100; diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 44501a74ed..4771241b33 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -209,7 +209,8 @@ export abstract class BaseBlockDispatcher implements IBloc await this.poiService.getLatestPoiBlockHash(), this.project.id ); - this.poi.set(poiBlock); + // This is the first creation of POI + this.poi.bulkUpsert([poiBlock]); this.poiService.setLatestPoiBlockHash(poiBlock.hash); this.storeCacheService.metadata.set('lastPoiHeight', height); } diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 220940f2a9..1584d62e35 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -11,7 +11,7 @@ import {getLogger} from '../../logger'; import {profilerWrap} from '../../profiler'; import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize} from '../../utils'; import {DynamicDsService} from '../dynamic-ds.service'; -import {PoiService} from '../poi.service'; +import {PoiService} from '../poi/poi.service'; import {SmartBatchService} from '../smartBatch.service'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 58f910b952..027b5209a0 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -11,7 +11,7 @@ import {IndexerEvent} from '../../events'; import {getLogger} from '../../logger'; import {AutoQueue} from '../../utils'; import {DynamicDsService} from '../dynamic-ds.service'; -import {PoiService} from '../poi.service'; +import {PoiService} from '../poi/poi.service'; import {SmartBatchService} from '../smartBatch.service'; import {StoreService} from '../store.service'; import {StoreCacheService} from '../storeCache'; diff --git a/packages/node-core/src/indexer/index.ts b/packages/node-core/src/indexer/index.ts index 6be76f7608..1ec9c2dd49 100644 --- a/packages/node-core/src/indexer/index.ts +++ b/packages/node-core/src/indexer/index.ts @@ -4,12 +4,11 @@ export * from './benchmark.service'; export * from './connectionPool.service'; export * from './entities'; -export * from './PoiBlock'; +export * from './poi'; export * from './types'; export * from './StoreOperations'; export * from './store.service'; export * from './storeCache'; -export * from './poi.service'; export * from './mmr.service'; export * from './worker'; export * from './dictionary.service'; @@ -23,3 +22,4 @@ export * from './project.service'; export * from './fetch.service'; export * from './indexer.manager'; export * from './mmrMigrate.service'; +export * from './mmrRegenerate.service'; diff --git a/packages/node-core/src/indexer/mmr.service.ts b/packages/node-core/src/indexer/mmr.service.ts index 401466c8f6..4881788149 100644 --- a/packages/node-core/src/indexer/mmr.service.ts +++ b/packages/node-core/src/indexer/mmr.service.ts @@ -4,24 +4,26 @@ import assert from 'assert'; import fs from 'fs'; import {Injectable, OnApplicationShutdown} from '@nestjs/common'; -import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME} from '@subql/common'; +import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME, RESET_MMR_BLOCK_BATCH} from '@subql/common'; import {u8aToHex, u8aEq} from '@subql/utils'; import {MMR, FileBasedDb} from '@subql/x-merkle-mountain-range'; import {keccak256} from 'js-sha3'; -import {Sequelize} from 'sequelize'; +import {Op, Sequelize} from 'sequelize'; import {MmrStoreType, NodeConfig} from '../configure'; import {MmrPayload, MmrProof} from '../events'; +import {PlainPoiModel, PoiInterface} from '../indexer/poi'; import {getLogger} from '../logger'; import {delay, getExistingProjectSchema} from '../utils'; import {ProofOfIndex} from './entities'; import {PgBasedMMRDB} from './postgresMmrDb'; import {StoreCacheService} from './storeCache'; -import {CachePoiModel} from './storeCache/cachePoi'; - const logger = getLogger('mmr'); const keccak256Hash = (...nodeValues: Uint8Array[]) => Buffer.from(keccak256(Buffer.concat(nodeValues)), 'hex'); +const syncingMsg = (start: number, end: number, size: number) => + logger.info(`Syncing block [${start} - ${end}] mmr, total ${size} blocks `); + @Injectable() export class MmrService implements OnApplicationShutdown { private isShutdown = false; @@ -30,6 +32,7 @@ export class MmrService implements OnApplicationShutdown { // This is the next block height that suppose to calculate its mmr value private _nextMmrBlockHeight?: number; private _blockOffset?: number; + private _poi?: PlainPoiModel; constructor( private readonly nodeConfig: NodeConfig, @@ -41,7 +44,10 @@ export class MmrService implements OnApplicationShutdown { this.isShutdown = true; } - private get poi(): CachePoiModel { + get poi(): PoiInterface { + if (this._poi) { + return this._poi; + } const poi = this.storeCacheService.poi; if (!poi) { throw new Error('MMR service expected POI but it was not found'); @@ -70,13 +76,17 @@ export class MmrService implements OnApplicationShutdown { return this._blockOffset; } - async syncFileBaseFromPoi(blockOffset: number): Promise { + async init(blockOffset: number, poi: PlainPoiModel): Promise { + this._blockOffset = blockOffset; + await this.ensureMmr(); + this._poi = poi; + } + + // Exit option allow exit when POI is fully sync + async syncFileBaseFromPoi(blockOffset: number, exitHeight?: number, logging?: boolean): Promise { if (this.isSyncing) return; this.isSyncing = true; - this._mmrDb = - this.nodeConfig.mmrStoreType === MmrStoreType.Postgres - ? await this.ensurePostgresBasedMmr() - : await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); + await this.ensureMmr(); this._blockOffset = blockOffset; // The mmr database current leaf length const mmrLeafLength = await this.mmrDb.getLeafLength(); @@ -101,6 +111,10 @@ export class MmrService implements OnApplicationShutdown { while (!this.isShutdown) { const poiBlocks = await this.poi.getPoiBlocksByRange(this.nextMmrBlockHeight); if (poiBlocks.length !== 0) { + if (logging) { + syncingMsg(poiBlocks[0].id, poiBlocks[poiBlocks.length - 1].id, poiBlocks.length); + } + const appendedBlocks = []; for (const block of poiBlocks) { if (this.nextMmrBlockHeight < block.id) { for (let i = this.nextMmrBlockHeight; i < block.id; i++) { @@ -108,16 +122,27 @@ export class MmrService implements OnApplicationShutdown { this._nextMmrBlockHeight = i + 1; } } - await this.appendMmrNode(block); + appendedBlocks.push(await this.appendMmrNode(block)); + this._nextMmrBlockHeight = block.id + 1; + } + // This should be safe, even poi bulkUpsert faild, filebased/postgres db node should already been written and accurate. + if (appendedBlocks.length) { + await this.poi.bulkUpsert(appendedBlocks); } } else { const {lastPoiHeight, lastProcessedHeight} = await this.storeCacheService.metadata.findMany([ 'lastPoiHeight', 'lastProcessedHeight', ]); - // this.nextMmrBlockHeight means block before nextMmrBlockHeight-1 already exist in filebase mmr if (this.nextMmrBlockHeight > Number(lastPoiHeight) && this.nextMmrBlockHeight <= Number(lastProcessedHeight)) { + if (logging) { + syncingMsg( + this.nextMmrBlockHeight, + Number(lastProcessedHeight), + Math.max(1, Number(lastProcessedHeight) - this.nextMmrBlockHeight) + ); + } for (let i = this.nextMmrBlockHeight; i <= Number(lastProcessedHeight); i++) { await this.mmrDb.append(DEFAULT_LEAF); this._nextMmrBlockHeight = i + 1; @@ -125,11 +150,14 @@ export class MmrService implements OnApplicationShutdown { } await delay(MMR_AWAIT_TIME); } + if (exitHeight !== undefined && this.nextMmrBlockHeight > exitHeight) { + break; + } } this.isSyncing = false; } - private async appendMmrNode(poiBlock: ProofOfIndex): Promise { + private async appendMmrNode(poiBlock: ProofOfIndex): Promise { const newLeaf = poiBlock.hash; if (newLeaf.length !== DEFAULT_WORD_SIZE) { throw new Error(`Append Mmr failed, input data length should be ${DEFAULT_WORD_SIZE}`); @@ -138,8 +166,52 @@ export class MmrService implements OnApplicationShutdown { // The next leaf index in mmr, current latest leaf index always .getLeafLength -1. await this.mmrDb.append(newLeaf, estLeafIndexByBlockHeight); const mmrRoot = await this.mmrDb.getRoot(estLeafIndexByBlockHeight); - this.updatePoiMmrRoot(poiBlock, mmrRoot); - this._nextMmrBlockHeight = poiBlock.id + 1; + return this.updatePoiMmrRoot(poiBlock, mmrRoot); + } + + async poiMmrToDb(latestDbMmrHeight: number, targetHeight: number): Promise { + if (latestDbMmrHeight === targetHeight) { + return; + } + let latest = latestDbMmrHeight; + this._nextMmrBlockHeight = latest + 1; + try { + while (latest <= targetHeight) { + const results = ( + await this.poi.model.findAll({ + limit: RESET_MMR_BLOCK_BATCH, + where: {id: {[Op.lte]: targetHeight, [Op.gt]: latest}, mmrRoot: {[Op.ne]: null}} as any, + order: [['id', 'ASC']], + }) + ).map((r) => r?.toJSON()); + if (results.length) { + logger.info( + `Upsert block [${results[0].id} - ${results[results.length - 1].id}] mmr to ${ + this.nodeConfig.mmrStoreType + } DB, total ${results.length} blocks ` + ); + for (const poiBlock of results) { + if (this.nextMmrBlockHeight < poiBlock.id) { + for (let i = this.nextMmrBlockHeight; i < poiBlock.id; i++) { + await this.mmrDb.append(DEFAULT_LEAF); + this._nextMmrBlockHeight = i + 1; + } + } + const estLeafIndexByBlockHeight = poiBlock.id - this.blockOffset - 1; + if (!poiBlock?.hash) { + throw new Error(`Copy POI block ${poiBlock?.id} hash to DB got undefined`); + } + await this.mmrDb.append(poiBlock?.hash, estLeafIndexByBlockHeight); + this._nextMmrBlockHeight = poiBlock.id + 1; + } + latest = results[results.length - 1].id; + } else { + break; + } + } + } catch (e) { + throw new Error(`When try to copy POI mmr to ${this.nodeConfig.mmrStoreType} DB got problem: ${e}`); + } } private validatePoiMmr(poiWithMmr: ProofOfIndex, mmrValue: Uint8Array): void { @@ -158,13 +230,20 @@ export class MmrService implements OnApplicationShutdown { } } - private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): void { + private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): ProofOfIndex { if (!poiBlock.mmrRoot) { poiBlock.mmrRoot = mmrValue; - this.poi.set(poiBlock); } else { this.validatePoiMmr(poiBlock, mmrValue); } + return poiBlock; + } + + private async ensureMmr(): Promise { + this._mmrDb = + this.nodeConfig.mmrStoreType === MmrStoreType.Postgres + ? await this.ensurePostgresBasedMmr() + : await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); } private async ensureFileBasedMmr(projectMmrPath: string): Promise { @@ -200,14 +279,16 @@ export class MmrService implements OnApplicationShutdown { async getLatestMmr(): Promise { // latest leaf index need fetch from .db, as original method will use cache - const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset; - return this.getMmr(blockHeight); + return this.getMmr(await this.getLatestMmrHeight()); } async getLatestMmrProof(): Promise { + return this.getMmrProof(await this.getLatestMmrHeight()); + } + + async getLatestMmrHeight(): Promise { // latest leaf index need fetch from .db, as original method will use cache - const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset; - return this.getMmrProof(blockHeight); + return (await this.mmrDb.db.getLeafLength()) + this.blockOffset; } async getMmrProof(blockHeight: number): Promise { @@ -229,11 +310,11 @@ export class MmrService implements OnApplicationShutdown { }; } - async deleteMmrNode(blockHeight: number, blockOffset: number): Promise { - this._mmrDb = - this.nodeConfig.mmrStoreType === MmrStoreType.Postgres - ? await this.ensurePostgresBasedMmr() - : await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); + async deleteMmrNode(blockHeight: number, blockOffset?: number): Promise { + await this.ensureMmr(); + if (blockOffset === undefined) { + throw new Error(`Block offset got undefined when delete mmr node`); + } const leafIndex = blockHeight - blockOffset - 1; if (leafIndex < 0) { throw new Error(`Target block height must greater equal to ${blockOffset + 1} `); diff --git a/packages/node-core/src/indexer/mmrRegenerate.service.ts b/packages/node-core/src/indexer/mmrRegenerate.service.ts new file mode 100644 index 0000000000..4b89d9d8cf --- /dev/null +++ b/packages/node-core/src/indexer/mmrRegenerate.service.ts @@ -0,0 +1,192 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import assert from 'assert'; +import {Inject, Injectable} from '@nestjs/common'; +import {Sequelize} from 'sequelize'; +import {NodeConfig} from '../configure'; +import {getLogger} from '../logger'; +import {getExistingProjectSchema, initDbSchema} from '../utils/project'; +import {MmrService} from './mmr.service'; +import {PlainPoiModel} from './poi/poiModel'; +import {StoreService} from './store.service'; +import {CacheMetadataModel} from './storeCache/cacheMetadata'; +import {IProjectNetworkConfig, ISubqueryProject} from './types'; + +const logger = getLogger('MMR-Regeneration'); + +const targetHeightHelpMsg = (suggestHeight: number, storeType: string) => + `\n To fix this: \n Option 1. You can try to add --targetHeight=${suggestHeight} to continue regeneration. \n Option 2. Use --unsafe mode, allow POI table copy missing MMR to ${storeType} DB first, in order to continue regeneration`; + +@Injectable() +export class MmrRegenerateService { + private metadataRepo?: CacheMetadataModel; + private _schema?: string; + private _dbMmrLatestHeight?: number; + private _poiMmrLatestHeight?: number; + private _lastPoiHeight?: number; // Final sync target height + private _poi?: PlainPoiModel; + private _blockOffset?: number; + + constructor( + private readonly sequelize: Sequelize, + private readonly nodeConfig: NodeConfig, + private readonly storeService: StoreService, + private readonly mmrService: MmrService, + @Inject('ISubqueryProject') protected project: ISubqueryProject + ) {} + + async init(): Promise { + this._schema = await this.getExistingProjectSchema(); + await this.initDbSchema(); + this.metadataRepo = this.storeService.storeCache.metadata; + this._blockOffset = await this.getMetadataBlockOffset(); + this._lastPoiHeight = await this.getMetadataLastPoiHeight(); + if (!this.storeService.poiRepo) { + throw new Error(`Store service POI not initialized`); + } + this._poi = new PlainPoiModel(this.storeService.poiRepo); + await this.mmrService.init(this.blockOffset, this.poi); + await this.probeStatus(); + } + + get dbMmrLatestHeight(): number { + if (this._dbMmrLatestHeight === undefined) { + logger.warn(`Database latest MMR block height is not found, use default value ${this.blockOffset + 1}`); + return this.blockOffset + 1; + } + return this._dbMmrLatestHeight; + } + + get poiMmrLatestHeight(): number { + assert(this._poiMmrLatestHeight, 'Poi latest Mmr block height is undefined '); + return this._poiMmrLatestHeight; + } + + get lastPoiHeight(): number { + if (this._lastPoiHeight === 0 || this._lastPoiHeight === undefined) { + throw new Error(`Last POI height record is ${this._lastPoiHeight}. Don't need re-generation`); + } + return this._lastPoiHeight; + } + + get blockOffset(): number { + assert(this._blockOffset !== undefined, 'Poi offset is not found within metadata'); + return this._blockOffset; + } + + get schema(): string { + assert(this._schema, 'Get exist schema failed'); + return this._schema; + } + + get poi(): PlainPoiModel { + assert(this._poi, 'Poi Model not initialised'); + return this._poi; + } + + private async probeStatus(): Promise { + this._dbMmrLatestHeight = await this.mmrService.getLatestMmrHeight(); + logger.info(`In ${this.nodeConfig.mmrStoreType} DB, latest MMR block height is ${this._dbMmrLatestHeight}`); + this._poiMmrLatestHeight = (await this.mmrService.poi.getLatestPoiWithMmr())?.id ?? this.blockOffset; + logger.info(`In POI table, latest MMR block height is ${this._poiMmrLatestHeight}`); + } + + private async resetMmr(regenStartHeight: number): Promise { + // remove value in filebased/postgres + await this.mmrService.deleteMmrNode(regenStartHeight + 1, this._blockOffset); + // set null for mmr in POI table + await this.poi.resetPoiMmr(this.poiMmrLatestHeight, regenStartHeight); + logger.info(`Reset mmr on POI AND ${this.nodeConfig.mmrStoreType} DB both completed!`); + } + + async regenerate(targetHeight?: number, resetOnly?: boolean, unsafe?: boolean): Promise { + if (targetHeight !== undefined && targetHeight < this.blockOffset) { + throw new Error(`The target height must greater equal than ${this.blockOffset + 1}`); + } + // SAFE mode: + + // EXPECTED MMR height order: + // filebased/postgres >= POI table >= targetHeight + + // If user provided targetHeight, will remove value in filebased/postgres, also set null for poi mmr with this height + // then sync again + + // If user did not provide any target height + // try to set targetHeight to be poi latest mmr height + + // Also validate poi latest mmr vs filebased/postgres mmr height + // For example if filebased/postgres mmr was removed, it will start sync from filebased/postgres height, + // which is block 0 + blockOffset, and override value in poi mmr + + // UNSAFE mode: + // allow poi last mmr height ahead of filebased/postgres, sync poi mmr to filebased/postgres until last one. + // Then back to normal sync loop + + if (!unsafe) { + if (this.dbMmrLatestHeight < this.poiMmrLatestHeight) { + throw new Error( + `The latest MMR height In POI table is ahead of ${this.nodeConfig.mmrStoreType} DB. ${targetHeightHelpMsg( + this.dbMmrLatestHeight, + this.nodeConfig.mmrStoreType + )} ` + ); + } else if (targetHeight !== undefined && this.poiMmrLatestHeight < targetHeight) { + throw new Error( + `Re-generate --targetHeight ${targetHeight} is ahead of POI table latest MMR height ${ + this.poiMmrLatestHeight + }. ${targetHeightHelpMsg(this.poiMmrLatestHeight, this.nodeConfig.mmrStoreType)}` + ); + } + // use undefined avoid 0 + const regenStartHeight = targetHeight !== undefined ? targetHeight : this.poiMmrLatestHeight; + logger.info( + `${resetOnly ? `Reset to` : `Regenerate from`} block ${Math.max(this.blockOffset + 1, regenStartHeight)} ${ + resetOnly ? `.` : `, final sync height will be ${this.lastPoiHeight}.` + }` + ); + await this.resetMmr(regenStartHeight); + } else { + logger.warn(`Unsafe mode is experimental`); + + const regenStartHeight = Math.min( + this.poiMmrLatestHeight ?? this.blockOffset + 1, + //isNaN(undefined) will lead type error + targetHeight === undefined ? Infinity : targetHeight + ); + + if (targetHeight !== undefined && targetHeight > regenStartHeight) { + logger.warn( + `Target height is ahead of last mmr record height in POI table, will start sync from block ${regenStartHeight}` + ); + } + if (this.poiMmrLatestHeight !== undefined && regenStartHeight < this.poiMmrLatestHeight) { + await this.poi.resetPoiMmr(this.poiMmrLatestHeight, regenStartHeight); + } + // Db mmr the latest height need to catch up regenStartHeight + await this.mmrService.poiMmrToDb(this.dbMmrLatestHeight, regenStartHeight); + } + + if (!resetOnly) { + await this.mmrService.syncFileBaseFromPoi(this.blockOffset, this.lastPoiHeight, true); + } + logger.warn(`-------- Final status -------- `); + await this.probeStatus(); + } + + private async getExistingProjectSchema(): Promise { + return getExistingProjectSchema(this.nodeConfig, this.sequelize); + } + + private async initDbSchema(): Promise { + await initDbSchema(this.project, this.schema, this.storeService); + } + + private async getMetadataBlockOffset(): Promise { + return this.metadataRepo?.find('blockOffset'); + } + + private async getMetadataLastPoiHeight(): Promise { + return this.metadataRepo?.find('lastPoiHeight'); + } +} diff --git a/packages/node-core/src/indexer/PoiBlock.ts b/packages/node-core/src/indexer/poi/PoiBlock.ts similarity index 97% rename from packages/node-core/src/indexer/PoiBlock.ts rename to packages/node-core/src/indexer/poi/PoiBlock.ts index b877e1b10a..8fdbace8ad 100644 --- a/packages/node-core/src/indexer/PoiBlock.ts +++ b/packages/node-core/src/indexer/poi/PoiBlock.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import {u8aConcat, numberToU8a, hexToU8a, isHex, isU8a, blake2AsU8a} from '@subql/utils'; -import {ProofOfIndex} from './entities/Poi.entity'; +import {ProofOfIndex} from '../entities/Poi.entity'; const poiBlockHash = ( id: number, diff --git a/packages/node-core/src/indexer/poi/index.ts b/packages/node-core/src/indexer/poi/index.ts new file mode 100644 index 0000000000..86d12dd65b --- /dev/null +++ b/packages/node-core/src/indexer/poi/index.ts @@ -0,0 +1,6 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +export * from './poiModel'; +export * from './poi.service'; +export * from './PoiBlock'; diff --git a/packages/node-core/src/indexer/poi.service.ts b/packages/node-core/src/indexer/poi/poi.service.ts similarity index 94% rename from packages/node-core/src/indexer/poi.service.ts rename to packages/node-core/src/indexer/poi/poi.service.ts index cae02a5123..528a9c92b5 100644 --- a/packages/node-core/src/indexer/poi.service.ts +++ b/packages/node-core/src/indexer/poi/poi.service.ts @@ -4,8 +4,8 @@ import {isMainThread} from 'node:worker_threads'; import {Injectable, OnApplicationShutdown} from '@nestjs/common'; import {hexToU8a} from '@subql/utils'; -import {StoreCacheService} from './storeCache'; -import {CachePoiModel} from './storeCache/cachePoi'; +import {StoreCacheService} from '../storeCache'; +import {CachePoiModel} from '../storeCache/cachePoi'; const DEFAULT_PARENT_HASH = hexToU8a('0x00'); diff --git a/packages/node-core/src/indexer/poi/poiModel.ts b/packages/node-core/src/indexer/poi/poiModel.ts new file mode 100644 index 0000000000..8cb54d714f --- /dev/null +++ b/packages/node-core/src/indexer/poi/poiModel.ts @@ -0,0 +1,92 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import {DEFAULT_FETCH_RANGE, RESET_MMR_BLOCK_BATCH} from '@subql/common'; +import {u8aToBuffer} from '@subql/utils'; +import {Op, Transaction} from 'sequelize'; +import {getLogger} from '../../logger'; +import {PoiRepo, ProofOfIndex} from '../entities'; +const logger = getLogger('PoiCache'); + +export interface PoiInterface { + model: PoiRepo; + bulkUpsert(proofs: ProofOfIndex[]): Promise | void; + getLatestPoiWithMmr(): Promise; + getPoiBlocksByRange(startHeight: number): Promise; + resetPoiMmr?(latestPoiMmrHeight: number, targetHeight: number, tx: Transaction): Promise; +} + +export class PlainPoiModel implements PoiInterface { + constructor(readonly model: PoiRepo) {} + + async getPoiBlocksByRange(startHeight: number): Promise { + const result = await this.model.findAll({ + limit: DEFAULT_FETCH_RANGE, + where: {id: {[Op.gte]: startHeight}}, + order: [['id', 'ASC']], + }); + return result.map((r) => r?.toJSON()); + } + + async bulkUpsert(proofs: ProofOfIndex[]): Promise { + proofs.forEach((proof) => { + proof.chainBlockHash = u8aToBuffer(proof.chainBlockHash); + proof.hash = u8aToBuffer(proof.hash); + proof.parentHash = u8aToBuffer(proof.parentHash); + }); + await this.model.bulkCreate(proofs, { + updateOnDuplicate: Object.keys(proofs[0]) as unknown as (keyof ProofOfIndex)[], + }); + } + + // reset will be reverse order, in case exit mmr should still in order + // we expect startHeight is usually greater than targetHeight + async resetPoiMmr(latestPoiMmrHeight: number, targetHeight: number): Promise { + if (latestPoiMmrHeight === targetHeight) { + return; + } + let latest = latestPoiMmrHeight; + try { + // reverse order + while (targetHeight <= latest) { + const results = ( + await this.model.findAll({ + limit: RESET_MMR_BLOCK_BATCH, + where: {id: {[Op.lte]: latest, [Op.gte]: targetHeight}, mmrRoot: {[Op.ne]: null}} as any, + order: [['id', 'DESC']], + }) + ).map((r) => r?.toJSON()); + if (results.length) { + logger.info( + `Reset POI block [${results[0].id} - ${results[results.length - 1].id}] mmr to NULL, total ${ + results.length + } blocks ` + ); + for (const r of results) { + r.mmrRoot = undefined; + } + await this.model.bulkCreate(results, { + updateOnDuplicate: Object.keys(results[0]) as unknown as (keyof ProofOfIndex)[], + }); + latest = results[results.length - 1].id - 1; + } else { + break; + } + } + } catch (e) { + throw new Error(`When try to reset POI mmr got problem: ${e}`); + } + } + + async getLatestPoiWithMmr(): Promise { + const result = await this.model.findOne({ + order: [['id', 'DESC']], + where: {mmrRoot: {[Op.ne]: null}} as any, // Types problem with sequelize, undefined works but not null + }); + + if (!result) { + return null; + } + return result.toJSON(); + } +} diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 4e4aac2590..3e227f4c61 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -12,7 +12,7 @@ import {getLogger} from '../logger'; import {getExistingProjectSchema, initDbSchema, initHotSchemaReload} from '../utils'; import {DynamicDsService} from './dynamic-ds.service'; import {MmrService} from './mmr.service'; -import {PoiService} from './poi.service'; +import {PoiService} from './poi/poi.service'; import {StoreService} from './store.service'; import {IDSProcessorService, IProjectNetworkConfig, IProjectService, ISubqueryProject} from './types'; @@ -238,7 +238,7 @@ export abstract class BaseProjectService imple } logger.info(`set blockOffset to ${offset}`); this._blockOffset = offset; - return this.mmrService.syncFileBaseFromPoi(offset).catch((err) => { + return this.mmrService.syncFileBaseFromPoi(offset, undefined, this.nodeConfig.debug).catch((err) => { logger.error(err, 'failed to sync poi to mmr'); process.exit(1); }); diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index bee57734b8..4b8bfa7ea7 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -89,7 +89,7 @@ class NoInitError extends Error { @Injectable() export class StoreService { - private poiRepo?: PoiRepo; + poiRepo?: PoiRepo; private removedIndexes: RemovedIndexes = {}; private _modelIndexedFields?: IndexField[]; private _modelsRelations?: GraphQLModelsRelationsEnums; diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts b/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts index 18fae7ff45..6976a85e42 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts +++ b/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts @@ -72,9 +72,9 @@ describe('CachePoi', () => { it('with mix of cache and db data', async () => { await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); - cachePoi.set(getEmptyPoi(4)); - cachePoi.set(getEmptyPoi(5)); - cachePoi.set(getEmptyPoi(6)); + cachePoi.bulkUpsert([getEmptyPoi(4)]); + cachePoi.bulkUpsert([getEmptyPoi(5)]); + cachePoi.bulkUpsert([getEmptyPoi(6)]); const res = await cachePoi.getPoiBlocksByRange(2); expect(res.map((d) => d.id)).toEqual([2, 3, 4, 5, 6]); @@ -88,9 +88,9 @@ describe('CachePoi', () => { }); it('only cache data', async () => { - cachePoi.set(getEmptyPoi(4)); - cachePoi.set(getEmptyPoi(5)); - cachePoi.set(getEmptyPoi(6)); + cachePoi.bulkUpsert([getEmptyPoi(4)]); + cachePoi.bulkUpsert([getEmptyPoi(5)]); + cachePoi.bulkUpsert([getEmptyPoi(6)]); const res = await cachePoi.getPoiBlocksByRange(2); expect(res.map((d) => d.id)).toEqual([4, 5, 6]); @@ -101,9 +101,9 @@ describe('CachePoi', () => { it('with mix of cache and db data', async () => { await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); - cachePoi.set(getEmptyPoi(4)); - cachePoi.set(getEmptyPoi(5)); - cachePoi.set(getEmptyPoi(6)); + cachePoi.bulkUpsert([getEmptyPoi(4)]); + cachePoi.bulkUpsert([getEmptyPoi(5)]); + cachePoi.bulkUpsert([getEmptyPoi(6)]); const res = await cachePoi.getLatestPoi(); expect(res?.id).toBe(6); @@ -117,9 +117,9 @@ describe('CachePoi', () => { }); it('only cache data', async () => { - cachePoi.set(getEmptyPoi(1)); - cachePoi.set(getEmptyPoi(2)); - cachePoi.set(getEmptyPoi(3)); + cachePoi.bulkUpsert([getEmptyPoi(1)]); + cachePoi.bulkUpsert([getEmptyPoi(2)]); + cachePoi.bulkUpsert([getEmptyPoi(3)]); const res = await cachePoi.getLatestPoi(); expect(res?.id).toBe(3); @@ -134,9 +134,9 @@ describe('CachePoi', () => { {id: 3, mmrRoot: 'mmr3'}, ] as any); - cachePoi.set(getEmptyPoi(4, 'mmr4')); - cachePoi.set(getEmptyPoi(5)); - cachePoi.set(getEmptyPoi(6)); + cachePoi.bulkUpsert([getEmptyPoi(4, 'mmr4')]); + cachePoi.bulkUpsert([getEmptyPoi(5)]); + cachePoi.bulkUpsert([getEmptyPoi(6)]); const res = await cachePoi.getLatestPoiWithMmr(); expect(res?.id).toBe(4); @@ -150,9 +150,9 @@ describe('CachePoi', () => { }); it('only cache data', async () => { - cachePoi.set(getEmptyPoi(1, 'mmr1')); - cachePoi.set(getEmptyPoi(2, 'mmr2')); - cachePoi.set(getEmptyPoi(3)); + cachePoi.bulkUpsert([getEmptyPoi(1, 'mmr1')]); + cachePoi.bulkUpsert([getEmptyPoi(2, 'mmr2')]); + cachePoi.bulkUpsert([getEmptyPoi(3)]); const res = await cachePoi.getLatestPoiWithMmr(); expect(res?.id).toBe(2); diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.ts b/packages/node-core/src/indexer/storeCache/cachePoi.ts index cd72385050..4837e877c5 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.ts +++ b/packages/node-core/src/indexer/storeCache/cachePoi.ts @@ -3,33 +3,35 @@ import {u8aToBuffer} from '@subql/utils'; import {Mutex} from 'async-mutex'; -import {Op, Transaction} from 'sequelize'; +import {Transaction} from 'sequelize'; import {getLogger} from '../../logger'; import {PoiRepo, ProofOfIndex} from '../entities'; +import {PlainPoiModel, PoiInterface} from '../poi/poiModel'; import {ICachedModelControl} from './types'; - const logger = getLogger('PoiCache'); -const DEFAULT_FETCH_RANGE = 100; - -export class CachePoiModel implements ICachedModelControl { +export class CachePoiModel implements ICachedModelControl, PoiInterface { private setCache: Record = {}; private removeCache: number[] = []; flushableRecordCounter = 0; + private plainPoiModel: PlainPoiModel; private mutex = new Mutex(); - constructor(readonly model: PoiRepo) {} + constructor(readonly model: PoiRepo) { + this.plainPoiModel = new PlainPoiModel(model); + } - set(proof: ProofOfIndex): void { - proof.chainBlockHash = u8aToBuffer(proof.chainBlockHash); - proof.hash = u8aToBuffer(proof.hash); - proof.parentHash = u8aToBuffer(proof.parentHash); + bulkUpsert(proofs: ProofOfIndex[]): void { + for (const proof of proofs) { + proof.chainBlockHash = u8aToBuffer(proof.chainBlockHash); + proof.hash = u8aToBuffer(proof.hash); + proof.parentHash = u8aToBuffer(proof.parentHash); - if (this.setCache[proof.id] === undefined) { - this.flushableRecordCounter += 1; + if (this.setCache[proof.id] === undefined) { + this.flushableRecordCounter += 1; + } + this.setCache[proof.id] = proof; } - - this.setCache[proof.id] = proof; } async getById(id: number): Promise { @@ -56,14 +58,7 @@ export class CachePoiModel implements ICachedModelControl { async getPoiBlocksByRange(startHeight: number): Promise { await this.mutex.waitForUnlock(); - const result = await this.model.findAll({ - limit: DEFAULT_FETCH_RANGE, - where: {id: {[Op.gte]: startHeight}}, - order: [['id', 'ASC']], - }); - - const resultData = result.map((r) => r?.toJSON()); - + const resultData = await this.plainPoiModel.getPoiBlocksByRange(startHeight); const poiBlocks = this.mergeResultsWithCache(resultData).filter((poiBlock) => poiBlock.id >= startHeight); if (poiBlocks.length !== 0) { return poiBlocks.sort((v) => v.id); @@ -83,12 +78,8 @@ export class CachePoiModel implements ICachedModelControl { async getLatestPoiWithMmr(): Promise { await this.mutex.waitForUnlock(); - const result = await this.model.findOne({ - order: [['id', 'DESC']], - where: {mmrRoot: {[Op.ne]: null}} as any, // Types problem with sequelize, undefined works but not null - }); - - return this.mergeResultsWithCache([result?.toJSON()], 'desc').find((v) => !!v.mmrRoot) ?? null; + const result = (await this.plainPoiModel.getLatestPoiWithMmr()) ?? undefined; + return this.mergeResultsWithCache([result], 'desc').find((v) => !!v.mmrRoot) ?? null; } get isFlushable(): boolean { diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.ts b/packages/node-core/src/indexer/storeCache/storeCache.service.ts index 185d8d6c31..99490c0b09 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.ts +++ b/packages/node-core/src/indexer/storeCache/storeCache.service.ts @@ -161,7 +161,9 @@ export class StoreCacheService implements BeforeApplicationShutdown { } isFlushable(): boolean { - const numOfRecords = sum(Object.values(this.cachedModels).map((m) => m.flushableRecordCounter)); + const numberOfPoiRecords = this.poi?.flushableRecordCounter ?? 0; + const numOfRecords = + sum(Object.values(this.cachedModels).map((m) => m.flushableRecordCounter)) + numberOfPoiRecords; this.eventEmitter.emit(IndexerEvent.StoreCacheRecordsSize, { value: numOfRecords, }); diff --git a/packages/node/src/main.ts b/packages/node/src/main.ts index 0ebb59b711..cc32ce4a38 100644 --- a/packages/node/src/main.ts +++ b/packages/node/src/main.ts @@ -16,6 +16,13 @@ initLogger( // Lazy import, to allow logger to be initialised before bootstrap() // As bootstrap runs services that requires logger const { bootstrap } = require('./init'); -if (!(argv._[0] === 'test' || argv._[0] === 'mmr-migrate')) { +if ( + !( + argv._[0] === 'test' || + argv._[0] === 'mmr-migrate' || + argv._[0] === 'mmr-regen' || + argv._[0] === 'force-clean' + ) +) { void bootstrap(); } diff --git a/packages/node/src/subcommands/mmrRegenerate.init.ts b/packages/node/src/subcommands/mmrRegenerate.init.ts new file mode 100644 index 0000000000..1ce7f6a2f1 --- /dev/null +++ b/packages/node/src/subcommands/mmrRegenerate.init.ts @@ -0,0 +1,29 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { NestFactory } from '@nestjs/core'; +import { getLogger, MmrRegenerateService } from '@subql/node-core'; +import { MmrRegenerateModule } from './mmrRegenerate.module'; + +const logger = getLogger('MMR-Regeneration'); + +export async function mmrRegenerateInit( + probeMode = false, + resetOnly = false, + unsafe = false, + targetHeight?: number, +): Promise { + try { + const app = await NestFactory.create(MmrRegenerateModule); + await app.init(); + const mmrRegenerateService = app.get(MmrRegenerateService); + await mmrRegenerateService.init(); + if (!probeMode) { + await mmrRegenerateService.regenerate(targetHeight, resetOnly, unsafe); + } + } catch (e) { + logger.error(e, 'Re-generate MMR failed to execute'); + process.exit(1); + } + process.exit(0); +} diff --git a/packages/node/src/subcommands/mmrRegenerate.module.ts b/packages/node/src/subcommands/mmrRegenerate.module.ts new file mode 100644 index 0000000000..e63c61737a --- /dev/null +++ b/packages/node/src/subcommands/mmrRegenerate.module.ts @@ -0,0 +1,35 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { Module } from '@nestjs/common'; +import { EventEmitterModule } from '@nestjs/event-emitter'; +import { + DbModule, + MmrService, + StoreCacheService, + StoreService, + MmrRegenerateService, +} from '@subql/node-core'; +import { ConfigureModule } from '../configure/configure.module'; + +@Module({ + providers: [ + StoreCacheService, + StoreService, + MmrService, + MmrRegenerateService, + ], + controllers: [], +}) +export class MmrRegenerateFeatureModule {} + +@Module({ + imports: [ + DbModule.forRoot(), + ConfigureModule.register(), + MmrRegenerateFeatureModule, + EventEmitterModule.forRoot(), + ], + controllers: [], +}) +export class MmrRegenerateModule {} diff --git a/packages/node/src/yargs.ts b/packages/node/src/yargs.ts index f8f3fa2c43..9974955fb1 100644 --- a/packages/node/src/yargs.ts +++ b/packages/node/src/yargs.ts @@ -4,6 +4,8 @@ import { initLogger } from '@subql/node-core/logger'; import { hideBin } from 'yargs/helpers'; import yargs from 'yargs/yargs'; +import { mmrRegenerateInit } from './subcommands/mmrRegenerate.init'; +import { reindexInit } from './subcommands/reindex.init'; export const yargsOptions = yargs(hideBin(process.argv)) .env('SUBQL_NODE') @@ -63,6 +65,83 @@ export const yargsOptions = yargs(hideBin(process.argv)) return reindexInit(argv.targetHeight); }, }) + .command({ + command: 'mmr-regen', + describe: + 'Re-generate mmr between Filebased/Postgres mmr and Proof of index', + builder: (yargs) => + yargs.options({ + probe: { + type: 'boolean', + description: + 'Fetch latest mmr height information from file based/postgres DB and Poi table', + demandOption: false, + default: false, + }, + targetHeight: { + type: 'number', + description: 'Re-genrate mmr value from this block height', + demandOption: false, + }, + resetOnly: { + type: 'boolean', + description: + 'Only reset the mmr value in both POI and file based/postgres DB to target height', + demandOption: false, + default: false, + }, + unsafe: { + type: 'boolean', + description: 'Allow sync mmr from Poi table to file or a postgres DB', + demandOption: false, + default: false, + }, + 'mmr-store-type': { + demandOption: false, + describe: + 'When regenerate MMR store in either a file or a postgres DB', + type: 'string', + choices: ['file', 'postgres'], + default: 'file', + }, + 'mmr-path': { + alias: 'm', + demandOption: false, + describe: + 'File based only : local path of the merkle mountain range (.mmr) file', + type: 'string', + }, + 'db-schema': { + demandOption: false, + describe: 'Db schema name of the project', + type: 'string', + }, + subquery: { + alias: 'f', + demandOption: true, + default: process.cwd(), + describe: 'Local path or IPFS cid of the subquery project', + type: 'string', + }, + }), + handler: (argv) => { + initLogger( + argv.debug as boolean, + argv.outputFmt as 'json' | 'colored', + argv.logLevel as string | undefined, + ); + + // lazy import to make sure logger is instantiated before all other services + // eslint-disable-next-line @typescript-eslint/no-var-requires + const { mmrRegenerateInit } = require('./subcommands/mmrRegenerate.init'); + return mmrRegenerateInit( + argv.probe, + argv.resetOnly, + argv.unsafe, + argv.targetHeight, + ); + }, + }) .command({ command: 'mmr-migrate', describe: 'Migrate MMR data from storage file to postgres DB',