Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres based mmr DB #1618

Merged
merged 16 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/node-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"@subql/testing": "workspace:*",
"@subql/types": "workspace:*",
"@subql/utils": "workspace:*",
"@subql/x-merkle-mountain-range": "^2.0.0-0.1.2",
"@subql/x-merkle-mountain-range": "^2.0.0-0.1.3",
"@willsoto/nestjs-prometheus": "^5.1.1",
"async-lock": "^1.4.0",
"async-mutex": "^0.4.0",
Expand Down
11 changes: 11 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import {assign} from '../utils/object';

const logger = getLogger('configure');

export enum MmrStoreType {
File = 'file',
Postgres = 'postgres',
}

export interface IConfig {
readonly subquery: string;
readonly subqueryName?: string;
Expand All @@ -31,6 +36,7 @@ export interface IConfig {
readonly indexCountLimit: number;
readonly timestampField: boolean;
readonly proofOfIndex: boolean;
readonly mmrStoreType: MmrStoreType;
readonly mmrPath?: string;
readonly ipfs?: string;
readonly dictionaryTimeout: number;
Expand Down Expand Up @@ -64,6 +70,7 @@ const DEFAULT_CONFIG = {
indexCountLimit: 10,
timestampField: true,
proofOfIndex: false,
mmrStoreType: MmrStoreType.File,
dictionaryTimeout: 30,
profiler: false,
subscription: false,
Expand Down Expand Up @@ -186,6 +193,10 @@ export class NodeConfig implements IConfig {
return this._config.proofOfIndex;
}

get mmrStoreType(): MmrStoreType {
return this._config.mmrStoreType;
}

get dictionaryTimeout(): number {
return this._config.dictionaryTimeout;
}
Expand Down
4 changes: 3 additions & 1 deletion packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export * from './worker';
export * from './dictionary.service';
export * from './sandbox';
export * from './smartBatch.service';
export * from './dynamic-ds.service';
export * from './blockDispatcher';
export * from './postgresMmrDb';
export * from './dynamic-ds.service';
export * from './testing.service';
export * from './mmrMigrate.service';
81 changes: 49 additions & 32 deletions packages/node-core/src/indexer/mmr.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import {u8aToHex, u8aEq} from '@polkadot/util';
import {DEFAULT_WORD_SIZE, DEFAULT_LEAF, MMR_AWAIT_TIME} from '@subql/common';
import {MMR, FileBasedDb} from '@subql/x-merkle-mountain-range';
import {keccak256} from 'js-sha3';
import {NodeConfig} from '../configure';
import {Sequelize} from 'sequelize';
import {MmrStoreType, NodeConfig} from '../configure';
import {MmrPayload, MmrProof} from '../events';
import {getLogger} from '../logger';
import {delay} from '../utils';
import {delay, getExistingProjectSchema} from '../utils';
import {ProofOfIndex} from './entities';
import {PgBasedMMRDB} from './postgresMmrDb';
import {StoreCacheService} from './storeCache';
import {CachePoiModel} from './storeCache/cachePoi';

Expand All @@ -23,12 +25,16 @@ const keccak256Hash = (...nodeValues: Uint8Array[]) => Buffer.from(keccak256(Buf
export class MmrService implements OnApplicationShutdown {
private isShutdown = false;
private isSyncing = false;
private _fileBasedMmr?: MMR;
private _mmrDb?: MMR;
// This is the next block height that suppose to calculate its mmr value
private _nextMmrBlockHeight?: number;
private _blockOffset?: number;

constructor(private nodeConfig: NodeConfig, private storeCacheService: StoreCacheService) {}
constructor(
private readonly nodeConfig: NodeConfig,
private readonly storeCacheService: StoreCacheService,
private readonly sequelize: Sequelize
) {}

onApplicationShutdown(): void {
this.isShutdown = true;
Expand All @@ -42,11 +48,11 @@ export class MmrService implements OnApplicationShutdown {
return poi;
}

private get fileBasedMmr(): MMR {
if (!this._fileBasedMmr) {
private get mmrDb(): MMR {
if (!this._mmrDb) {
throw new Error('MMR Service sync has not been called');
}
return this._fileBasedMmr;
return this._mmrDb;
}

private get nextMmrBlockHeight(): number {
Expand All @@ -66,20 +72,22 @@ export class MmrService implements OnApplicationShutdown {
async syncFileBaseFromPoi(blockOffset: number): Promise<void> {
if (this.isSyncing) return;
this.isSyncing = true;
this._fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
this._mmrDb =
this.nodeConfig.mmrStoreType === MmrStoreType.Postgres
? await this.ensurePostgresBasedMmr()
: await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
this._blockOffset = blockOffset;

// The file based database current leaf length
const fileBasedMmrLeafLength = await this.fileBasedMmr.getLeafLength();
// The mmr database current leaf length
const mmrLeafLength = await this.mmrDb.getLeafLength();
// However, when initialization we pick the previous block for file db and poi mmr validation
// if mmr leaf length 0 ensure the next block height to be processed min is 1.
this._nextMmrBlockHeight = fileBasedMmrLeafLength + blockOffset + 1;
this._nextMmrBlockHeight = mmrLeafLength + blockOffset + 1;
// The latest poi record in database with mmr value
const latestPoiWithMmr = await this.poi.getLatestPoiWithMmr();
if (latestPoiWithMmr) {
// The latestPoiWithMmr its mmr value in filebase db
const latestPoiFilebaseMmrValue = await this.fileBasedMmr.getRoot(latestPoiWithMmr.id - blockOffset - 1);
this.validatePoiMmr(latestPoiWithMmr, latestPoiFilebaseMmrValue);
const latestPoiMmrValue = await this.mmrDb.getRoot(latestPoiWithMmr.id - blockOffset - 1);
this.validatePoiMmr(latestPoiWithMmr, latestPoiMmrValue);
// Ensure aligned poi table and file based mmr
// If cache poi generated mmr haven't success write back to poi table,
// but latestPoiWithMmr still valid, mmr should delete advanced mmr
Expand All @@ -88,14 +96,14 @@ export class MmrService implements OnApplicationShutdown {
this._nextMmrBlockHeight = latestPoiWithMmr.id + 1;
}
}
logger.info(`file based database MMR start with next block height at ${this.nextMmrBlockHeight}`);
logger.info(`MMR database start with next block height at ${this.nextMmrBlockHeight}`);
while (!this.isShutdown) {
const poiBlocks = await this.poi.getPoiBlocksByRange(this.nextMmrBlockHeight);
if (poiBlocks.length !== 0) {
for (const block of poiBlocks) {
if (this.nextMmrBlockHeight < block.id) {
for (let i = this.nextMmrBlockHeight; i < block.id; i++) {
await this.fileBasedMmr.append(DEFAULT_LEAF);
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
Expand All @@ -110,7 +118,7 @@ export class MmrService implements OnApplicationShutdown {
// this.nextMmrBlockHeight means block before nextMmrBlockHeight-1 already exist in filebase mmr
if (this.nextMmrBlockHeight > Number(lastPoiHeight) && this.nextMmrBlockHeight <= Number(lastProcessedHeight)) {
for (let i = this.nextMmrBlockHeight; i <= Number(lastProcessedHeight); i++) {
await this.fileBasedMmr.append(DEFAULT_LEAF);
await this.mmrDb.append(DEFAULT_LEAF);
this._nextMmrBlockHeight = i + 1;
}
}
Expand All @@ -127,8 +135,8 @@ export class MmrService implements OnApplicationShutdown {
}
const estLeafIndexByBlockHeight = poiBlock.id - this.blockOffset - 1;
// The next leaf index in mmr, current latest leaf index always .getLeafLength -1.
await this.fileBasedMmr.append(newLeaf, estLeafIndexByBlockHeight);
const mmrRoot = await this.fileBasedMmr.getRoot(estLeafIndexByBlockHeight);
await this.mmrDb.append(newLeaf, estLeafIndexByBlockHeight);
const mmrRoot = await this.mmrDb.getRoot(estLeafIndexByBlockHeight);
this.updatePoiMmrRoot(poiBlock, mmrRoot);
this._nextMmrBlockHeight = poiBlock.id + 1;
}
Expand All @@ -138,13 +146,13 @@ export class MmrService implements OnApplicationShutdown {
throw new Error(`Poi block height ${poiWithMmr.id}, Poi mmr has not been set`);
} else if (!u8aEq(poiWithMmr.mmrRoot, mmrValue)) {
throw new Error(
`Poi block height ${poiWithMmr.id}, Poi mmr ${u8aToHex(
poiWithMmr.mmrRoot
)} not the same as filebased mmr: ${u8aToHex(mmrValue)}`
`Poi block height ${poiWithMmr.id}, Poi mmr ${u8aToHex(poiWithMmr.mmrRoot)} not the same as mmr db: ${u8aToHex(
mmrValue
)}`
);
} else {
logger.info(
`CHECKING : Poi block height ${poiWithMmr.id}, Poi mmr is same as file based mmr` //remove for debug
`CHECKING : Poi block height ${poiWithMmr.id}, Poi mmr is same as mmr db` //remove for debug
);
}
}
Expand All @@ -168,15 +176,21 @@ export class MmrService implements OnApplicationShutdown {
return new MMR(keccak256Hash, fileBasedDb);
}

private async ensurePostgresBasedMmr(): Promise<MMR> {
const postgresBasedDb = new PgBasedMMRDB(
this.sequelize,
(await getExistingProjectSchema(this.nodeConfig, this.sequelize))!
);
await postgresBasedDb.connect();
return new MMR(keccak256Hash, postgresBasedDb);
}

async getMmr(blockHeight: number): Promise<MmrPayload> {
const leafIndex = blockHeight - this.blockOffset - 1;
if (leafIndex < 0) {
throw new Error(`Parameter blockHeight must greater equal to ${this.blockOffset + 1} `);
}
const [mmrResponse, node] = await Promise.all([
this.fileBasedMmr.getRoot(leafIndex),
this.fileBasedMmr.get(leafIndex),
]);
const [mmrResponse, node] = await Promise.all([this.mmrDb.getRoot(leafIndex), this.mmrDb.get(leafIndex)]);
return {
offset: this.blockOffset,
height: blockHeight,
Expand All @@ -187,13 +201,13 @@ export class MmrService implements OnApplicationShutdown {

async getLatestMmr(): Promise<MmrPayload> {
// latest leaf index need fetch from .db, as original method will use cache
const blockHeight = (await this.fileBasedMmr.db.getLeafLength()) + this.blockOffset;
const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset;
return this.getMmr(blockHeight);
}

async getLatestMmrProof(): Promise<MmrProof> {
// latest leaf index need fetch from .db, as original method will use cache
const blockHeight = (await this.fileBasedMmr.db.getLeafLength()) + this.blockOffset;
const blockHeight = (await this.mmrDb.db.getLeafLength()) + this.blockOffset;
return this.getMmrProof(blockHeight);
}

Expand All @@ -202,7 +216,7 @@ export class MmrService implements OnApplicationShutdown {
if (leafIndex < 0) {
throw new Error(`Parameter blockHeight must greater equal to ${this.blockOffset + 1} `);
}
const mmrProof = await this.fileBasedMmr.getProof([leafIndex]);
const mmrProof = await this.mmrDb.getProof([leafIndex]);
const nodes = Object.entries(mmrProof.db.nodes).map(([key, data]) => {
return {
node: key,
Expand All @@ -217,11 +231,14 @@ export class MmrService implements OnApplicationShutdown {
}

async deleteMmrNode(blockHeight: number, blockOffset: number): Promise<void> {
this._fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
this._mmrDb =
this.nodeConfig.mmrStoreType === MmrStoreType.Postgres
? await this.ensurePostgresBasedMmr()
: await this.ensureFileBasedMmr(this.nodeConfig.mmrPath);
const leafIndex = blockHeight - blockOffset - 1;
if (leafIndex < 0) {
throw new Error(`Target block height must greater equal to ${blockOffset + 1} `);
}
await this.fileBasedMmr.delete(leafIndex);
await this.mmrDb.delete(leafIndex);
}
}
74 changes: 74 additions & 0 deletions packages/node-core/src/indexer/mmrMigrate.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import {existsSync} from 'fs';
import {DEFAULT_WORD_SIZE} from '@subql/common';
import {NodeConfig, getExistingProjectSchema, getLogger, PgBasedMMRDB} from '@subql/node-core';
import {FileBasedDb} from '@subql/x-merkle-mountain-range';
import {Logging, Sequelize} from 'sequelize';

const logger = getLogger('mmr-migrate');

export enum MigrationDirection {
FileToDb = 'fileToDb',
DbToFile = 'dbToFile',
}

export class MMRMigrateService {
constructor(private nodeConfig: NodeConfig, private sequelize: Sequelize) {}

async migrate(direction: MigrationDirection): Promise<void> {
if (direction === MigrationDirection.FileToDb && !existsSync(this.nodeConfig.mmrPath)) {
logger.info(`MMR file not found: ${this.nodeConfig.mmrPath}`);
return;
}

let fileBasedMMRDb: FileBasedDb;

if (existsSync(this.nodeConfig.mmrPath)) {
fileBasedMMRDb = await FileBasedDb.open(this.nodeConfig.mmrPath);
} else {
fileBasedMMRDb = await FileBasedDb.create(this.nodeConfig.mmrPath, DEFAULT_WORD_SIZE);
}

const schema =
(await getExistingProjectSchema(this.nodeConfig, this.sequelize)) || (await this.createProjectSchema());
const pgBasedMMRDb = new PgBasedMMRDB(this.sequelize, schema);
await pgBasedMMRDb.connect();

const [source, target] =
direction === MigrationDirection.FileToDb ? [fileBasedMMRDb, pgBasedMMRDb] : [pgBasedMMRDb, fileBasedMMRDb];

const nodes = await source.getNodes();
const sortedEntries = Object.entries(nodes).sort(([a], [b]) => a.localeCompare(b));

const totalNodes = sortedEntries.length;
let completedNodes = 0;

for (const [index, value] of sortedEntries) {
await target.set(value, parseInt(index, 10));

completedNodes++;
const progressPercentage = Math.round((completedNodes / totalNodes) * 100);
process.stdout.clearLine(0);
process.stdout.cursorTo(0);
process.stdout.write(`Migration progress: ${progressPercentage}% | ${completedNodes}/${totalNodes} nodes`);
}

process.stdout.write('\n');

const leafLength = await source.getLeafLength();
await target.setLeafLength(leafLength);
}

private async createProjectSchema(): Promise<string> {
const schema = this.nodeConfig.dbSchema;
if (!this.nodeConfig.localMode) {
const schemas = await this.sequelize.showAllSchemas(undefined as unknown as Logging);
if (!(schemas as unknown as string[]).includes(schema)) {
await this.sequelize.createSchema(`"${schema}"`, undefined as unknown as Logging);
}
}
return schema;
}
}
Loading