Skip to content

Commit

Permalink
flush with cache stored records size (#1566)
Browse files Browse the repository at this point in the history
* flush with cache stored records size

* update
  • Loading branch information
jiqiang90 authored Mar 17, 2023
1 parent 1afdd8c commit 4840d84
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 22 deletions.
6 changes: 6 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface IConfig {
readonly pgCa?: string;
readonly pgKey?: string;
readonly pgCert?: string;
readonly storeCacheThreshold: number;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand All @@ -66,6 +67,7 @@ const DEFAULT_CONFIG = {
disableHistorical: false,
multiChain: false,
unfinalizedBlocks: false,
storeCacheThreshold: 300,
};

export class NodeConfig implements IConfig {
Expand Down Expand Up @@ -109,6 +111,10 @@ export class NodeConfig implements IConfig {
return this._config.batchSize;
}

get storeCacheThreshold(): number {
return this._config.storeCacheThreshold;
}

get networkEndpoint(): string | undefined {
return this._config.networkEndpoint;
}
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/indexer/storeCache/cacheMetadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export class CacheMetadataModel implements ICachedModelControl<any> {
// Needed for dynamic datasources
private getCache: Partial<MetadataKeys> = {};

flushableRecordCounter = 0;

constructor(readonly model: MetadataRepo) {}

async find<K extends MetadataKey>(key: K): Promise<MetadataKeys[K] | undefined> {
Expand All @@ -37,6 +39,9 @@ export class CacheMetadataModel implements ICachedModelControl<any> {

set<K extends MetadataKey>(key: K, value: MetadataKeys[K]): void {
guardBlockedKeys(key);
if (this.setCache[key] === undefined) {
this.flushableRecordCounter += 1;
}
this.setCache[key] = value;
this.getCache[key] = value;
}
Expand Down Expand Up @@ -91,5 +96,6 @@ export class CacheMetadataModel implements ICachedModelControl<any> {

clear(): void {
this.setCache = {};
this.flushableRecordCounter = 0;
}
}
5 changes: 5 additions & 0 deletions packages/node-core/src/indexer/storeCache/cacheModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export class CachedModel<

private removeCache: Record<string, RemoveValue> = {};

flushableRecordCounter = 0;

constructor(readonly model: ModelStatic<Model<T, T>>, private readonly historical = true) {}

private get historicalModel(): ModelStatic<Model<T & HistoricalModel, T & HistoricalModel>> {
Expand Down Expand Up @@ -126,6 +128,7 @@ export class CachedModel<
this.setCache[id] = new SetValueModel();
}
this.setCache[id].set(data, blockHeight);
this.flushableRecordCounter += 1;
// IMPORTANT
// This sync getCache with setCache
// Change this will impact `getByFieldFromCache`, `allCachedIds` and related methods.
Expand Down Expand Up @@ -176,6 +179,7 @@ export class CachedModel<
this.removeCache[id] = {
removedAtBlock: blockHeight,
};
this.flushableRecordCounter += 1;
if (this.getCache[id]) {
delete this.getCache[id];
// Also when .get, check removeCache first, should return undefined if removed
Expand Down Expand Up @@ -233,6 +237,7 @@ export class CachedModel<
this.getCache = {};
this.setCache = {};
this.removeCache = {};
this.flushableRecordCounter = 0;
}

dumpSetData(): SetData<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ describe('Store Cache Service historical', () => {
const sequilize = new Sequelize();

it('could init store cache service and init cache for models', () => {
storeService = new StoreCacheService(sequilize);
storeService = new StoreCacheService(sequilize, null);
storeService.getModel('entity1');
expect((storeService as any).cachedModels.entity1).toBeDefined();
expect((storeService as any).cachedModels.entity2).toBeUndefined();
});

it('could set cache for entity, also get from it', async () => {
storeService = new StoreCacheService(sequilize);
storeService = new StoreCacheService(sequilize, null);
storeService.getModel('entity1');
storeService.getModel('entity2');

Expand Down
26 changes: 7 additions & 19 deletions packages/node-core/src/indexer/storeCache/storeCache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,27 @@

import assert from 'assert';
import {Injectable} from '@nestjs/common';
import {NodeConfig} from '@subql/node-core/configure';
import {sum} from 'lodash';
import {Sequelize, Transaction} from 'sequelize';
import {NodeConfig} from '../../configure';
import {MetadataRepo} from '../entities';
import {CacheMetadataModel} from './cacheMetadata';
import {CachedModel} from './cacheModel';
import {ICachedModel, EntitySetData, ICachedModelControl} from './types';

const FLUSH_FREQUENCY = 5;

@Injectable()
export class StoreCacheService {
historical: boolean;
private flushCounter: number;
private cachedModels: Record<string, ICachedModelControl<any>> = {};
private metadataRepo: MetadataRepo;

constructor(private sequelize: Sequelize) {
constructor(private sequelize: Sequelize, private config: NodeConfig) {
this.resetMemoryStore();
this.flushCounter = 0;
this.historical = true; // TODO, need handle when is not historical
}

setMetadataRepo(repo: MetadataRepo): void {
this.metadataRepo = repo;
}

counterIncrement(): void {
this.flushCounter += 1;
}

getModel<T>(entity: string): ICachedModel<T> {
if (entity === '_metadata') {
throw new Error('Please use getMetadataModel instead');
Expand Down Expand Up @@ -72,18 +63,14 @@ export class StoreCacheService {
}

private async _flushCache(tx: Transaction): Promise<void> {
if (!this.historical) {
return;
}

// Get models that have data to flush
const updatableModels = Object.values(this.cachedModels).filter((m) => m.isFlushable);

await Promise.all(updatableModels.map((model) => model.flush(tx)));
}

async flushCache(tx: Transaction): Promise<void> {
if (this.isFlushable()) {
async flushCache(tx: Transaction, forceFlush?: boolean): Promise<void> {
if (this.isFlushable() || forceFlush) {
await this._flushCache(tx);
// Note flushCache and commit transaction need to sequential
// await this.commitTransaction();
Expand All @@ -98,7 +85,8 @@ export class StoreCacheService {
}

isFlushable(): boolean {
return this.flushCounter % FLUSH_FREQUENCY === 0;
const numOfRecords = sum(Object.values(this.cachedModels).map((m) => m.flushableRecordCounter));
return numOfRecords >= this.config.storeCacheThreshold;
}

resetMemoryStore(): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/storeCache/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface ICachedModel<T> {

export interface ICachedModelControl<T> {
isFlushable: boolean;

flushableRecordCounter: number;
sync(data: SetData<T>): void;
flush(tx: Transaction): Promise<void>;
dumpSetData(): SetData<T>;
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/yargs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ export const yargsOptions = yargs(hideBin(process.argv))
'Postgres client certificate - Path to client certificate e.g /path/to/client-certificates/postgresql.crt',
type: 'string',
},
'store-cache-threshold': {
demandOption: false,
describe:
'Store cache will flush when number of records excess this threshold',
type: 'number',
},
});

0 comments on commit 4840d84

Please sign in to comment.