Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flush with cache stored records size #1566

Merged
merged 2 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface IConfig {
readonly pgCa?: string;
readonly pgKey?: string;
readonly pgCert?: string;
readonly storeCacheThreshold: number;
}

export type MinConfig = Partial<Omit<IConfig, 'subquery'>> & Pick<IConfig, 'subquery'>;
Expand All @@ -66,6 +67,7 @@ const DEFAULT_CONFIG = {
disableHistorical: false,
multiChain: false,
unfinalizedBlocks: false,
storeCacheThreshold: 300,
};

export class NodeConfig implements IConfig {
Expand Down Expand Up @@ -109,6 +111,10 @@ export class NodeConfig implements IConfig {
return this._config.batchSize;
}

get storeCacheThreshold(): number {
return this._config.storeCacheThreshold;
}

get networkEndpoint(): string | undefined {
return this._config.networkEndpoint;
}
Expand Down
6 changes: 6 additions & 0 deletions packages/node-core/src/indexer/storeCache/cacheMetadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export class CacheMetadataModel implements ICachedModelControl<any> {
// Needed for dynamic datasources
private getCache: Partial<MetadataKeys> = {};

flushableRecordCounter = 0;

constructor(readonly model: MetadataRepo) {}

async find<K extends MetadataKey>(key: K): Promise<MetadataKeys[K] | undefined> {
Expand All @@ -37,6 +39,9 @@ export class CacheMetadataModel implements ICachedModelControl<any> {

set<K extends MetadataKey>(key: K, value: MetadataKeys[K]): void {
guardBlockedKeys(key);
if (this.setCache[key] === undefined) {
this.flushableRecordCounter += 1;
}
this.setCache[key] = value;
this.getCache[key] = value;
}
Expand Down Expand Up @@ -91,5 +96,6 @@ export class CacheMetadataModel implements ICachedModelControl<any> {

clear(): void {
this.setCache = {};
this.flushableRecordCounter = 0;
}
}
5 changes: 5 additions & 0 deletions packages/node-core/src/indexer/storeCache/cacheModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ export class CachedModel<

private removeCache: Record<string, RemoveValue> = {};

flushableRecordCounter = 0;

constructor(readonly model: ModelStatic<Model<T, T>>, private readonly historical = true) {}

private get historicalModel(): ModelStatic<Model<T & HistoricalModel, T & HistoricalModel>> {
Expand Down Expand Up @@ -126,6 +128,7 @@ export class CachedModel<
this.setCache[id] = new SetValueModel();
}
this.setCache[id].set(data, blockHeight);
this.flushableRecordCounter += 1;
// IMPORTANT
// This sync getCache with setCache
// Change this will impact `getByFieldFromCache`, `allCachedIds` and related methods.
Expand Down Expand Up @@ -176,6 +179,7 @@ export class CachedModel<
this.removeCache[id] = {
removedAtBlock: blockHeight,
};
this.flushableRecordCounter += 1;
if (this.getCache[id]) {
delete this.getCache[id];
// Also when .get, check removeCache first, should return undefined if removed
Expand Down Expand Up @@ -233,6 +237,7 @@ export class CachedModel<
this.getCache = {};
this.setCache = {};
this.removeCache = {};
this.flushableRecordCounter = 0;
}

dumpSetData(): SetData<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ describe('Store Cache Service historical', () => {
const sequilize = new Sequelize();

it('could init store cache service and init cache for models', () => {
storeService = new StoreCacheService(sequilize);
storeService = new StoreCacheService(sequilize, null);
storeService.getModel('entity1');
expect((storeService as any).cachedModels.entity1).toBeDefined();
expect((storeService as any).cachedModels.entity2).toBeUndefined();
});

it('could set cache for entity, also get from it', async () => {
storeService = new StoreCacheService(sequilize);
storeService = new StoreCacheService(sequilize, null);
storeService.getModel('entity1');
storeService.getModel('entity2');

Expand Down
18 changes: 6 additions & 12 deletions packages/node-core/src/indexer/storeCache/storeCache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,29 @@

import assert from 'assert';
import {Injectable} from '@nestjs/common';
import {NodeConfig} from '@subql/node-core/configure';
import {sum} from 'lodash';
import {Sequelize, Transaction} from 'sequelize';
import {NodeConfig} from '../../configure';
import {MetadataRepo} from '../entities';
import {CacheMetadataModel} from './cacheMetadata';
import {CachedModel} from './cacheModel';
import {ICachedModel, EntitySetData, ICachedModelControl} from './types';

const FLUSH_FREQUENCY = 5;

@Injectable()
export class StoreCacheService {
historical: boolean;
private flushCounter: number;
private cachedModels: Record<string, ICachedModelControl<any>> = {};
private metadataRepo: MetadataRepo;

constructor(private sequelize: Sequelize) {
constructor(private sequelize: Sequelize, private config: NodeConfig) {
this.resetMemoryStore();
this.flushCounter = 0;
this.historical = true; // TODO, need handle when is not historical
this.historical = true;
jiqiang90 marked this conversation as resolved.
Show resolved Hide resolved
}

setMetadataRepo(repo: MetadataRepo): void {
this.metadataRepo = repo;
}

counterIncrement(): void {
this.flushCounter += 1;
}

getModel<T>(entity: string): ICachedModel<T> {
if (entity === '_metadata') {
throw new Error('Please use getMetadataModel instead');
Expand Down Expand Up @@ -98,7 +91,8 @@ export class StoreCacheService {
}

isFlushable(): boolean {
jiqiang90 marked this conversation as resolved.
Show resolved Hide resolved
return this.flushCounter % FLUSH_FREQUENCY === 0;
const numOfRecords = sum(Object.values(this.cachedModels).map((m) => m.flushableRecordCounter));
return numOfRecords >= this.config.storeCacheThreshold;
}

resetMemoryStore(): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/storeCache/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface ICachedModel<T> {

export interface ICachedModelControl<T> {
isFlushable: boolean;

flushableRecordCounter: number;
sync(data: SetData<T>): void;
flush(tx: Transaction): Promise<void>;
dumpSetData(): SetData<T>;
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/yargs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ export const yargsOptions = yargs(hideBin(process.argv))
'Postgres client certificate - Path to client certificate e.g /path/to/client-certificates/postgresql.crt',
type: 'string',
},
'store-cache-threshold': {
demandOption: false,
describe:
'Store cache will flush when number of records excess this threshold',
type: 'number',
},
});