Skip to content

Commit

Permalink
bulkUpsert with poi mmr
Browse files Browse the repository at this point in the history
  • Loading branch information
jiqiang90 committed May 5, 2023
1 parent e4fae3c commit e2d4dba
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 104 deletions.
3 changes: 3 additions & 0 deletions packages/common/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ export const CONNECTION_SSL_ERROR_REGEX = 'not support SSL';

// BLOCK BATCH SYNC between POI MMR <-> Filebased/Postgres MMR
export const RESET_MMR_BLOCK_BATCH = 1000;

// Default Model fetch range
export const DEFAULT_FETCH_RANGE = 100;
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
await this.poiService.getLatestPoiBlockHash(),
this.project.id
);
this.poi.set(poiBlock);
// This is the first creation of POI
this.poi.bulkUpsert([poiBlock]);
this.poiService.setLatestPoiBlockHash(poiBlock.hash);
this.storeCacheService.metadata.set('lastPoiHeight', height);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {getLogger} from '../../logger';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, delay, memoryLock, waitForBatchSize} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {IndexerEvent} from '../../events';
import {getLogger} from '../../logger';
import {AutoQueue} from '../../utils';
import {DynamicDsService} from '../dynamic-ds.service';
import {PoiService} from '../poi.service';
import {PoiService} from '../poi/poi.service';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {StoreCacheService} from '../storeCache';
Expand Down
3 changes: 1 addition & 2 deletions packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
export * from './benchmark.service';
export * from './connectionPool.service';
export * from './entities';
export * from './PoiBlock';
export * from './poi';
export * from './types';
export * from './StoreOperations';
export * from './store.service';
export * from './storeCache';
export * from './poi.service';
export * from './mmr.service';
export * from './worker';
export * from './dictionary.service';
Expand Down
48 changes: 35 additions & 13 deletions packages/node-core/src/indexer/mmr.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import assert from 'assert';
import fs from 'fs';
import {Injectable, OnApplicationShutdown} from '@nestjs/common';
import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME, RESET_MMR_BLOCK_BATCH} from '@subql/common';
import {PlainPoiModel} from '@subql/node-core/indexer/poi';
import {u8aToHex, u8aEq} from '@subql/utils';
import {MMR, FileBasedDb} from '@subql/x-merkle-mountain-range';
import {keccak256} from 'js-sha3';
Expand All @@ -21,6 +22,9 @@ const logger = getLogger('mmr');

const keccak256Hash = (...nodeValues: Uint8Array[]) => Buffer.from(keccak256(Buffer.concat(nodeValues)), 'hex');

const syncingMsg = (start: number, end: number, size: number) =>
logger.info(`Syncing block [${start} - ${end}] mmr, total ${size} blocks `);

