Skip to content

Commit

Permalink
Regenerate mmr (#1664)
Browse files Browse the repository at this point in the history
* preparation

Unsafe mode and draft

bulkUpsert with poi mmr

move service to node-core

* tidy up, fix tests

* Fix gap mmr node missing issue, poiCache use plainPoiModel
  • Loading branch information
jiqiang90 authored May 8, 2023
1 parent 8b30925 commit 9f77229
Show file tree
Hide file tree
Showing 20 changed files with 606 additions and 85 deletions.
6 changes: 6 additions & 0 deletions packages/common/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ export enum SUPPORT_DB {

// DATABASE ERROR REGEX
export const CONNECTION_SSL_ERROR_REGEX = 'not support SSL';

// BLOCK BATCH SYNC between POI MMR <-> Filebased/Postgres MMR
export const RESET_MMR_BLOCK_BATCH = 1000;

// Default Model fetch range
export const DEFAULT_FETCH_RANGE = 100;
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
await this.poiService.getLatestPoiBlockHash(),
this.project.id
);
this.poi.set(poiBlock);
// This is the first creation of POI
this.poi.bulkUpsert([poiBlock]);
this.poiService.setLatestPoiBlockHash(poiBlock.hash);
this.storeCacheService.metadata.set('lastPoiHeight', height);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {getLogger} from '../../logger';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {IndexerEvent} from '../../events';
import {getLogger} from '../../logger';
import {AutoQueue} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
export * from './benchmark.service';
export * from './connectionPool.service';
export * from './entities';
export * from './PoiBlock';
export * from './poi';
export * from './types';
export * from './StoreOperations';
export * from './store.service';
export * from './storeCache';
export * from './poi.service';
export * from './mmr.service';
export * from './worker';
export * from './dictionary.service';
Expand All @@ -23,3 +22,4 @@ export * from './project.service';
export * from './fetch.service';
export * from './indexer.manager';
export * from './mmrMigrate.service';
export * from './mmrRegenerate.service';
133 changes: 107 additions & 26 deletions packages/node-core/src/indexer/mmr.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@
import assert from 'assert';
import fs from 'fs';
import {Injectable, OnApplicationShutdown} from '@nestjs/common';
import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME} from '@subql/common';
import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME, RESET_MMR_BLOCK_BATCH} from '@subql/common';
import {u8aToHex, u8aEq} from '@subql/utils';
import {MMR, FileBasedDb} from '@subql/x-merkle-mountain-range';
import {keccak256} from 'js-sha3';
import {Sequelize} from 'sequelize';
import {Op, Sequelize} from 'sequelize';
import {MmrStoreType, NodeConfig} from '../configure';
import {MmrPayload, MmrProof} from '../events';
import {PlainPoiModel, PoiInterface} from '../indexer/poi';
import {getLogger} from '../logger';
import {delay, getExistingProjectSchema} from '../utils';
import {ProofOfIndex} from './entities';
import {PgBasedMMRDB} from './postgresMmrDb';
import {StoreCacheService} from './storeCache';
import {CachePoiModel} from './storeCache/cachePoi';

const logger = getLogger('mmr');

const keccak256Hash = (...nodeValues: Uint8Array[]) => Buffer.from(keccak256(Buffer.concat(nodeValues)), 'hex');

const syncingMsg = (start: number, end: number, size: number) =>
logger.info(`Syncing block [${start} - ${end}] mmr, total ${size} blocks `);

