diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts b/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts new file mode 100644 index 0000000000..18fae7ff45 --- /dev/null +++ b/packages/node-core/src/indexer/storeCache/cachePoi.spec.ts @@ -0,0 +1,161 @@ +// Copyright 2020-2022 OnFinality Limited authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import {Op} from 'sequelize'; +import {PoiRepo, ProofOfIndex} from '../entities'; +import {CachePoiModel} from './cachePoi'; + +const mockPoiRepo = (): PoiRepo => { + const data: Record = {}; + + const ascKeys = () => (Object.keys(data) as any[] as number[]).sort((a: number, b: number) => a - b); + const descKeys = () => [...ascKeys()].reverse(); + + return { + findByPk: (key: number) => ({toJSON: () => data[key]}), + findAll: ({limit, order, where}: any) => { + const orderedKeys = ascKeys(); + + const startHeight = where.id[Op.gte]; + + const filteredKeys = orderedKeys.filter((key) => key >= startHeight).slice(0, limit); + + return filteredKeys.map((key) => ({toJSON: () => data[key]})); + }, + findOne: ({order, where}: any) => { + const orderedKeys = descKeys(); + + if (!orderedKeys.length) { + return null; + } + + if (!where) { + return {toJSON: () => data[orderedKeys[0]]}; + } + + // Only support null mmrRoot + for (const key of orderedKeys) { + if (data[key].mmrRoot !== null && data[key].mmrRoot !== undefined) { + return {toJSON: () => data[key]}; + } + } + + return null; + }, + bulkCreate: (input: {id: number}[], opts: any) => input.map((d) => (data[d.id] = d)), + destroy: (key: number) => { + delete data[key]; + }, + } as any as PoiRepo; +}; + +const getEmptyPoi = (id: number, mmrRoot?: any): ProofOfIndex => { + return { + id, + chainBlockHash: new Uint8Array(), + hash: new Uint8Array(), + parentHash: new Uint8Array(), + mmrRoot, + } as ProofOfIndex; +}; + +describe('CachePoi', () => { + let poiRepo: PoiRepo; + let cachePoi: CachePoiModel; + + beforeEach(() => { + poiRepo = mockPoiRepo(); + cachePoi = new CachePoiModel(poiRepo); + }); + + describe('getPoiBlocksByRange', () => { + it('with mix of cache and db data', async () => { + await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); + + cachePoi.set(getEmptyPoi(4)); + cachePoi.set(getEmptyPoi(5)); + cachePoi.set(getEmptyPoi(6)); + + const res = await cachePoi.getPoiBlocksByRange(2); + expect(res.map((d) => d.id)).toEqual([2, 3, 4, 5, 6]); + }); + + it('only db data', async () => { + await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); + + const res = await cachePoi.getPoiBlocksByRange(2); + expect(res.map((d) => d.id)).toEqual([2, 3]); + }); + + it('only cache data', async () => { + cachePoi.set(getEmptyPoi(4)); + cachePoi.set(getEmptyPoi(5)); + cachePoi.set(getEmptyPoi(6)); + + const res = await cachePoi.getPoiBlocksByRange(2); + expect(res.map((d) => d.id)).toEqual([4, 5, 6]); + }); + }); + + describe('getLatestPoi', () => { + it('with mix of cache and db data', async () => { + await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); + + cachePoi.set(getEmptyPoi(4)); + cachePoi.set(getEmptyPoi(5)); + cachePoi.set(getEmptyPoi(6)); + + const res = await cachePoi.getLatestPoi(); + expect(res?.id).toBe(6); + }); + + it('only db data', async () => { + await poiRepo.bulkCreate([{id: 1}, {id: 2}, {id: 3}] as any); + + const res = await cachePoi.getLatestPoi(); + expect(res?.id).toBe(3); + }); + + it('only cache data', async () => { + cachePoi.set(getEmptyPoi(1)); + cachePoi.set(getEmptyPoi(2)); + cachePoi.set(getEmptyPoi(3)); + + const res = await cachePoi.getLatestPoi(); + expect(res?.id).toBe(3); + }); + }); + + describe('getLatestPoiWithMmr', () => { + it('with mix of cache and db data', async () => { + await poiRepo.bulkCreate([ + {id: 1, mmrRoot: 'mmr1'}, + {id: 2, mmrRoot: 'mmr2'}, + {id: 3, mmrRoot: 'mmr3'}, + ] as any); + + cachePoi.set(getEmptyPoi(4, 'mmr4')); + cachePoi.set(getEmptyPoi(5)); + cachePoi.set(getEmptyPoi(6)); + + const res = await cachePoi.getLatestPoiWithMmr(); + expect(res?.id).toBe(4); + }); + + it('only db data', async () => { + await poiRepo.bulkCreate([{id: 1, mmrRoot: 'mmr1'}, {id: 2, mmrRoot: 'mmr2'}, {id: 3}] as any); + + const res = await cachePoi.getLatestPoiWithMmr(); + expect(res?.id).toBe(2); + }); + + it('only cache data', async () => { + cachePoi.set(getEmptyPoi(1, 'mmr1')); + cachePoi.set(getEmptyPoi(2, 'mmr2')); + cachePoi.set(getEmptyPoi(3)); + + const res = await cachePoi.getLatestPoiWithMmr(); + expect(res?.id).toBe(2); + }); + }); +}); diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.ts b/packages/node-core/src/indexer/storeCache/cachePoi.ts index fe8a6691e9..cd72385050 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.ts +++ b/packages/node-core/src/indexer/storeCache/cachePoi.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import {u8aToBuffer} from '@subql/utils'; +import {Mutex} from 'async-mutex'; import {Op, Transaction} from 'sequelize'; import {getLogger} from '../../logger'; import {PoiRepo, ProofOfIndex} from '../entities'; @@ -15,6 +16,7 @@ export class CachePoiModel implements ICachedModelControl { private setCache: Record = {}; private removeCache: number[] = []; flushableRecordCounter = 0; + private mutex = new Mutex(); constructor(readonly model: PoiRepo) {} @@ -31,6 +33,7 @@ export class CachePoiModel implements ICachedModelControl { } async getById(id: number): Promise { + await this.mutex.waitForUnlock(); if (this.removeCache.includes(id)) { logger.debug(`Attempted to get deleted POI with id="${id}"`); return undefined; @@ -52,6 +55,7 @@ export class CachePoiModel implements ICachedModelControl { } async getPoiBlocksByRange(startHeight: number): Promise { + await this.mutex.waitForUnlock(); const result = await this.model.findAll({ limit: DEFAULT_FETCH_RANGE, where: {id: {[Op.gte]: startHeight}}, @@ -60,9 +64,7 @@ export class CachePoiModel implements ICachedModelControl { const resultData = result.map((r) => r?.toJSON()); - const poiBlocks = Object.values(this.mergeResultsWithCache(resultData)).filter( - (poiBlock) => poiBlock.id >= startHeight - ); + const poiBlocks = this.mergeResultsWithCache(resultData).filter((poiBlock) => poiBlock.id >= startHeight); if (poiBlocks.length !== 0) { return poiBlocks.sort((v) => v.id); } else { @@ -71,34 +73,22 @@ export class CachePoiModel implements ICachedModelControl { } async getLatestPoi(): Promise { + await this.mutex.waitForUnlock(); const result = await this.model.findOne({ order: [['id', 'DESC']], }); - if (!result) return null; - - return Object.values(this.mergeResultsWithCache([result.toJSON()])).reduce((acc, val) => { - if (acc && acc.id < val.id) return acc; - return val; - }, null as ProofOfIndex | null); + return this.mergeResultsWithCache([result?.toJSON()], 'desc')[0]; } async getLatestPoiWithMmr(): Promise { + await this.mutex.waitForUnlock(); const result = await this.model.findOne({ order: [['id', 'DESC']], where: {mmrRoot: {[Op.ne]: null}} as any, // Types problem with sequelize, undefined works but not null }); - if (!result) { - return null; - } - - return Object.values(this.mergeResultsWithCache([result.toJSON()])) - .filter((v) => !!v.mmrRoot) - .reduce((acc, val) => { - if (acc && acc.id < val.id) return acc; - return val; - }, null as ProofOfIndex | null); + return this.mergeResultsWithCache([result?.toJSON()], 'desc').find((v) => !!v.mmrRoot) ?? null; } get isFlushable(): boolean { @@ -106,29 +96,44 @@ export class CachePoiModel implements ICachedModelControl { } async flush(tx: Transaction): Promise { - logger.debug(`Flushing ${this.flushableRecordCounter} items from cache`); - const pendingFlush = Promise.all([ - this.model.bulkCreate(Object.values(this.setCache), {transaction: tx, updateOnDuplicate: ['mmrRoot']}), - this.model.destroy({where: {id: this.removeCache}, transaction: tx}), - ]); - - // Don't await DB operations to complete before clearing. - // This allows new data to be cached while flushing - this.clear(); - - await pendingFlush; + const release = await this.mutex.acquire(); + try { + tx.afterCommit(() => { + release(); + }); + logger.debug(`Flushing ${this.flushableRecordCounter} items from cache`); + const pendingFlush = Promise.all([ + this.model.bulkCreate(Object.values(this.setCache), {transaction: tx, updateOnDuplicate: ['mmrRoot']}), + this.model.destroy({where: {id: this.removeCache}, transaction: tx}), + ]); + + // Don't await DB operations to complete before clearing. + // This allows new data to be cached while flushing + this.clear(); + + await pendingFlush; + } catch (e) { + release(); + throw e; + } } - private mergeResultsWithCache(results: ProofOfIndex[]): Record { + private mergeResultsWithCache(results: (ProofOfIndex | undefined)[], order: 'asc' | 'desc' = 'asc'): ProofOfIndex[] { const copy = {...this.setCache}; - results.map((result) => { + results.forEach((result) => { if (result) { copy[result.id] = result; } }); - return copy; + const ascending = Object.values(copy).sort((a, b) => a.id - b.id); + + if (order === 'asc') { + return ascending; + } + + return ascending.reverse(); } private clear(): void {