diff --git a/packages/node-core/src/configure/NodeConfig.ts b/packages/node-core/src/configure/NodeConfig.ts index 66f3dfcd04..e981f32fce 100644 --- a/packages/node-core/src/configure/NodeConfig.ts +++ b/packages/node-core/src/configure/NodeConfig.ts @@ -45,6 +45,7 @@ export interface IConfig { readonly pgCa?: string; readonly pgKey?: string; readonly pgCert?: string; + readonly storeCacheThreshold: number; } export type MinConfig = Partial> & Pick; @@ -66,6 +67,7 @@ const DEFAULT_CONFIG = { disableHistorical: false, multiChain: false, unfinalizedBlocks: false, + storeCacheThreshold: 300, }; export class NodeConfig implements IConfig { @@ -109,6 +111,10 @@ export class NodeConfig implements IConfig { return this._config.batchSize; } + get storeCacheThreshold(): number { + return this._config.storeCacheThreshold; + } + get networkEndpoint(): string | undefined { return this._config.networkEndpoint; } diff --git a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts b/packages/node-core/src/indexer/storeCache/cacheMetadata.ts index 406ead102f..15533da05d 100644 --- a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts +++ b/packages/node-core/src/indexer/storeCache/cacheMetadata.ts @@ -20,6 +20,8 @@ export class CacheMetadataModel implements ICachedModelControl { // Needed for dynamic datasources private getCache: Partial = {}; + flushableRecordCounter = 0; + constructor(readonly model: MetadataRepo) {} async find(key: K): Promise { @@ -37,6 +39,9 @@ export class CacheMetadataModel implements ICachedModelControl { set(key: K, value: MetadataKeys[K]): void { guardBlockedKeys(key); + if (this.setCache[key] === undefined) { + this.flushableRecordCounter += 1; + } this.setCache[key] = value; this.getCache[key] = value; } @@ -91,5 +96,6 @@ export class CacheMetadataModel implements ICachedModelControl { clear(): void { this.setCache = {}; + this.flushableRecordCounter = 0; } } diff --git a/packages/node-core/src/indexer/storeCache/cacheModel.ts b/packages/node-core/src/indexer/storeCache/cacheModel.ts index 70d29716e3..1118450ea9 100644 --- a/packages/node-core/src/indexer/storeCache/cacheModel.ts +++ b/packages/node-core/src/indexer/storeCache/cacheModel.ts @@ -27,6 +27,8 @@ export class CachedModel< private removeCache: Record = {}; + flushableRecordCounter = 0; + constructor(readonly model: ModelStatic>, private readonly historical = true) {} private get historicalModel(): ModelStatic> { @@ -126,6 +128,7 @@ export class CachedModel< this.setCache[id] = new SetValueModel(); } this.setCache[id].set(data, blockHeight); + this.flushableRecordCounter += 1; // IMPORTANT // This sync getCache with setCache // Change this will impact `getByFieldFromCache`, `allCachedIds` and related methods. @@ -176,6 +179,7 @@ export class CachedModel< this.removeCache[id] = { removedAtBlock: blockHeight, }; + this.flushableRecordCounter += 1; if (this.getCache[id]) { delete this.getCache[id]; // Also when .get, check removeCache first, should return undefined if removed @@ -233,6 +237,7 @@ export class CachedModel< this.getCache = {}; this.setCache = {}; this.removeCache = {}; + this.flushableRecordCounter = 0; } dumpSetData(): SetData { diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts b/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts index 12730d5c8e..1fa4a65d4a 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts +++ b/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts @@ -38,14 +38,14 @@ describe('Store Cache Service historical', () => { const sequilize = new Sequelize(); it('could init store cache service and init cache for models', () => { - storeService = new StoreCacheService(sequilize); + storeService = new StoreCacheService(sequilize, null); storeService.getModel('entity1'); expect((storeService as any).cachedModels.entity1).toBeDefined(); expect((storeService as any).cachedModels.entity2).toBeUndefined(); }); it('could set cache for entity, also get from it', async () => { - storeService = new StoreCacheService(sequilize); + storeService = new StoreCacheService(sequilize, null); storeService.getModel('entity1'); storeService.getModel('entity2'); diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.ts b/packages/node-core/src/indexer/storeCache/storeCache.service.ts index 61eafcfce0..5da42b18b2 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.ts +++ b/packages/node-core/src/indexer/storeCache/storeCache.service.ts @@ -3,36 +3,27 @@ import assert from 'assert'; import {Injectable} from '@nestjs/common'; +import {NodeConfig} from '@subql/node-core/configure'; +import {sum} from 'lodash'; import {Sequelize, Transaction} from 'sequelize'; -import {NodeConfig} from '../../configure'; import {MetadataRepo} from '../entities'; import {CacheMetadataModel} from './cacheMetadata'; import {CachedModel} from './cacheModel'; import {ICachedModel, EntitySetData, ICachedModelControl} from './types'; -const FLUSH_FREQUENCY = 5; - @Injectable() export class StoreCacheService { - historical: boolean; - private flushCounter: number; private cachedModels: Record> = {}; private metadataRepo: MetadataRepo; - constructor(private sequelize: Sequelize) { + constructor(private sequelize: Sequelize, private config: NodeConfig) { this.resetMemoryStore(); - this.flushCounter = 0; - this.historical = true; // TODO, need handle when is not historical } setMetadataRepo(repo: MetadataRepo): void { this.metadataRepo = repo; } - counterIncrement(): void { - this.flushCounter += 1; - } - getModel(entity: string): ICachedModel { if (entity === '_metadata') { throw new Error('Please use getMetadataModel instead'); @@ -72,18 +63,14 @@ export class StoreCacheService { } private async _flushCache(tx: Transaction): Promise { - if (!this.historical) { - return; - } - // Get models that have data to flush const updatableModels = Object.values(this.cachedModels).filter((m) => m.isFlushable); await Promise.all(updatableModels.map((model) => model.flush(tx))); } - async flushCache(tx: Transaction): Promise { - if (this.isFlushable()) { + async flushCache(tx: Transaction, forceFlush?: boolean): Promise { + if (this.isFlushable() || forceFlush) { await this._flushCache(tx); // Note flushCache and commit transaction need to sequential // await this.commitTransaction(); @@ -98,7 +85,8 @@ export class StoreCacheService { } isFlushable(): boolean { - return this.flushCounter % FLUSH_FREQUENCY === 0; + const numOfRecords = sum(Object.values(this.cachedModels).map((m) => m.flushableRecordCounter)); + return numOfRecords >= this.config.storeCacheThreshold; } resetMemoryStore(): void { diff --git a/packages/node-core/src/indexer/storeCache/types.ts b/packages/node-core/src/indexer/storeCache/types.ts index 03ebb36cf3..91cc379197 100644 --- a/packages/node-core/src/indexer/storeCache/types.ts +++ b/packages/node-core/src/indexer/storeCache/types.ts @@ -29,7 +29,7 @@ export interface ICachedModel { export interface ICachedModelControl { isFlushable: boolean; - + flushableRecordCounter: number; sync(data: SetData): void; flush(tx: Transaction): Promise; dumpSetData(): SetData; diff --git a/packages/node/src/yargs.ts b/packages/node/src/yargs.ts index 622a851dd9..4797be0c7d 100644 --- a/packages/node/src/yargs.ts +++ b/packages/node/src/yargs.ts @@ -232,4 +232,10 @@ export const yargsOptions = yargs(hideBin(process.argv)) 'Postgres client certificate - Path to client certificate e.g /path/to/client-certificates/postgresql.crt', type: 'string', }, + 'store-cache-threshold': { + demandOption: false, + describe: + 'Store cache will flush when number of records excess this threshold', + type: 'number', + }, });