@Injectable()
export class MmrService implements OnApplicationShutdown {
private isShutdown = false;
Expand All @@ -30,6 +32,7 @@ export class MmrService implements OnApplicationShutdown {
// This is the next block height that suppose to calculate its mmr value
private _nextMmrBlockHeight?: number;
private _blockOffset?: number;
private _poi?: PlainPoiModel;

constructor(
private readonly nodeConfig: NodeConfig,
Expand All @@ -41,7 +44,10 @@ export class MmrService implements OnApplicationShutdown {
this.isShutdown = true;
}

private get poi(): CachePoiModel {
get poi(): PoiInterface {
if (this._poi) {
return this._poi;
}
const poi = this.storeCacheService.poi;
if (!poi) {
throw new Error('MMR service expected POI but it was not found');
Expand Down Expand Up @@ -70,13 +76,17 @@ export class MmrService implements OnApplicationShutdown {
return this._blockOffset;
}

async syncFileBaseFromPoi(blockOffset: number): Promise<void> {
async init(blockOffset: number, poi: PlainPoiModel): Promise<void> {
this._blockOffset = blockOffset;
await this.ensureMmr();
this._poi = poi;
}

// Exit option allow exit when POI is fully sync
async syncFileBaseFromPoi(blockOffset: number, exitHeight?: number, logging?: boolean): Promise<void> {
if (this.isSyncing) return;
this.isSyncing = true;
this._mmrDb =
this.nodeConfig.mmrStoreType === MmrStoreType.Postgres
? await this.ensurePostgresBasedMmr()
: await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
await this.ensureMmr();
this._blockOffset = blockOffset;
// The mmr database current leaf length
const mmrLeafLength = await this.mmrDb.getLeafLength();
Expand All @@ -101,35 +111,53 @@ export class MmrService implements OnApplicationShutdown {
while (!this.isShutdown) {
const poiBlocks = await this.poi.getPoiBlocksByRange(this.nextMmrBlockHeight);
if (poiBlocks.length !== 0) {
if (logging) {
syncingMsg(poiBlocks[0].id, poiBlocks[poiBlocks.length - 1].id, poiBlocks.length);
}
const appendedBlocks = [];
for (const block of poiBlocks) {
if (this.nextMmrBlockHeight < block.id) {
for (let i = this.nextMmrBlockHeight; i < block.id; i++) {
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
await this.appendMmrNode(block);
appendedBlocks.push(await this.appendMmrNode(block));
this._nextMmrBlockHeight = block.id + 1;
}
// This should be safe, even poi bulkUpsert faild, filebased/postgres db node should already been written and accurate.
if (appendedBlocks.length) {
await this.poi.bulkUpsert(appendedBlocks);
}
} else {
const {lastPoiHeight, lastProcessedHeight} = await this.storeCacheService.metadata.findMany([
'lastPoiHeight',
'lastProcessedHeight',
]);

// this.nextMmrBlockHeight means block before nextMmrBlockHeight-1 already exist in filebase mmr
if (this.nextMmrBlockHeight > Number(lastPoiHeight) && this.nextMmrBlockHeight <= Number(lastProcessedHeight)) {
if (logging) {
syncingMsg(
this.nextMmrBlockHeight,
Number(lastProcessedHeight),
Math.max(1, Number(lastProcessedHeight) - this.nextMmrBlockHeight)
);
}
for (let i = this.nextMmrBlockHeight; i <= Number(lastProcessedHeight); i++) {
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
await delay(MMR_AWAIT_TIME);
}
if (exitHeight !== undefined && this.nextMmrBlockHeight > exitHeight) {
break;
}
}
this.isSyncing = false;
}

private async appendMmrNode(poiBlock: ProofOfIndex): Promise<void> {
private async appendMmrNode(poiBlock: ProofOfIndex): Promise<ProofOfIndex> {
const newLeaf = poiBlock.hash;
if (newLeaf.length !== DEFAULT_WORD_SIZE) {
throw new Error(`Append Mmr failed, input data length should be ${DEFAULT_WORD_SIZE}`);
Expand All @@ -138,8 +166,52 @@ export class MmrService implements OnApplicationShutdown {
// The next leaf index in mmr, current latest leaf index always .getLeafLength -1.
await this.mmrDb.append(newLeaf, estLeafIndexByBlockHeight);
const mmrRoot = await this.mmrDb.getRoot(estLeafIndexByBlockHeight);
this.updatePoiMmrRoot(poiBlock, mmrRoot);
this._nextMmrBlockHeight = poiBlock.id + 1;
return this.updatePoiMmrRoot(poiBlock, mmrRoot);
}

async poiMmrToDb(latestDbMmrHeight: number, targetHeight: number): Promise<void> {
if (latestDbMmrHeight === targetHeight) {
return;
}
let latest = latestDbMmrHeight;
this._nextMmrBlockHeight = latest + 1;
try {
while (latest <= targetHeight) {
const results = (
await this.poi.model.findAll({
limit: RESET_MMR_BLOCK_BATCH,
where: {id: {[Op.lte]: targetHeight, [Op.gt]: latest}, mmrRoot: {[Op.ne]: null}} as any,
order: [['id', 'ASC']],
})
).map((r) => r?.toJSON<ProofOfIndex>());
if (results.length) {
logger.info(
`Upsert block [${results[0].id} - ${results[results.length - 1].id}] mmr to ${
this.nodeConfig.mmrStoreType
} DB, total ${results.length} blocks `
);
for (const poiBlock of results) {
if (this.nextMmrBlockHeight < poiBlock.id) {
for (let i = this.nextMmrBlockHeight; i < poiBlock.id; i++) {
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
const estLeafIndexByBlockHeight = poiBlock.id - this.blockOffset - 1;
if (!poiBlock?.hash) {
throw new Error(`Copy POI block ${poiBlock?.id} hash to DB got undefined`);
}
await this.mmrDb.append(poiBlock?.hash, estLeafIndexByBlockHeight);
this._nextMmrBlockHeight = poiBlock.id + 1;
}
latest = results[results.length - 1].id;
} else {
break;
}
}
} catch (e) {
throw new Error(`When try to copy POI mmr to ${this.nodeConfig.mmrStoreType} DB got problem: ${e}`);
}
}

private validatePoiMmr(poiWithMmr: ProofOfIndex, mmrValue: Uint8Array): void {
Expand All @@ -158,13 +230,20 @@ export class MmrService implements OnApplicationShutdown {
}
}

private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): void {
private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): ProofOfIndex {
if (!poiBlock.mmrRoot) {
poiBlock.mmrRoot = mmrValue;
this.poi.set(poiBlock);
} else {
this.validatePoiMmr(poiBlock, mmrValue);
}
return poiBlock;
}

private async ensureMmr(): Promise<void> {
this._mmrDb =
this.nodeConfig.mmrStoreType === MmrStoreType.Postgres
? await this.ensurePostgresBasedMmr()
: await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
}

private async ensureFileBasedMmr(projectMmrPath: string): Promise<MMR> {
Expand Down Expand Up @@ -200,14 +279,16 @@ export class MmrService implements OnApplicationShutdown {

async getLatestMmr(): Promise<MmrPayload> {
// latest leaf index need fetch from .db, as original method will use cache
const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset;
return this.getMmr(blockHeight);
return this.getMmr(await this.getLatestMmrHeight());
}

async getLatestMmrProof(): Promise<MmrProof> {
return this.getMmrProof(await this.getLatestMmrHeight());
}

async getLatestMmrHeight(): Promise<number> {
// latest leaf index need fetch from .db, as original method will use cache
const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset;
return this.getMmrProof(blockHeight);
return (await this.mmrDb.db.getLeafLength()) + this.blockOffset;
}

async getMmrProof(blockHeight: number): Promise<MmrProof> {
Expand All @@ -229,11 +310,11 @@ export class MmrService implements OnApplicationShutdown {
};
}

async deleteMmrNode(blockHeight: number, blockOffset: number): Promise<void> {
this._mmrDb =
this.nodeConfig.mmrStoreType === MmrStoreType.Postgres
? await this.ensurePostgresBasedMmr()
: await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
async deleteMmrNode(blockHeight: number, blockOffset?: number): Promise<void> {
await this.ensureMmr();
if (blockOffset === undefined) {
throw new Error(`Block offset got undefined when delete mmr node`);
}
const leafIndex = blockHeight - blockOffset - 1;
if (leafIndex < 0) {
throw new Error(`Target block height must greater equal to ${blockOffset + 1} `);
Expand Down
Loading

0 comments on commit 9f77229

Please sign in to comment.