@Injectable()
export class MmrService implements OnApplicationShutdown {
private isShutdown = false;
Expand All @@ -29,6 +33,7 @@ export class MmrService implements OnApplicationShutdown {
// This is the next block height that suppose to calculate its mmr value
private _nextMmrBlockHeight?: number;
private _blockOffset?: number;
private _poi?: PlainPoiModel;

constructor(
private readonly nodeConfig: NodeConfig,
Expand All @@ -40,7 +45,10 @@ export class MmrService implements OnApplicationShutdown {
this.isShutdown = true;
}

get poi(): CachePoiModel {
get poi(): CachePoiModel | PlainPoiModel {
if (this._poi) {
return this._poi;
}
const poi = this.storeCacheService.poi;
if (!poi) {
throw new Error('MMR service expected POI but it was not found');
Expand Down Expand Up @@ -69,13 +77,14 @@ export class MmrService implements OnApplicationShutdown {
return this._blockOffset;
}

async init(blockOffset: number): Promise<void> {
async init(blockOffset: number, poi: PlainPoiModel): Promise<void> {
this._blockOffset = blockOffset;
await this.ensureMmr();
this._poi = poi;
}

// Exit option allow exit when POI is fully sync
async syncFileBaseFromPoi(blockOffset: number, exit?: boolean): Promise<void> {
async syncFileBaseFromPoi(blockOffset: number, exitHeight?: number, logging?: boolean): Promise<void> {
if (this.isSyncing) return;
this.isSyncing = true;
await this.ensureMmr();
Expand Down Expand Up @@ -103,38 +112,52 @@ export class MmrService implements OnApplicationShutdown {
while (!this.isShutdown) {
const poiBlocks = await this.poi.getPoiBlocksByRange(this.nextMmrBlockHeight);
if (poiBlocks.length !== 0) {
if (logging) {
syncingMsg(poiBlocks[0].id, poiBlocks[poiBlocks.length - 1].id, poiBlocks.length);
}
const appendedBlocks = [];
for (const block of poiBlocks) {
if (this.nextMmrBlockHeight < block.id) {
for (let i = this.nextMmrBlockHeight; i < block.id; i++) {
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
await this.appendMmrNode(block);
appendedBlocks.push(await this.appendMmrNode(block));
this._nextMmrBlockHeight = block.id + 1;
}
} else {
if (exit) {
break;
if (appendedBlocks.length) {
await this.poi.bulkUpsert(appendedBlocks);
}
} else {
const {lastPoiHeight, lastProcessedHeight} = await this.storeCacheService.metadata.findMany([
'lastPoiHeight',
'lastProcessedHeight',
]);

// this.nextMmrBlockHeight means block before nextMmrBlockHeight-1 already exist in filebase mmr
if (this.nextMmrBlockHeight > Number(lastPoiHeight) && this.nextMmrBlockHeight <= Number(lastProcessedHeight)) {
if (logging) {
syncingMsg(
this.nextMmrBlockHeight,
Number(lastProcessedHeight),
Math.max(1, Number(lastProcessedHeight) - this.nextMmrBlockHeight)
);
}
for (let i = this.nextMmrBlockHeight; i <= Number(lastProcessedHeight); i++) {
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
await delay(MMR_AWAIT_TIME);
}
if (exitHeight !== undefined && this.nextMmrBlockHeight > exitHeight) {
break;
}
}
this.isSyncing = false;
}

private async appendMmrNode(poiBlock: ProofOfIndex): Promise<void> {
private async appendMmrNode(poiBlock: ProofOfIndex): Promise<ProofOfIndex> {
const newLeaf = poiBlock.hash;
if (newLeaf.length !== DEFAULT_WORD_SIZE) {
throw new Error(`Append Mmr failed, input data length should be ${DEFAULT_WORD_SIZE}`);
Expand All @@ -143,8 +166,7 @@ export class MmrService implements OnApplicationShutdown {
// The next leaf index in mmr, current latest leaf index always .getLeafLength -1.
await this.mmrDb.append(newLeaf, estLeafIndexByBlockHeight);
const mmrRoot = await this.mmrDb.getRoot(estLeafIndexByBlockHeight);
this.updatePoiMmrRoot(poiBlock, mmrRoot);
this._nextMmrBlockHeight = poiBlock.id + 1;
return this.updatePoiMmrRoot(poiBlock, mmrRoot);
}

async poiMmrToDb(latestDbMmrHeight: number, targetHeight: number): Promise<void> {
Expand Down Expand Up @@ -200,13 +222,13 @@ export class MmrService implements OnApplicationShutdown {
}
}

private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): void {
private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): ProofOfIndex {
if (!poiBlock.mmrRoot) {
poiBlock.mmrRoot = mmrValue;
this.poi.set(poiBlock);
} else {
this.validatePoiMmr(poiBlock, mmrValue);
}
return poiBlock;
}

private async ensureMmr(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

import {u8aConcat, numberToU8a, hexToU8a, isHex, isU8a, blake2AsU8a} from '@subql/utils';
import {ProofOfIndex} from './entities/Poi.entity';
import {ProofOfIndex} from '../entities/Poi.entity';

const poiBlockHash = (
id: number,
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/indexer/poi/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

export * from './poiModel';
export * from './poi.service';
export * from './PoiBlock';
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import {isMainThread} from 'node:worker_threads';
import {Injectable, OnApplicationShutdown} from '@nestjs/common';
import {hexToU8a} from '@subql/utils';
import {StoreCacheService} from './storeCache';
import {CachePoiModel} from './storeCache/cachePoi';
import {StoreCacheService} from '../storeCache';
import {CachePoiModel} from '../storeCache/cachePoi';

const DEFAULT_PARENT_HASH = hexToU8a('0x00');

Expand Down
91 changes: 91 additions & 0 deletions packages/node-core/src/indexer/poi/poiModel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import {DEFAULT_FETCH_RANGE, RESET_MMR_BLOCK_BATCH} from '@subql/common';
import {u8aToBuffer} from '@subql/utils';
import {Op, Transaction} from 'sequelize';
import {getLogger} from '../../logger';
import {PoiRepo, ProofOfIndex} from '../entities';
const logger = getLogger('PoiCache');

export interface PoiInterface {
bulkUpsert(proofs: ProofOfIndex[]): Promise<void> | void;
getLatestPoiWithMmr(): Promise<ProofOfIndex | null>;
getPoiBlocksByRange(startHeight: number): Promise<ProofOfIndex[]>;
resetPoiMmr?(latestPoiMmrHeight: number, targetHeight: number, tx: Transaction): Promise<void>;
}

export class PlainPoiModel implements PoiInterface {
constructor(readonly model: PoiRepo) {}

async getPoiBlocksByRange(startHeight: number): Promise<ProofOfIndex[]> {
const result = await this.model.findAll({
limit: DEFAULT_FETCH_RANGE,
where: {id: {[Op.gte]: startHeight}},
order: [['id', 'ASC']],
});
return result.map((r) => r?.toJSON<ProofOfIndex>());
}

async bulkUpsert(proofs: ProofOfIndex[]): Promise<void> {
proofs.forEach((proof) => {
proof.chainBlockHash = u8aToBuffer(proof.chainBlockHash);
proof.hash = u8aToBuffer(proof.hash);
proof.parentHash = u8aToBuffer(proof.parentHash);
});
await this.model.bulkCreate(proofs, {
updateOnDuplicate: Object.keys(proofs[0]) as unknown as (keyof ProofOfIndex)[],
});
}

// reset will be reverse order, in case exit mmr should still in order
// we expect startHeight is usually greater than targetHeight
async resetPoiMmr(latestPoiMmrHeight: number, targetHeight: number): Promise<void> {
if (latestPoiMmrHeight === targetHeight) {
return;
}
let latest = latestPoiMmrHeight;
try {
// reverse order
while (targetHeight <= latest) {
const results = (
await this.model.findAll({
limit: RESET_MMR_BLOCK_BATCH,
where: {id: {[Op.lte]: latest, [Op.gte]: targetHeight}, mmrRoot: {[Op.ne]: null}} as any,
order: [['id', 'DESC']],
})
).map((r) => r?.toJSON<ProofOfIndex>());
if (results.length) {
logger.info(
`Reset POI block [${results[0].id} - ${results[results.length - 1].id}] mmr to NULL, total ${
results.length
} blocks `
);
for (const r of results) {
r.mmrRoot = undefined;
}
await this.model.bulkCreate(results, {
updateOnDuplicate: Object.keys(results[0]) as unknown as (keyof ProofOfIndex)[],
});
latest = results[results.length - 1].id - 1;
} else {
break;
}
}
} catch (e) {
throw new Error(`When try to reset POI mmr got problem: ${e}`);
}
}

async getLatestPoiWithMmr(): Promise<ProofOfIndex | null> {
const result = await this.model.findOne({
order: [['id', 'DESC']],
where: {mmrRoot: {[Op.ne]: null}} as any, // Types problem with sequelize, undefined works but not null
});

if (!result) {
return null;
}
return result;
}
}
4 changes: 2 additions & 2 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {getLogger} from '../logger';
import {getExistingProjectSchema, initDbSchema, initHotSchemaReload} from '../utils';
import {DynamicDsService} from './dynamic-ds.service';
import {MmrService} from './mmr.service';
import {PoiService} from './poi.service';
import {PoiService} from './poi/poi.service';
import {StoreService} from './store.service';
import {IDSProcessorService, IProjectNetworkConfig, IProjectService, ISubqueryProject} from './types';

Expand Down Expand Up @@ -238,7 +238,7 @@ export abstract class BaseProjectService<DS extends {startBlock?: number}> imple
}
logger.info(`set blockOffset to ${offset}`);
this._blockOffset = offset;
return this.mmrService.syncFileBaseFromPoi(offset).catch((err) => {
return this.mmrService.syncFileBaseFromPoi(offset, undefined, this.nodeConfig.debug).catch((err) => {
logger.error(err, 'failed to sync poi to mmr');
process.exit(1);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class NoInitError extends Error {

@Injectable()
export class StoreService {
private poiRepo?: PoiRepo;
poiRepo?: PoiRepo;
private removedIndexes: RemovedIndexes = {};
private _modelIndexedFields?: IndexField[];
private _modelsRelations?: GraphQLModelsRelationsEnums;
Expand Down
Loading

0 comments on commit e2d4dba

Please sign in to comment.