diff --git a/package.json b/package.json index 36d5e15e20..025df5ae74 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "ts-loader": "^9.2.6", "ts-node": "^10.4.0", "tsconfig-paths": "^3.12.0", - "typescript": "^4.4.4" + "typescript": "^4.9.5" }, "resolutions": { "@polkadot/api": "10.1.4", diff --git a/packages/common/fixtures/package.json b/packages/common/fixtures/package.json index 7e66c73991..710e0e6ab0 100644 --- a/packages/common/fixtures/package.json +++ b/packages/common/fixtures/package.json @@ -21,7 +21,7 @@ "devDependencies": { "@polkadot/api": "^10", "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "latest" } } diff --git a/packages/node-core/package.json b/packages/node-core/package.json index 0c8aa33c5f..a378748691 100644 --- a/packages/node-core/package.json +++ b/packages/node-core/package.json @@ -27,7 +27,7 @@ "@subql/types": "workspace:*", "@subql/utils": "workspace:*", "@subql/x-merkle-mountain-range": "^2.0.0-0.1.2", - "@willsoto/nestjs-prometheus": "^4.4.0", + "@willsoto/nestjs-prometheus": "^5.1.1", "async-lock": "^1.4.0", "async-mutex": "^0.4.0", "lodash": "^4.17.21", diff --git a/packages/node-core/src/api.service.ts b/packages/node-core/src/api.service.ts index f37962ee1d..6a57b2e661 100644 --- a/packages/node-core/src/api.service.ts +++ b/packages/node-core/src/api.service.ts @@ -2,8 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import {Injectable} from '@nestjs/common'; -// import {ApiWrapper} from '@subql/types-avalanche'; -import {NetworkMetadataPayload} from './events'; +import {ISubqueryProject} from './indexer'; import {getLogger} from './logger'; const logger = getLogger('api'); @@ -14,20 +13,18 @@ type FetchFunction = (batch: number[]) => Promise; type FetchFunctionProvider = () => FetchFunction; @Injectable() -export abstract class ApiService { - networkMeta: NetworkMetadataPayload; +export abstract class ApiService

{ + constructor(protected project: P) {} - constructor(protected project: any) {} + abstract init(): Promise>; + abstract get api(): A; /*ApiWrapper*/ + abstract fetchBlocks(heights: number[]): Promise; - abstract init(): Promise; - abstract get api(): any; /*ApiWrapper*/ - abstract fetchBlocks(batch: number[]): Promise; - - async fetchBlocksGeneric( - fetchFuncProvider: FetchFunctionProvider, + async fetchBlocksGeneric( + fetchFuncProvider: FetchFunctionProvider, batch: number[], numAttempts = MAX_RECONNECT_ATTEMPTS - ): Promise { + ): Promise { { let reconnectAttempts = 0; while (reconnectAttempts < numAttempts) { @@ -35,7 +32,7 @@ export abstract class ApiService { // Get the latest fetch function from the provider const fetchFunc = fetchFuncProvider(); return await fetchFunc(batch); - } catch (e) { + } catch (e: any) { logger.error(e, `Failed to fetch blocks ${batch[0]}...${batch[batch.length - 1]}`); reconnectAttempts++; diff --git a/packages/node-core/src/configure/NodeConfig.ts b/packages/node-core/src/configure/NodeConfig.ts index 40307ff9f6..a956bc6ce2 100644 --- a/packages/node-core/src/configure/NodeConfig.ts +++ b/packages/node-core/src/configure/NodeConfig.ts @@ -26,7 +26,7 @@ export interface IConfig { readonly networkDictionary?: string; readonly dictionaryResolver?: string; readonly outputFmt?: 'json'; - readonly logLevel?: LevelWithSilent; + readonly logLevel: LevelWithSilent; readonly queryLimit: number; readonly indexCountLimit: number; readonly timestampField: boolean; @@ -45,15 +45,16 @@ export interface IConfig { readonly pgCa?: string; readonly pgKey?: string; readonly pgCert?: string; - readonly storeCacheThreshold?: number; - readonly storeGetCacheSize?: number; - readonly storeCacheAsync?: boolean; + readonly storeCacheThreshold: number; + readonly storeGetCacheSize: number; + readonly storeCacheAsync: boolean; } export type MinConfig = Partial> & Pick; const DEFAULT_CONFIG = { localMode: false, + logLevel: 'info', batchSize: 100, timeout: 900, blockTime: 6000, @@ -104,7 +105,11 @@ export class NodeConfig implements IConfig { get subqueryName(): string { assert(this._config.subquery); - return this._config.subqueryName ?? last(this.subquery.split(path.sep)); + const name = this._config.subqueryName ?? last(this.subquery.split(path.sep)); + if (!name) { + throw new Error('Unable to get subquery name'); + } + return name; } get localMode(): boolean { @@ -134,10 +139,10 @@ export class NodeConfig implements IConfig { } get storeCacheAsync(): boolean { - return this._config.storeCacheAsync; + return !!this._config.storeCacheAsync; } - get dictionaryResolver(): string { + get dictionaryResolver(): string | undefined { return this._config.dictionaryResolver; } @@ -188,7 +193,7 @@ export class NodeConfig implements IConfig { get mmrPath(): string { return this._config.mmrPath ?? `.mmr/${this.subqueryName}.mmr`; } - get ipfs(): string { + get ipfs(): string | undefined { return this._config.ipfs; } @@ -196,16 +201,16 @@ export class NodeConfig implements IConfig { return this._config.dbSchema ?? this.subqueryName; } - get workers(): number { + get workers(): number | undefined { return this._config.workers; } get profiler(): boolean { - return this._config.profiler; + return !!this._config.profiler; } get unsafe(): boolean { - return this._config.unsafe; + return !!this._config.unsafe; } get subscription(): boolean { @@ -221,7 +226,7 @@ export class NodeConfig implements IConfig { } get unfinalizedBlocks(): boolean { - return this._config.unfinalizedBlocks; + return !!this._config.unfinalizedBlocks; } get isPostgresSecureConnection(): boolean { @@ -234,8 +239,8 @@ export class NodeConfig implements IConfig { } try { return getFileContent(this._config.pgCa, 'postgres ca cert'); - } catch (e) { - logger.error(e); + } catch (e: any) { + logger.error(e, 'Unable to get postges CA Cert'); throw e; } } @@ -247,8 +252,8 @@ export class NodeConfig implements IConfig { try { return getFileContent(this._config.pgKey, 'postgres client key'); - } catch (e) { - logger.error(e); + } catch (e: any) { + logger.error(e, 'Unable to get postgres client key'); throw e; } } @@ -259,8 +264,8 @@ export class NodeConfig implements IConfig { } try { return getFileContent(this._config.pgCert, 'postgres client cert'); - } catch (e) { - logger.error(e); + } catch (e: any) { + logger.error(e, 'Unable to get postgres client cert'); throw e; } } diff --git a/packages/node-core/src/db/db.module.ts b/packages/node-core/src/db/db.module.ts index 4c580f5929..3601b17b49 100644 --- a/packages/node-core/src/db/db.module.ts +++ b/packages/node-core/src/db/db.module.ts @@ -35,7 +35,7 @@ async function establishConnectionSequelize(option: SequelizeOption, numRetries: const sequelize = new Sequelize(uri, option); try { await sequelize.authenticate(); - } catch (error) { + } catch (error: any) { if (JSON.stringify(error.message).includes(CONNECTION_SSL_ERROR_REGEX)) { logger.warn('Database does not support SSL connection, will try to connect without it'); option.dialectOptions = undefined; diff --git a/packages/node-core/src/indexer/PoiBlock.ts b/packages/node-core/src/indexer/PoiBlock.ts index 9fe1684a92..2facb7a869 100644 --- a/packages/node-core/src/indexer/PoiBlock.ts +++ b/packages/node-core/src/indexer/PoiBlock.ts @@ -1,29 +1,21 @@ // Copyright 2020-2022 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 -import { u8aConcat, numberToU8a, hexToU8a, isHex, isU8a } from '@polkadot/util'; -import { blake2AsU8a } from '@polkadot/util-crypto'; -import { ProofOfIndex } from './entities/Poi.entity'; +import {u8aConcat, numberToU8a, hexToU8a, isHex, isU8a} from '@polkadot/util'; +import {blake2AsU8a} from '@polkadot/util-crypto'; +import {ProofOfIndex} from './entities/Poi.entity'; const poiBlockHash = ( id: number, chainBlockHash: string | Uint8Array, operationHashRoot: Uint8Array, parentHash: Uint8Array, - projectId: string, + projectId: string ): Uint8Array => { if (!id || !chainBlockHash || !operationHashRoot || !projectId) { throw new Error('Poof of index: can not generate block hash'); } - return blake2AsU8a( - u8aConcat( - numberToU8a(id), - chainBlockHash, - operationHashRoot, - Buffer.from(projectId), - parentHash, - ), - ); + return blake2AsU8a(u8aConcat(numberToU8a(id), chainBlockHash, operationHashRoot, Buffer.from(projectId), parentHash)); }; export class PoiBlock implements ProofOfIndex { @@ -32,7 +24,6 @@ export class PoiBlock implements ProofOfIndex { readonly hash: Uint8Array; readonly parentHash: Uint8Array; readonly operationHashRoot: Uint8Array; - mmrRoot: Uint8Array; readonly projectId: string; constructor( @@ -41,7 +32,7 @@ export class PoiBlock implements ProofOfIndex { hash: Uint8Array, parentHash: Uint8Array, operationHashRoot: Uint8Array, - projectId: string, + projectId: string ) { this.id = id; this.chainBlockHash = chainBlockHash; @@ -56,29 +47,18 @@ export class PoiBlock implements ProofOfIndex { chainBlockHash: string | Uint8Array, operationHashRoot: Uint8Array, parentHash: Uint8Array, - projectId: string, + projectId: string ): PoiBlock { - const _poiBlockHash = poiBlockHash( - id, - chainBlockHash, - operationHashRoot, - parentHash, - projectId, - ); - let _chainBlockHash: Uint8Array; + const _poiBlockHash = poiBlockHash(id, chainBlockHash, operationHashRoot, parentHash, projectId); + let _chainBlockHash: Uint8Array | undefined; if (isHex(chainBlockHash)) { _chainBlockHash = hexToU8a(chainBlockHash); } else if (isU8a(chainBlockHash)) { _chainBlockHash = chainBlockHash; + } else { + throw new Error(`Unable to create PoiBlock, chainBlockHash was not valid. Received: "${chainBlockHash}"`); } - const poiBlock = new PoiBlock( - id, - _chainBlockHash, - _poiBlockHash, - parentHash, - operationHashRoot, - projectId, - ); + const poiBlock = new PoiBlock(id, _chainBlockHash, _poiBlockHash, parentHash, operationHashRoot, projectId); return poiBlock; } } diff --git a/packages/node-core/src/indexer/StoreOperations.spec.ts b/packages/node-core/src/indexer/StoreOperations.spec.ts index 98da59136f..56f218e07c 100644 --- a/packages/node-core/src/indexer/StoreOperations.spec.ts +++ b/packages/node-core/src/indexer/StoreOperations.spec.ts @@ -33,9 +33,9 @@ const testOperations = [ entityType: 'StarterEntity', data: { id: '0x2494bd5d089cf370c366351e851755ee42e8477df1c17ea1d9c2ae94e4f77ea8', - field3: null, + field3: undefined, field1: 41914, - field2: null, + field2: undefined, field4: new Date('2020-05-29T13:28:36.000Z'), field5: true, createdAt: new Date('2021-08-18T02:36:06.549Z'), @@ -47,9 +47,9 @@ const testOperations = [ entityType: 'StarterEntity', data: { id: '0x2494bd5d089cf370c366351e851755ee42e8477df1c17ea1d9c2ae94e4f77ea8', - field3: null, + field3: undefined, field1: 41914, - field2: null, + field2: undefined, field4: new Date('2020-05-29T13:28:36.000Z'), field5: true, createdAt: new Date('2021-08-18T02:36:06.549Z'), @@ -61,9 +61,9 @@ const testOperations = [ entityType: 'StarterEntity', data: { id: '0x2494bd5d089cf370c366351e851755ee42e8477df1c17ea1d9c2ae94e4f77ea8', - field3: null, + field3: undefined, field1: 41914, - field2: null, + field2: undefined, field4: new Date('2020-05-29T13:28:36.000Z'), field5: true, field6: {meat: 0, fruit: {apple: 'Apple'}}, @@ -85,9 +85,9 @@ const falseOperation = { entityType: 'StarterEntity', data: { id: '0x2494bd5d089cf370c366351e851755ee42e8477df1c17ea1d9c2ae94e4f77ea8', - field3: null, + field3: undefined, field1: 41914, - field2: null, + field2: undefined, field4: new Date('2020-05-29T13:28:36.000Z'), field5: true, field6: {meat: 0, fruit: {apple: 'Apple'}}, diff --git a/packages/node-core/src/indexer/StoreOperations.ts b/packages/node-core/src/indexer/StoreOperations.ts index bcacd2a633..21b177133d 100644 --- a/packages/node-core/src/indexer/StoreOperations.ts +++ b/packages/node-core/src/indexer/StoreOperations.ts @@ -27,17 +27,20 @@ export class StoreOperations { } } else { const operationModel = this.models.find(({name}) => name === operation.entityType); + if (!operationModel) { + throw new Error(`Unable to find model with name ${operation.entityType}`); + } for (const field of operationModel.fields) { const fieldValue = (operation.data as Entity & Record)[field.name]; dataBufferArray.push(Buffer.from(field.name)); if (fieldValue !== undefined && fieldValue !== null) { - if (field.isEnum) { - //if it is a enum, process it as string - dataBufferArray.push(getTypeByScalarName('String').hashCode(fieldValue)); - } else { - dataBufferArray.push(getTypeByScalarName(field.type).hashCode(fieldValue)); + const type = field.isEnum ? getTypeByScalarName('String') : getTypeByScalarName(field.type); + if (!type) { + throw new Error('Unable to get type by scalar name'); } + + dataBufferArray.push(type.hashCode(fieldValue)); } } } @@ -62,7 +65,7 @@ export class StoreOperations { this.merkleTools.makeTree(); } - getOperationMerkleRoot(): Uint8Array { + getOperationMerkleRoot(): Uint8Array | null { if (this.merkleTools.getTreeReadyState()) { return this.merkleTools.getMerkleRoot(); } else { diff --git a/packages/node-core/src/indexer/benchmark.service.ts b/packages/node-core/src/indexer/benchmark.service.ts index 85627820f3..18ce4131fc 100644 --- a/packages/node-core/src/indexer/benchmark.service.ts +++ b/packages/node-core/src/indexer/benchmark.service.ts @@ -17,15 +17,15 @@ dayjs.extend(duration); @Injectable() export class BenchmarkService { - private currentProcessingHeight: number; - private currentProcessingTimestamp: number; - private targetHeight: number; - private lastRegisteredHeight: number; - private lastRegisteredTimestamp: number; - private blockPerSecond: number; + private currentProcessingHeight?: number; + private currentProcessingTimestamp?: number; + private targetHeight?: number; + private lastRegisteredHeight?: number; + private lastRegisteredTimestamp?: number; + private blockPerSecond?: number; - private currentProcessedBlockAmount: number; - private lastProcessedBlockAmount: number; + private currentProcessedBlockAmount?: number; + private lastProcessedBlockAmount?: number; constructor(private nodeConfig: NodeConfig) {} @@ -40,37 +40,41 @@ export class BenchmarkService { const timeDiff = this.currentProcessingTimestamp - this.lastRegisteredTimestamp; this.blockPerSecond = heightDiff === 0 || timeDiff === 0 ? 0 : heightDiff / (timeDiff / 1000); - const blockDuration = dayjs.duration( - (this.targetHeight - this.currentProcessingHeight) / this.blockPerSecond, - 'seconds' - ); - const hoursMinsStr = blockDuration.format('HH [hours] mm [mins]'); - const days = Math.floor(blockDuration.asDays()); - const durationStr = `${days} days ${hoursMinsStr}`; + if (!this.targetHeight) { + logger.debug('Target height is not defined, not logging benchmark'); + } else { + const blockDuration = dayjs.duration( + (this.targetHeight - this.currentProcessingHeight) / this.blockPerSecond, + 'seconds' + ); + const hoursMinsStr = blockDuration.format('HH [hours] mm [mins]'); + const days = Math.floor(blockDuration.asDays()); + const durationStr = `${days} days ${hoursMinsStr}`; + + if (this.nodeConfig.profiler && this.currentProcessedBlockAmount && this.lastProcessedBlockAmount) { + logger.info( + `Processed ${ + this.currentProcessedBlockAmount - this.lastProcessedBlockAmount + } blocks in the last ${SAMPLING_TIME_VARIANCE}secs ` + ); + } - if (this.nodeConfig.profiler) { logger.info( - `Processed ${ - this.currentProcessedBlockAmount - this.lastProcessedBlockAmount - } blocks in the last ${SAMPLING_TIME_VARIANCE}secs ` + this.targetHeight === this.lastRegisteredHeight && this.blockPerSecond === 0 + ? 'Fully synced, waiting for new blocks' + : `${this.blockPerSecond.toFixed( + 2 + )} blocks/s. Target height: ${this.targetHeight.toLocaleString()}. Current height: ${this.currentProcessingHeight.toLocaleString()}. Estimated time remaining: ${ + this.blockPerSecond === 0 ? 'unknown' : durationStr + }` ); } - - logger.info( - this.targetHeight === this.lastRegisteredHeight && this.blockPerSecond === 0 - ? 'Fully synced, waiting for new blocks' - : `${this.blockPerSecond.toFixed( - 2 - )} blocks/s. Target height: ${this.targetHeight.toLocaleString()}. Current height: ${this.currentProcessingHeight.toLocaleString()}. Estimated time remaining: ${ - this.blockPerSecond === 0 ? 'unknown' : durationStr - }` - ); } this.lastRegisteredHeight = this.currentProcessingHeight; this.lastRegisteredTimestamp = this.currentProcessingTimestamp; this.lastProcessedBlockAmount = this.currentProcessedBlockAmount; } - } catch (e) { + } catch (e: any) { logger.warn(e, 'Failed to measure benchmark'); } } diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 92ab8fad40..198a3029b8 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -20,6 +20,7 @@ import {NodeConfig} from '../../configure'; import {IndexerEvent} from '../../events'; import {getLogger} from '../../logger'; import {IQueue} from '../../utils'; +import {CachePoiModel} from '../storeCache/cachePoi'; const logger = getLogger('BaseBlockDispatcherService'); @@ -50,11 +51,11 @@ function isNullMerkelRoot(operationHash: Uint8Array): boolean { } export abstract class BaseBlockDispatcher implements IBlockDispatcher { - protected _latestBufferedHeight: number; - protected _processedBlockCount: number; - protected latestProcessedHeight: number; - protected currentProcessingHeight: number; - protected onDynamicDsCreated: (height: number) => Promise; + protected _latestBufferedHeight = 0; + protected _processedBlockCount = 0; + protected latestProcessedHeight = 0; + protected currentProcessingHeight = 0; + private _onDynamicDsCreated?: (height: number) => Promise; constructor( protected nodeConfig: NodeConfig, @@ -72,8 +73,9 @@ export abstract class BaseBlockDispatcher implements IBloc abstract enqueueBlocks(heights: number[], latestBufferHeight?: number): Promise; async init(onDynamicDsCreated: (height: number) => Promise): Promise { - this.onDynamicDsCreated = onDynamicDsCreated; - this.setProcessedBlockCount(await this.storeCacheService.metadata.find('processedBlockCount', 0)); + this._onDynamicDsCreated = onDynamicDsCreated; + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.setProcessedBlockCount((await this.storeCacheService.metadata.find('processedBlockCount', 0))!); } get queueSize(): number { @@ -81,7 +83,7 @@ export abstract class BaseBlockDispatcher implements IBloc } get freeSize(): number { - return this.queue.freeSpace; + return this.queue.freeSpace!; } get smartBatchSize(): number { @@ -92,6 +94,13 @@ export abstract class BaseBlockDispatcher implements IBloc return this.smartBatchService.minimumHeapRequired; } + protected get onDynamicDsCreated(): (height: number) => Promise { + if (!this._onDynamicDsCreated) { + throw new Error('BaseBlockDispatcher has not been initialized'); + } + return this._onDynamicDsCreated; + } + get latestBufferedHeight(): number { return this._latestBufferedHeight; } @@ -132,7 +141,6 @@ export abstract class BaseBlockDispatcher implements IBloc // Is called directly before a block is processed protected preProcessBlock(height: number): void { - this.storeService.setOperationStack(); this.storeService.setBlockHeight(height); this.currentProcessingHeight = height; @@ -201,7 +209,7 @@ export abstract class BaseBlockDispatcher implements IBloc await this.poiService.getLatestPoiBlockHash(), this.project.id ); - this.storeCacheService.poi.set(poiBlock); + this.poi.set(poiBlock); this.poiService.setLatestPoiBlockHash(poiBlock.hash); this.storeCacheService.metadata.set('lastPoiHeight', height); } @@ -233,4 +241,12 @@ export abstract class BaseBlockDispatcher implements IBloc meta.setIncrement('processedBlockCount'); } } + + private get poi(): CachePoiModel { + const poi = this.storeCacheService.poi; + if (!poi) { + throw new Error('MMR service expected POI but it was not found'); + } + return poi; + } } diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index d55dc9c440..220940f2a9 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -89,7 +89,7 @@ export abstract class BlockDispatcher this.queue.putMany(heights); - this.latestBufferedHeight = latestBufferHeight ?? last(heights); + this.latestBufferedHeight = latestBufferHeight ?? last(heights) ?? this.latestBufferedHeight; void this.fetchBlocksFromQueue(); } @@ -111,7 +111,7 @@ export abstract class BlockDispatcher try { while (!this.isShutdown) { - const blockNums = this.queue.takeMany(Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace)); + const blockNums = this.queue.takeMany(Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace!)); // Used to compare before and after as a way to check if queue was flushed const bufferedHeight = this._latestBufferedHeight; @@ -138,7 +138,7 @@ export abstract class BlockDispatcher // If specVersion not changed, a known overallSpecVer will be pass in // Otherwise use api to fetch runtimes - if (memoryLock.isLocked) { + if (memoryLock.isLocked()) { await memoryLock.waitForUnlock(); } @@ -148,7 +148,8 @@ export abstract class BlockDispatcher // Check if the queues have been flushed between queue.takeMany and fetchBlocksBatches resolving // Peeking the queue is because the latestBufferedHeight could have regrown since fetching block - if (bufferedHeight > this._latestBufferedHeight || this.queue.peek() < Math.min(...blockNums)) { + const peeked = this.queue.peek(); + if (bufferedHeight > this._latestBufferedHeight || (peeked && peeked < Math.min(...blockNums))) { logger.info(`Queue was reset for new DS, discarding fetched blocks`); continue; } @@ -163,8 +164,8 @@ export abstract class BlockDispatcher await this.postProcessBlock(height, processBlockResponse); //set block to null for garbage collection - block = null; - } catch (e) { + (block as any) = null; + } catch (e: any) { // TODO discard any cache changes from this block height if (this.isShutdown) { return; @@ -186,7 +187,7 @@ export abstract class BlockDispatcher value: this.processQueue.size, }); } - } catch (e) { + } catch (e: any) { logger.error(e, 'Failed to fetch blocks from queue'); if (!this.isShutdown) { process.exit(1); diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 4afc3ab8f4..58f910b952 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -28,14 +28,17 @@ type Worker = { terminate: () => Promise; }; +function initAutoQueue(workers: number | undefined, batchSize: number): AutoQueue { + assert(workers && workers > 0, 'Number of workers must be greater than 0'); + return new AutoQueue(workers * batchSize * 2); +} + export abstract class WorkerBlockDispatcher extends BaseBlockDispatcher, DS> implements OnApplicationShutdown { - protected workers: W[]; + protected workers: W[] = []; private numWorkers: number; - - private taskCounter = 0; private isShutdown = false; protected abstract fetchBlock(worker: W, height: number): Promise; @@ -57,14 +60,15 @@ export abstract class WorkerBlockDispatcher eventEmitter, project, projectService, - new AutoQueue(nodeConfig.workers * nodeConfig.batchSize * 2), + initAutoQueue(nodeConfig.workers, nodeConfig.batchSize), smartBatchService, storeService, storeCacheService, poiService, dynamicDsService ); - this.numWorkers = nodeConfig.workers; + // initAutoQueue will assert that workers is set. unfortunately we cant do anything before the super call + this.numWorkers = nodeConfig.workers!; } async init(onDynamicDsCreated: (height: number) => Promise): Promise { @@ -118,7 +122,7 @@ export abstract class WorkerBlockDispatcher heights.map(async (height) => this.enqueueBlock(height, await this.getNextWorkerIndex())); } - this.latestBufferedHeight = latestBufferHeight ?? last(heights); + this.latestBufferedHeight = latestBufferHeight ?? last(heights) ?? this.latestBufferedHeight; } private async enqueueBlock(height: number, workerIdx: number): Promise { @@ -151,7 +155,7 @@ export abstract class WorkerBlockDispatcher blockHash, reindexBlockHeight, }); - } catch (e) { + } catch (e: any) { // TODO discard any cache changes from this block height logger.error( e, diff --git a/packages/node-core/src/indexer/dictionary.service.test.ts b/packages/node-core/src/indexer/dictionary.service.test.ts index 772c9837e5..eea1062ef2 100644 --- a/packages/node-core/src/indexer/dictionary.service.test.ts +++ b/packages/node-core/src/indexer/dictionary.service.test.ts @@ -119,7 +119,7 @@ describe('DictionaryService', () => { const endBlock = 10001; const dic = await dictionaryService.getDictionary(startBlock, endBlock, batchSize, HAPPY_PATH_CONDITIONS); - expect(dic.batchBlocks.length).toBeGreaterThan(1); + expect(dic?.batchBlocks.length).toBeGreaterThan(1); }, 500000); it('return undefined when dictionary api failed', async () => { @@ -145,7 +145,7 @@ describe('DictionaryService', () => { const startBlock = 400000000; const endBlock = 400010000; const dic = await dictionaryService.getDictionary(startBlock, endBlock, batchSize, HAPPY_PATH_CONDITIONS); - expect(dic._metadata).toBeDefined(); + expect(dic?._metadata).toBeDefined(); }, 500000); it('test query the correct range', async () => { @@ -164,7 +164,7 @@ describe('DictionaryService', () => { ], }, ]); - expect(dic.batchBlocks).toEqual(range(startBlock, startBlock + batchSize)); + expect(dic?.batchBlocks).toEqual(range(startBlock, startBlock + batchSize)); }, 500000); it('use minimum value of event/extrinsic returned block as batch end block', async () => { @@ -213,7 +213,7 @@ describe('DictionaryService', () => { ], }, ]); - expect(dic.batchBlocks[dic.batchBlocks.length - 1]).toBe(333524); + expect(dic?.batchBlocks[dic.batchBlocks.length - 1]).toBe(333524); }, 500000); it('able to build queryEntryMap', async () => { diff --git a/packages/node-core/src/indexer/dictionary.service.ts b/packages/node-core/src/indexer/dictionary.service.ts index d4b5984647..d44eb0d818 100644 --- a/packages/node-core/src/indexer/dictionary.service.ts +++ b/packages/node-core/src/indexer/dictionary.service.ts @@ -141,7 +141,8 @@ function buildDictQueryFragment( }; if (useDistinct) { - node.args.distinct = ['BLOCK_HEIGHT']; + // Non null assertion here because we define the object explicitly + node.args!.distinct = ['BLOCK_HEIGHT']; } return [gqlVars, node]; @@ -149,12 +150,12 @@ function buildDictQueryFragment( @Injectable() export class DictionaryService implements OnApplicationShutdown { - protected client: ApolloClient; + private _client?: ApolloClient; private isShutdown = false; - private mappedDictionaryQueryEntries: Map; + private mappedDictionaryQueryEntries: Map = new Map(); private useDistinct = true; private useStartHeight = true; - protected _startHeight: number; + protected _startHeight?: number; constructor( readonly dictionaryEndpoint: string, @@ -173,7 +174,7 @@ export class DictionaryService implements OnApplicationShutdown { chainId: this.chainId, httpOptions: {fetch}, }); - } catch (e) { + } catch (e: any) { logger.error(e.message); process.exit(1); } @@ -181,7 +182,7 @@ export class DictionaryService implements OnApplicationShutdown { link = new HttpLink({uri: this.dictionaryEndpoint, fetch}); } - this.client = new ApolloClient({ + this._client = new ApolloClient({ cache: new InMemoryCache({resultCaching: true}), link, defaultOptions: { @@ -195,7 +196,7 @@ export class DictionaryService implements OnApplicationShutdown { }); } - setDictionaryStartHeight(start: number | undefined): void { + private setDictionaryStartHeight(start: number | undefined): void { // Since not all dictionary has adopt start height, if it is not set, we just consider it is 1. if (this._startHeight !== undefined) { return; @@ -204,8 +205,19 @@ export class DictionaryService implements OnApplicationShutdown { } get startHeight(): number { + if (!this._startHeight) { + throw new Error('Dictionary start height is not set'); + } return this._startHeight; } + + protected get client(): ApolloClient { + if (!this._client) { + throw new Error('Dictionary service has not been initialized'); + } + return this._client; + } + onApplicationShutdown(): void { this.isShutdown = true; } @@ -224,7 +236,7 @@ export class DictionaryService implements OnApplicationShutdown { queryEndBlock: number, batchSize: number, conditions: DictionaryQueryEntry[] - ): Promise { + ): Promise { const {query, variables} = this.dictionaryQuery(startBlock, queryEndBlock, batchSize, conditions); try { const resp = await timeout( @@ -253,7 +265,7 @@ export class DictionaryService implements OnApplicationShutdown { _metadata, batchBlocks, }; - } catch (err) { + } catch (err: any) { // Check if the error is about distinct argument and disable distinct if so if (JSON.stringify(err).includes(distinctErrorEscaped)) { this.useDistinct = false; @@ -304,33 +316,28 @@ export class DictionaryService implements OnApplicationShutdown { dataSources: Array, buildDictionaryQueryEntries: (startBlock: number) => DictionaryQueryEntry[] ): void { - const mappedDictionaryQueryEntries = new Map(); - - for (const ds of dataSources.sort((a, b) => a.startBlock - b.startBlock)) { - mappedDictionaryQueryEntries.set(ds.startBlock, buildDictionaryQueryEntries(ds.startBlock)); + const dsWithStartBlock = (dataSources.filter((ds) => !!ds.startBlock) as (DS & {startBlock: number})[]).sort( + (a, b) => a.startBlock - b.startBlock + ); + for (const ds of dsWithStartBlock) { + this.mappedDictionaryQueryEntries.set(ds.startBlock, buildDictionaryQueryEntries(ds.startBlock)); } - this.mappedDictionaryQueryEntries = mappedDictionaryQueryEntries; } getDictionaryQueryEntries(queryEndBlock: number): DictionaryQueryEntry[] { - let dictionaryQueryEntries: DictionaryQueryEntry[]; - this.mappedDictionaryQueryEntries.forEach((value, key) => { + for (const [key, value] of this.mappedDictionaryQueryEntries) { if (queryEndBlock >= key) { - dictionaryQueryEntries = value; + return value; } - }); - - if (dictionaryQueryEntries === undefined) { - return []; } - return dictionaryQueryEntries; + return []; } async scopedDictionaryEntries( startBlockHeight: number, queryEndBlock: number, scaledBatchSize: number - ): Promise { + ): Promise { return this.getDictionary( startBlockHeight, queryEndBlock, @@ -349,7 +356,7 @@ export class DictionaryService implements OnApplicationShutdown { return buildQuery([], nodes); } - async getMetadata(): Promise { + async getMetadata(): Promise { const {query} = this.metadataQuery(); try { const resp = await timeout( @@ -359,8 +366,11 @@ export class DictionaryService implements OnApplicationShutdown { this.nodeConfig.dictionaryTimeout ); const _metadata = resp.data._metadata; + + this.setDictionaryStartHeight(_metadata.startHeight); + return {_metadata}; - } catch (err) { + } catch (err: any) { if (JSON.stringify(err).includes(startHeightEscaped)) { this.useStartHeight = false; logger.warn(`Dictionary doesn't support validate start height.`); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 08d04e21e6..ba86e7e758 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -22,13 +22,20 @@ export interface IDynamicDsService { } export abstract class DynamicDsService implements IDynamicDsService { - private metadata: CacheMetadataModel; - private tempDsRecords: Record; + private _metadata?: CacheMetadataModel; + private tempDsRecords: Record = {}; - private _datasources: DS[]; + private _datasources?: DS[]; init(metadata: CacheMetadataModel): void { - this.metadata = metadata; + this._metadata = metadata; + } + + private get metadata(): CacheMetadataModel { + if (!this._metadata) { + throw new Error('DynamicDsService has not been initialized'); + } + return this._metadata; } async resetDynamicDatasource(targetHeight: number): Promise { @@ -52,7 +59,7 @@ export abstract class DynamicDsService implements IDynamicDsService { this._datasources.push(ds); return ds; - } catch (e) { + } catch (e: any) { logger.error(e, 'Failed to create dynamic ds'); process.exit(1); } @@ -81,7 +88,7 @@ export abstract class DynamicDsService implements IDynamicDsService { logger.info(`Loaded ${dataSources.length} dynamic datasources`); return dataSources; - } catch (e) { + } catch (e: any) { logger.error(`Unable to get dynamic datasources:\n${e.message}`); process.exit(1); } diff --git a/packages/node-core/src/indexer/mmr.service.ts b/packages/node-core/src/indexer/mmr.service.ts index a812359cac..1debe48ae3 100644 --- a/packages/node-core/src/indexer/mmr.service.ts +++ b/packages/node-core/src/indexer/mmr.service.ts @@ -13,6 +13,7 @@ import {getLogger} from '../logger'; import {delay} from '../utils'; import {ProofOfIndex} from './entities'; import {StoreCacheService} from './storeCache'; +import {CachePoiModel} from './storeCache/cachePoi'; const logger = getLogger('mmr'); @@ -22,10 +23,10 @@ const keccak256Hash = (...nodeValues: Uint8Array[]) => Buffer.from(keccak256(Buf export class MmrService implements OnApplicationShutdown { private isShutdown = false; private isSyncing = false; - private fileBasedMmr: MMR; + private _fileBasedMmr?: MMR; // This is the next block height that suppose to calculate its mmr value - private nextMmrBlockHeight: number; - private blockOffset: number; + private _nextMmrBlockHeight?: number; + private _blockOffset?: number; constructor(private nodeConfig: NodeConfig, private storeCacheService: StoreCacheService) {} @@ -33,19 +34,48 @@ export class MmrService implements OnApplicationShutdown { this.isShutdown = true; } + private get poi(): CachePoiModel { + const poi = this.storeCacheService.poi; + if (!poi) { + throw new Error('MMR service expected POI but it was not found'); + } + return poi; + } + + private get fileBasedMmr(): MMR { + if (!this._fileBasedMmr) { + throw new Error('MMR Service sync has not been called'); + } + return this._fileBasedMmr; + } + + private get nextMmrBlockHeight(): number { + if (!this._nextMmrBlockHeight) { + throw new Error('MMR Service sync has not been called'); + } + return this._nextMmrBlockHeight; + } + + private get blockOffset(): number { + if (!this._blockOffset) { + throw new Error('MMR Service sync has not been called'); + } + return this._blockOffset; + } + async syncFileBaseFromPoi(blockOffset: number): Promise { if (this.isSyncing) return; this.isSyncing = true; - this.fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); - this.blockOffset = blockOffset; + this._fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); + this._blockOffset = blockOffset; // The file based database current leaf length const fileBasedMmrLeafLength = await this.fileBasedMmr.getLeafLength(); // However, when initialization we pick the previous block for file db and poi mmr validation // if mmr leaf length 0 ensure the next block height to be processed min is 1. - this.nextMmrBlockHeight = fileBasedMmrLeafLength + blockOffset + 1; + this._nextMmrBlockHeight = fileBasedMmrLeafLength + blockOffset + 1; // The latest poi record in database with mmr value - const latestPoiWithMmr = await this.storeCacheService.poi.getLatestPoiWithMmr(); + const latestPoiWithMmr = await this.poi.getLatestPoiWithMmr(); if (latestPoiWithMmr) { // The latestPoiWithMmr its mmr value in filebase db const latestPoiFilebaseMmrValue = await this.fileBasedMmr.getRoot(latestPoiWithMmr.id - blockOffset - 1); @@ -55,18 +85,18 @@ export class MmrService implements OnApplicationShutdown { // but latestPoiWithMmr still valid, mmr should delete advanced mmr if (this.nextMmrBlockHeight > latestPoiWithMmr.id + 1) { await this.deleteMmrNode(latestPoiWithMmr.id + 1, blockOffset); - this.nextMmrBlockHeight = latestPoiWithMmr.id + 1; + this._nextMmrBlockHeight = latestPoiWithMmr.id + 1; } } logger.info(`file based database MMR start with next block height at ${this.nextMmrBlockHeight}`); while (!this.isShutdown) { - const poiBlocks = await this.storeCacheService.poi.getPoiBlocksByRange(this.nextMmrBlockHeight); + const poiBlocks = await this.poi.getPoiBlocksByRange(this.nextMmrBlockHeight); if (poiBlocks.length !== 0) { for (const block of poiBlocks) { if (this.nextMmrBlockHeight < block.id) { for (let i = this.nextMmrBlockHeight; i < block.id; i++) { await this.fileBasedMmr.append(DEFAULT_LEAF); - this.nextMmrBlockHeight = i + 1; + this._nextMmrBlockHeight = i + 1; } } await this.appendMmrNode(block); @@ -81,7 +111,7 @@ export class MmrService implements OnApplicationShutdown { if (this.nextMmrBlockHeight > Number(lastPoiHeight) && this.nextMmrBlockHeight <= Number(lastProcessedHeight)) { for (let i = this.nextMmrBlockHeight; i <= Number(lastProcessedHeight); i++) { await this.fileBasedMmr.append(DEFAULT_LEAF); - this.nextMmrBlockHeight = i + 1; + this._nextMmrBlockHeight = i + 1; } } await delay(MMR_AWAIT_TIME); @@ -100,11 +130,13 @@ export class MmrService implements OnApplicationShutdown { await this.fileBasedMmr.append(newLeaf, estLeafIndexByBlockHeight); const mmrRoot = await this.fileBasedMmr.getRoot(estLeafIndexByBlockHeight); this.updatePoiMmrRoot(poiBlock, mmrRoot); - this.nextMmrBlockHeight = poiBlock.id + 1; + this._nextMmrBlockHeight = poiBlock.id + 1; } private validatePoiMmr(poiWithMmr: ProofOfIndex, mmrValue: Uint8Array): void { - if (!u8aEq(poiWithMmr.mmrRoot, mmrValue)) { + if (!poiWithMmr.mmrRoot) { + throw new Error(`Poi block height ${poiWithMmr.id}, Poi mmr has not been set`); + } else if (!u8aEq(poiWithMmr.mmrRoot, mmrValue)) { throw new Error( `Poi block height ${poiWithMmr.id}, Poi mmr ${u8aToHex( poiWithMmr.mmrRoot @@ -120,7 +152,7 @@ export class MmrService implements OnApplicationShutdown { private updatePoiMmrRoot(poiBlock: ProofOfIndex, mmrValue: Uint8Array): void { if (!poiBlock.mmrRoot) { poiBlock.mmrRoot = mmrValue; - this.storeCacheService.poi.set(poiBlock); + this.poi.set(poiBlock); } else { this.validatePoiMmr(poiBlock, mmrValue); } @@ -185,7 +217,7 @@ export class MmrService implements OnApplicationShutdown { } async deleteMmrNode(blockHeight: number, blockOffset: number): Promise { - this.fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); + this._fileBasedMmr = await this.ensureFileBasedMmr(this.nodeConfig.mmrPath); const leafIndex = blockHeight - blockOffset - 1; if (leafIndex < 0) { throw new Error(`Target block height must greater equal to ${blockOffset + 1} `); diff --git a/packages/node-core/src/indexer/poi.service.ts b/packages/node-core/src/indexer/poi.service.ts index f145699ee0..7ffe3f40e3 100644 --- a/packages/node-core/src/indexer/poi.service.ts +++ b/packages/node-core/src/indexer/poi.service.ts @@ -12,8 +12,8 @@ const DEFAULT_PARENT_HASH = hexToU8a('0x00'); @Injectable() export class PoiService implements OnApplicationShutdown { private isShutdown = false; - private latestPoiBlockHash: Uint8Array; - private poiRepo: CachePoiModel; + private latestPoiBlockHash?: Uint8Array | null; + private poiRepo?: CachePoiModel; constructor(private storeCache: StoreCacheService) {} @@ -22,12 +22,12 @@ export class PoiService implements OnApplicationShutdown { } async init(): Promise { - this.poiRepo = this.storeCache.poi; + this.poiRepo = this.storeCache.poi ?? undefined; this.latestPoiBlockHash = await this.getLatestPoiBlockHash(); } private async fetchPoiBlockHashFromDb(): Promise { - const lastPoi = await this.poiRepo.getLatestPoi(); + const lastPoi = await this.poiRepo?.getLatestPoi(); if (lastPoi === null || lastPoi === undefined) { return null; } else if (lastPoi !== null && lastPoi.hash) { @@ -37,7 +37,7 @@ export class PoiService implements OnApplicationShutdown { } } - async getLatestPoiBlockHash(): Promise { + async getLatestPoiBlockHash(): Promise { if (!this.latestPoiBlockHash || !isMainThread) { const poiBlockHash = await this.fetchPoiBlockHashFromDb(); if (poiBlockHash === null || poiBlockHash === undefined) { diff --git a/packages/node-core/src/indexer/sandbox.spec.ts b/packages/node-core/src/indexer/sandbox.spec.ts index c99e2ba484..b72583168d 100644 --- a/packages/node-core/src/indexer/sandbox.spec.ts +++ b/packages/node-core/src/indexer/sandbox.spec.ts @@ -21,11 +21,11 @@ describe('sandbox for subql-node', () => { store: undefined, root, entry, - script: fs.readFileSync(path.join(root, entry)).toString(), + // script: fs.readFileSync(path.join(root, entry)).toString(), }, new NodeConfig({subquery: ' ', subqueryName: ' '}) ); - let sandboxFuncionEndAt: Date; + let sandboxFuncionEndAt: Date | undefined; vm.on('console.log', (line) => { if (line === 'OK') { sandboxFuncionEndAt = new Date(); @@ -33,7 +33,7 @@ describe('sandbox for subql-node', () => { }); await vm.securedExec('testSandbox', []); const secureExecEndAt = new Date(); - expect(sandboxFuncionEndAt).toBeTruthy(); - expect(secureExecEndAt.getTime()).toBeGreaterThanOrEqual(sandboxFuncionEndAt.getTime()); + expect(sandboxFuncionEndAt).toBeDefined(); + expect(secureExecEndAt.getTime()).toBeGreaterThanOrEqual(sandboxFuncionEndAt?.getTime() ?? 0); }); }); diff --git a/packages/node-core/src/indexer/sandbox.ts b/packages/node-core/src/indexer/sandbox.ts index 15d03a2c50..a41045c1a4 100644 --- a/packages/node-core/src/indexer/sandbox.ts +++ b/packages/node-core/src/indexer/sandbox.ts @@ -14,7 +14,6 @@ import {timeout} from '../utils'; export interface SandboxOption { store?: Store; - script: string; root: string; entry: string; } @@ -66,15 +65,17 @@ export class Sandbox extends NodeVM { return timeout(this.run(this.script), duration); } - protected async convertStack(stackTrace: string): Promise { + protected async convertStack(stackTrace: string | undefined): Promise { + if (!stackTrace) return undefined; if (!this.sourceMap) { logger.warn('Unable to find a source map. Rebuild your project with latest @subql/cli to generate a source map.'); logger.warn('Logging unresolved stack trace.'); return stackTrace; } - const entryFile = last(this.entry.split('/')); - const regex = new RegExp(`${entryFile.split('.')[0]}.${entryFile.split('.')[1]}:([0-9]+):([0-9]+)`, 'gi'); + const entryFile = last(this.entry.split('/')) ?? ''; + const entryParts = entryFile.split('.'); + const regex = new RegExp(`${entryParts[0]}.${entryParts[1]}:([0-9]+):([0-9]+)`, 'gi'); const matches = [...stackTrace.matchAll(regex)]; for (const match of matches) { @@ -88,7 +89,7 @@ export class Sandbox extends NodeVM { return stackTrace; } - decodeSourceMap(sourceMapPath: string) { + decodeSourceMap(sourceMapPath: string): any { const source = readFileSync(sourceMapPath).toString(); const sourceMapBase64 = source.split(`//# sourceMappingURL=data:application/json;charset=utf-8;base64,`)[1]; if (!sourceMapBase64) { @@ -134,7 +135,7 @@ export class IndexerSandbox extends Sandbox { this.setGlobal('funcName', funcName); try { await this.runTimeout(this.config.timeout); - } catch (e) { + } catch (e: any) { const newStack = await this.convertStack((e as Error).stack); e.stack = newStack; e.handler = funcName; diff --git a/packages/node-core/src/indexer/smartBatch.service.ts b/packages/node-core/src/indexer/smartBatch.service.ts index af6eac4312..48c5b686bb 100644 --- a/packages/node-core/src/indexer/smartBatch.service.ts +++ b/packages/node-core/src/indexer/smartBatch.service.ts @@ -9,14 +9,9 @@ import {BlockSizeBuffer} from '../utils/blockSizeBuffer'; @Injectable() export class SmartBatchService { private blockSizeBuffer: BlockSizeBuffer; - private memoryLimit: number; - constructor(private maxBatchSize: number, private minHeapRequired?: number) { + constructor(private maxBatchSize: number, private minHeapRequired: number = formatMBtoBytes(128)) { this.blockSizeBuffer = new BlockSizeBuffer(maxBatchSize); - this.memoryLimit = process.memoryUsage().heapTotal; - if (!minHeapRequired) { - this.minHeapRequired = formatMBtoBytes(128); - } } get minimumHeapRequired(): number { @@ -24,8 +19,10 @@ export class SmartBatchService { } addToSizeBuffer(blocks: any[]): void { - if (this.blockSizeBuffer.capacity && blocks.length > this.blockSizeBuffer.freeSpace) { - this.blockSizeBuffer.takeMany(blocks.length - this.blockSizeBuffer.freeSpace); + // Non-null assertion because we define a capacity + const freeSpace = this.blockSizeBuffer.freeSpace!; + if (this.blockSizeBuffer.capacity && blocks.length > freeSpace) { + this.blockSizeBuffer.takeMany(blocks.length - freeSpace); } blocks.forEach((block) => this.blockSizeBuffer.put(this.blockSize(block))); } @@ -39,7 +36,9 @@ export class SmartBatchService { while (stack.length > 1) { // Check for sentinel value - const {obj, prop} = stack.pop(); + const item = stack.pop(); + if (!item) continue; + const {obj, prop} = item; const type = typeof obj; if (type === 'string') { diff --git a/packages/node-core/src/indexer/store.service.spec.ts b/packages/node-core/src/indexer/store.service.spec.ts index f499bc4137..7e79de1f55 100644 --- a/packages/node-core/src/indexer/store.service.spec.ts +++ b/packages/node-core/src/indexer/store.service.spec.ts @@ -8,7 +8,7 @@ describe('Store Service', () => { let storeService: StoreService; it('addIdAndBlockRangeAttributes', () => { - storeService = new StoreService(null, null, null); + storeService = new StoreService(null as any, null as any, null as any, null as any); const attributes = { id: { type: DataTypes.STRING, diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index ad72df03cf..743c7935d4 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -60,7 +60,6 @@ import { import {generateIndexName, modelToTableName} from '../utils/sequelizeUtil'; import {MetadataFactory, MetadataRepo, PoiFactory, PoiRepo} from './entities'; import {CacheMetadataModel} from './storeCache'; -import {CachePoiModel} from './storeCache/cachePoi'; import {StoreCacheService} from './storeCache/storeCache.service'; import {StoreOperations} from './StoreOperations'; import {IProjectNetworkConfig, ISubqueryProject, OperationType} from './types'; @@ -83,60 +82,101 @@ interface NotifyTriggerPayload { eventManipulation: string; } +class NoInitError extends Error { + constructor() { + super('StoreService has not been initialized'); + } +} + @Injectable() export class StoreService { - private modelIndexedFields: IndexField[]; - private schema: string; - private modelsRelations: GraphQLModelsRelationsEnums; - private poiRepo: PoiRepo | undefined; - private metaDataRepo: MetadataRepo; - private operationStack: StoreOperations; - @Inject('ISubqueryProject') private subqueryProject: ISubqueryProject; - private blockHeight: number; - historical: boolean; - private dbType: SUPPORT_DB; - private useSubscription: boolean; + private poiRepo?: PoiRepo; private removedIndexes: RemovedIndexes = {}; + private _modelIndexedFields?: IndexField[]; + private _modelsRelations?: GraphQLModelsRelationsEnums; + private _metaDataRepo?: MetadataRepo; + private _historical?: boolean; + private _dbType?: SUPPORT_DB; + private _metadataModel?: CacheMetadataModel; + + // Should be updated each block + private _blockHeight?: number; + private operationStack?: StoreOperations; + + constructor( + private sequelize: Sequelize, + private config: NodeConfig, + readonly storeCache: StoreCacheService, + @Inject('ISubqueryProject') private subqueryProject: ISubqueryProject + ) {} + + private get modelIndexedFields(): IndexField[] { + assert(!!this._modelIndexedFields, new NoInitError()); + return this._modelIndexedFields; + } + + private get modelsRelations(): GraphQLModelsRelationsEnums { + assert(!!this._modelsRelations, new NoInitError()); + return this._modelsRelations; + } + + private get metaDataRepo(): MetadataRepo { + assert(!!this._metaDataRepo, new NoInitError()); + return this._metaDataRepo; + } + + private get blockHeight(): number { + assert(!!this._blockHeight, new Error('StoreService.setBlockHeight has not been called')); + return this._blockHeight; + } + + get historical(): boolean { + assert(!!this._historical, new NoInitError()); + return this._historical; + } - poiModel: CachePoiModel; - metadataModel: CacheMetadataModel; + private get dbType(): SUPPORT_DB { + assert(!!this._dbType, new NoInitError()); + return this._dbType; + } - constructor(private sequelize: Sequelize, private config: NodeConfig, readonly storeCache: StoreCacheService) {} + private get metadataModel(): CacheMetadataModel { + assert(!!this._metadataModel, new NoInitError()); + return this._metadataModel; + } async init(modelsRelations: GraphQLModelsRelationsEnums, schema: string): Promise { - this.schema = schema; - this.modelsRelations = modelsRelations; - this.historical = await this.getHistoricalStateEnabled(); + this._modelsRelations = modelsRelations; + this._historical = await this.getHistoricalStateEnabled(schema); logger.info(`Historical state is ${this.historical ? 'enabled' : 'disabled'}`); - this.dbType = await getDbType(this.sequelize); + this._dbType = await getDbType(this.sequelize); - this.useSubscription = this.config.subscription; - if (this.useSubscription && this.dbType === SUPPORT_DB.cockRoach) { - this.useSubscription = false; + let useSubscription = this.config.subscription; + if (useSubscription && this.dbType === SUPPORT_DB.cockRoach) { + useSubscription = false; logger.warn(`Subscription is not support with ${this.dbType}`); } if (this.historical && this.dbType === SUPPORT_DB.cockRoach) { - this.historical = false; + this._historical = false; logger.warn(`Historical feature is not support with ${this.dbType}`); } this.storeCache.init(this.historical, this.dbType === SUPPORT_DB.cockRoach); try { - await this.syncSchema(this.schema); - } catch (e) { + await this.syncSchema(schema, useSubscription); + } catch (e: any) { logger.error(e, `Having a problem when syncing schema`); process.exit(1); } try { - this.modelIndexedFields = await this.getAllIndexFields(this.schema); - } catch (e) { + this._modelIndexedFields = await this.getAllIndexFields(schema); + } catch (e: any) { logger.error(e, `Having a problem when get indexed fields`); process.exit(1); } this.storeCache.setRepos(this.metaDataRepo, this.poiRepo); - this.metadataModel = this.storeCache.metadata; - this.poiModel = this.storeCache.poi; + this._metadataModel = this.storeCache.metadata; this.metadataModel.set('historicalStateEnabled', this.historical); this.metadataModel.setIncrement('schemaMigrationCount'); @@ -168,7 +208,7 @@ export class StoreService { } // eslint-disable-next-line complexity - async syncSchema(schema: string): Promise { + async syncSchema(schema: string, useSubscription: boolean): Promise { const enumTypeMap = new Map(); if (this.historical) { const [results] = await this.sequelize.query(BTREE_GIST_EXTENSION_EXIST_QUERY); @@ -236,7 +276,7 @@ export class StoreService { } const extraQueries = []; // Function need to create ahead of triggers - if (this.useSubscription) { + if (useSubscription) { extraQueries.push(createSendNotificationTriggerFunction(schema)); } for (const model of this.modelsRelations.models) { @@ -278,7 +318,7 @@ export class StoreService { // see https://github.com/subquery/subql/issues/1542 } - if (this.useSubscription) { + if (useSubscription) { const triggerName = hashName(schema, 'notify_trigger', sequelizeModel.tableName); const notifyTriggers = await getTriggers(this.sequelize, triggerName); // Triggers not been found @@ -295,7 +335,7 @@ export class StoreService { } } // We have to drop the function after all triggers depend on it are removed - if (!this.useSubscription && this.dbType !== SUPPORT_DB.cockRoach) { + if (!useSubscription && this.dbType !== SUPPORT_DB.cockRoach) { extraQueries.push(dropNotifyFunction(schema)); } @@ -357,7 +397,7 @@ export class StoreService { this.poiRepo = PoiFactory(this.sequelize, schema); } - this.metaDataRepo = await MetadataFactory( + this._metaDataRepo = await MetadataFactory( this.sequelize, schema, this.config.multiChain, @@ -374,12 +414,12 @@ export class StoreService { // this.afterHandleCockroachIndex() } - async getHistoricalStateEnabled(): Promise { + async getHistoricalStateEnabled(schema: string): Promise { const {disableHistorical, multiChain} = this.config; try { const tableRes = await this.sequelize.query>( - `SELECT table_name FROM information_schema.tables where table_schema='${this.schema}'`, + `SELECT table_name FROM information_schema.tables where table_schema='${schema}'`, {type: QueryTypes.SELECT} ); @@ -396,7 +436,7 @@ export class StoreService { if (metadataTableNames.length === 1) { const res = await this.sequelize.query<{key: string; value: boolean | string}>( - `SELECT key, value FROM "${this.schema}"."${metadataTableNames[0]}" WHERE (key = 'historicalStateEnabled' OR key = 'genesisHash')`, + `SELECT key, value FROM "${schema}"."${metadataTableNames[0]}" WHERE (key = 'historicalStateEnabled' OR key = 'genesisHash')`, {type: QueryTypes.SELECT} ); @@ -434,6 +474,9 @@ export class StoreService { if (index.using === IndexType.GIN) { return; } + if (!index.fields) { + index.fields = []; + } index.fields.push('_block_range'); index.using = IndexType.GIST; // GIST does not support unique indexes @@ -454,7 +497,7 @@ export class StoreService { ): void { if (this.dbType === SUPPORT_DB.cockRoach) { indexes.forEach((index, i) => { - if (index.using === IndexType.HASH && !existedIndexes.includes(index.name)) { + if (index.using === IndexType.HASH && !existedIndexes.includes(index.name!)) { const cockroachDbIndexQuery = `CREATE INDEX "${index.name}" ON "${schema}"."${modelToTableName(modelName)}"(${ index.fields }) USING HASH;`; @@ -488,7 +531,8 @@ export class StoreService { const deprecated = generateIndexName(tableName, index); if (!existedIndexes.includes(deprecated)) { - index.name = blake2AsHex(`${modelName}_${index.fields.join('_')}`, 64).substring(0, 63); + const fields = (index.fields ?? []).join('_'); + index.name = blake2AsHex(`${modelName}_${fields}`, 64).substring(0, 63); } }); } @@ -496,7 +540,7 @@ export class StoreService { // Only used with historical to add indexes to ID fields for gettign entitities by ID private addHistoricalIdIndex(model: GraphQLModelsType, indexes: IndexesOptions[]): void { const idFieldName = model.fields.find((field) => field.type === 'ID')?.name; - if (idFieldName && !indexes.find((idx) => idx.fields.includes(idFieldName))) { + if (idFieldName && !indexes.find((idx) => idx.fields?.includes(idFieldName))) { indexes.push({ fields: [Utils.underscoredIf(idFieldName, true)], unique: false, @@ -579,18 +623,16 @@ export class StoreService { }); } - setOperationStack(): void { + setBlockHeight(blockHeight: number): void { + this._blockHeight = blockHeight; if (this.config.proofOfIndex) { this.operationStack = new StoreOperations(this.modelsRelations.models); } } - setBlockHeight(blockHeight: number): void { - this.blockHeight = blockHeight; - } - getOperationMerkleRoot(): Uint8Array { if (this.config.proofOfIndex) { + assert(this.operationStack, new Error('OperationStack is not set, make sure `setBlockHeight` has been called')); this.operationStack.makeOperationMerkleTree(); const merkelRoot = this.operationStack.getOperationMerkleRoot(); if (merkelRoot === null) { @@ -645,6 +687,9 @@ group by } async rewind(targetBlockHeight: number, transaction: Transaction): Promise { + if (!this.historical) { + throw new Error('Unable to reindex, historical state not enabled'); + } for (const model of Object.values(this.sequelize.models)) { if ('__block_range' in model.getAttributes()) { await model.destroy({ @@ -678,6 +723,7 @@ group by } this.metadataModel.set('lastProcessedHeight', targetBlockHeight); if (this.config.proofOfIndex) { + assert(this.poiRepo, new Error('Expected POI repo to exist')); await this.poiRepo.destroy({ transaction, where: { @@ -707,7 +753,7 @@ group by offset?: number; limit?: number; } = {} - ): Promise => { + ): Promise => { try { const indexed = this.modelIndexedFields.findIndex( @@ -756,9 +802,7 @@ group by try { this.storeCache.getModel(entity).set(_id, data, this.blockHeight); - if (this.config.proofOfIndex) { - this.operationStack.put(OperationType.Set, entity, data); - } + this.operationStack?.put(OperationType.Set, entity, data); } catch (e) { throw new Error(`Failed to set Entity ${entity} with _id ${_id}: ${e}`); } @@ -768,10 +812,8 @@ group by try { this.storeCache.getModel(entity).bulkCreate(data, this.blockHeight); - if (this.config.proofOfIndex) { - for (const item of data) { - this.operationStack.put(OperationType.Set, entity, item); - } + for (const item of data) { + this.operationStack?.put(OperationType.Set, entity, item); } } catch (e) { throw new Error(`Failed to bulkCreate Entity ${entity}: ${e}`); @@ -781,10 +823,8 @@ group by bulkUpdate: async (entity: string, data: Entity[], fields?: string[]): Promise => { try { this.storeCache.getModel(entity).bulkUpdate(data, this.blockHeight, fields); - if (this.config.proofOfIndex) { - for (const item of data) { - this.operationStack.put(OperationType.Set, entity, item); - } + for (const item of data) { + this.operationStack?.put(OperationType.Set, entity, item); } } catch (e) { throw new Error(`Failed to bulkCreate Entity ${entity}: ${e}`); @@ -795,9 +835,7 @@ group by try { this.storeCache.getModel(entity).remove(id, this.blockHeight); - if (this.config.proofOfIndex) { - this.operationStack.put(OperationType.Remove, entity, id); - } + this.operationStack?.put(OperationType.Remove, entity, id); } catch (e) { throw new Error(`Failed to remove Entity ${entity} with id ${id}: ${e}`); } diff --git a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts b/packages/node-core/src/indexer/storeCache/cacheMetadata.ts index 714c983822..060b848a3e 100644 --- a/packages/node-core/src/indexer/storeCache/cacheMetadata.ts +++ b/packages/node-core/src/indexer/storeCache/cacheMetadata.ts @@ -25,8 +25,8 @@ export class CacheMetadataModel implements ICachedModelControl { if (!this.getCache[key]) { const record = await this.model.findByPk(key); - if (hasValue(record?.value)) { - this.getCache[key] = record.value as any; + if (hasValue(record)) { + this.getCache[key] = record.toJSON().value as any; } else if (hasValue(fallback)) { this.getCache[key] = fallback; } @@ -81,6 +81,10 @@ export class CacheMetadataModel implements ICachedModelControl { private async incrementJsonbCount(key: string, amount = 1, tx?: Transaction): Promise { const table = this.model.getTableName(); + if (!this.model.sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + await this.model.sequelize.query( `UPDATE ${table} SET value = (COALESCE(value->0):: int + ${amount})::text::jsonb WHERE key ='${key}'`, tx && {transaction: tx} diff --git a/packages/node-core/src/indexer/storeCache/cacheModel.ts b/packages/node-core/src/indexer/storeCache/cacheModel.ts index b95e848649..92ef58b8a2 100644 --- a/packages/node-core/src/indexer/storeCache/cacheModel.ts +++ b/packages/node-core/src/indexer/storeCache/cacheModel.ts @@ -2,12 +2,11 @@ // SPDX-License-Identifier: Apache-2.0 import {flatten, includes, isEqual, uniq} from 'lodash'; -import {CreationAttributes, Model, ModelStatic, Op, Transaction} from 'sequelize'; +import {CreationAttributes, Model, ModelStatic, Op, Sequelize, Transaction} from 'sequelize'; import {Fn} from 'sequelize/types/utils'; import {NodeConfig} from '../../configure'; import {SetValueModel} from './setValueModel'; import { - HistoricalModel, ICachedModelControl, RemoveValue, SetData, @@ -29,14 +28,10 @@ export class CachedModel< { // Null value indicates its not defined in the db private getCache: GetData; - private setCache: SetData = {}; - private removeCache: Record = {}; - - private getNextStoreOperationIndex: () => number; - - readonly hasAssociations: boolean; + private _getNextStoreOperationIndex?: () => number; + readonly hasAssociations: boolean = false; flushableRecordCounter = 0; @@ -56,12 +51,15 @@ export class CachedModel< } } - init(getNextStoreOperationIndex: () => number) { - this.getNextStoreOperationIndex = getNextStoreOperationIndex; + init(getNextStoreOperationIndex: () => number): void { + this._getNextStoreOperationIndex = getNextStoreOperationIndex; } - private get historicalModel(): ModelStatic> { - return this.model as ModelStatic>; + private getNextStoreOperationIndex(): number { + if (!this._getNextStoreOperationIndex) { + throw new Error(`Cache model ${this.model.name} has not been initialized`); + } + return this._getNextStoreOperationIndex(); } allCachedIds(): string[] { @@ -70,7 +68,7 @@ export class CachedModel< return uniq(flatten([[...this.getCache.keys()], Object.keys(this.setCache), Object.keys(this.removeCache)])); } - async get(id: string): Promise { + async get(id: string): Promise { // If this already been removed if (this.removeCache[id]) { return; @@ -85,9 +83,11 @@ export class CachedModel< // https://github.com/sequelize/sequelize/issues/15179 where: {id} as any, }) - )?.toJSON(); - // getCache only keep records from db - this.getCache.set(id, record); + )?.toJSON(); + if (record) { + // getCache only keep records from db + this.getCache.set(id, record); + } } return record; } @@ -102,7 +102,7 @@ export class CachedModel< offset: number; limit: number; } - ): Promise { + ): Promise { let cachedData = this.getFromCache(field, value); if (cachedData.length <= options.offset) { // example cache length 16, offset is 30 @@ -131,13 +131,12 @@ export class CachedModel< this.getCache.set(data.id, data); }); - const joinedData = cachedData.concat(records.map((record) => record.toJSON() as T)); - return joinedData; + return cachedData.concat(records.map((record) => record.toJSON() as T)); } async getOneByField(field: keyof T, value: T[keyof T]): Promise { if (field === 'id') { - return this.get(value.toString()); + return this.get(`${value}`); } else { const oneFromCached = this.getFromCache(field, value, true)[0]; if (oneFromCached) { @@ -149,7 +148,9 @@ export class CachedModel< }) )?.toJSON(); - this.getCache.set(record.id, record); + if (record) { + this.getCache.set(record.id, record); + } return record; } } @@ -227,9 +228,7 @@ export class CachedModel< // We need to use upsert instead of bulkCreate for cockroach db // see this https://github.com/subquery/subql/issues/1606 this.useCockroachDb - ? records.map((r) => { - this.model.upsert(r, {transaction: tx}); - }) + ? records.map((r) => this.model.upsert(r, {transaction: tx})) : this.model.bulkCreate(records, { transaction: tx, updateOnDuplicate: Object.keys(records[0]) as unknown as (keyof T)[], @@ -255,7 +254,7 @@ export class CachedModel< await this.model.destroy({where: {id: removeRecordKey} as any, transaction: tx}); delete this.removeCache[removeRecordKey]; } else { - let setRecord: SetValue; + let setRecord: SetValue | undefined; for (const r of Object.values(this.setCache)) { setRecord = r.popRecordWithOpIndex(operationIndex); if (setRecord) break; @@ -291,12 +290,12 @@ export class CachedModel< return flatten( Object.values(setRecords).map((v) => { if (!this.historical) { - return v.getLatest().data; + return v.getLatest()?.data; } // Historical return v.getValues().map((historicalValue) => { // Alternative: historicalValue.data.__block_range = [historicalValue.startHeight, historicalValue.endHeight]; - historicalValue.data.__block_range = this.model.sequelize.fn( + historicalValue.data.__block_range = this.sequelize.fn( 'int8range', historicalValue.startHeight, historicalValue.endHeight @@ -346,9 +345,11 @@ export class CachedModel< const unifiedIds: string[] = []; Object.entries(this.setCache).map(([, model]) => { if (model.isMatchData(field, value)) { - const latestData = model.getLatest().data; - unifiedIds.push(latestData.id); - joinedData.push(latestData); + const latestData = model.getLatest()?.data; + if (latestData) { + unifiedIds.push(latestData.id); + joinedData.push(latestData); + } } }); // No need search further @@ -358,8 +359,10 @@ export class CachedModel< this.getCache.forEach((getValue, key) => { if ( + getValue && !unifiedIds.includes(key) && - ((field === undefined && value === undefined) || + (field === undefined || + value === undefined || (Array.isArray(value) && includes(value, getValue[field])) || isEqual(getValue[field], value)) ) { @@ -377,9 +380,13 @@ export class CachedModel< setRecords: SetData, removeRecords: Record ): Promise { - const closeSetRecords = Object.entries(setRecords).map(([id, value]) => { - return {id, blockHeight: value.getFirst().startHeight}; - }); + const closeSetRecords: {id: string; blockHeight: number}[] = []; + for (const [id, value] of Object.entries(setRecords)) { + const firstValue = value.getFirst(); + if (firstValue !== undefined) { + closeSetRecords.push({id, blockHeight: firstValue.startHeight}); + } + } const closeRemoveRecords = Object.entries(removeRecords).map(([id, value]) => { return {id, blockHeight: value.removedAtBlock}; }); @@ -389,13 +396,23 @@ export class CachedModel< return; } - await this.model.sequelize.query( + await this.sequelize.query( `UPDATE ${this.model.getTableName()} table1 SET _block_range = int8range(lower("_block_range"), table2._block_end) from (SELECT UNNEST(array[${mergedRecords.map((r) => - this.model.sequelize.escape(r.id) + this.sequelize.escape(r.id) )}]) AS id, UNNEST(array[${mergedRecords.map((r) => r.blockHeight)}]) AS _block_end) AS table2 WHERE table1.id = table2.id and "_block_range" @> _block_end-1::int8;`, {transaction: tx} ); } + + private get sequelize(): Sequelize { + const sequelize = this.model.sequelize; + + if (!sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + + return sequelize; + } } diff --git a/packages/node-core/src/indexer/storeCache/cachePoi.ts b/packages/node-core/src/indexer/storeCache/cachePoi.ts index e34598065e..b36baab4af 100644 --- a/packages/node-core/src/indexer/storeCache/cachePoi.ts +++ b/packages/node-core/src/indexer/storeCache/cachePoi.ts @@ -75,19 +75,25 @@ export class CachePoiModel implements ICachedModelControl { order: [['id', 'DESC']], }); - return Object.values(this.mergeResultsWithCache([result?.toJSON()])).reduce((acc, val) => { + if (!result) return null; + + return Object.values(this.mergeResultsWithCache([result.toJSON()])).reduce((acc, val) => { if (acc && acc.id < val.id) return acc; return val; }, null as ProofOfIndex | null); } - async getLatestPoiWithMmr(): Promise { - const poiBlock = await this.model.findOne({ + async getLatestPoiWithMmr(): Promise { + const result = await this.model.findOne({ order: [['id', 'DESC']], - where: {mmrRoot: {[Op.ne]: null}}, + where: {mmrRoot: {[Op.ne]: null}} as any, // Types problem with sequelize, undefined works but not null }); - return Object.values(this.mergeResultsWithCache([poiBlock?.toJSON()])) + if (!result) { + return null; + } + + return Object.values(this.mergeResultsWithCache([result.toJSON()])) .filter((v) => !!v.mmrRoot) .reduce((acc, val) => { if (acc && acc.id < val.id) return acc; @@ -100,7 +106,7 @@ export class CachePoiModel implements ICachedModelControl { } async flush(tx: Transaction): Promise { - logger.info(`Flushing ${this.flushableRecordCounter} items from cache`); + logger.debug(`Flushing ${this.flushableRecordCounter} items from cache`); const pendingFlush = Promise.all([ this.model.bulkCreate(Object.values(this.setCache), {transaction: tx, updateOnDuplicate: ['mmrRoot']}), this.model.destroy({where: {id: this.removeCache}, transaction: tx}), diff --git a/packages/node-core/src/indexer/storeCache/setValueModel.ts b/packages/node-core/src/indexer/storeCache/setValueModel.ts index 535d096281..e116fcd6a4 100644 --- a/packages/node-core/src/indexer/storeCache/setValueModel.ts +++ b/packages/node-core/src/indexer/storeCache/setValueModel.ts @@ -9,15 +9,14 @@ export class SetValueModel { private _latestIndex = -1; popRecordWithOpIndex(operationIndex: number): SetValue | undefined { - let setRecord: SetValue; const opIndexInSetRecord = this.historicalValues.findIndex((v) => { return v.operationIndex === operationIndex; }); if (opIndexInSetRecord >= 0) { - setRecord = this.historicalValues[opIndexInSetRecord]; + const setRecord = this.historicalValues[opIndexInSetRecord]; this.deleteFromHistorical(opIndexInSetRecord); + return setRecord; } - return setRecord; } set(data: T, blockHeight: number, operationIndex: number): void { @@ -72,7 +71,7 @@ export class SetValueModel { newModel.historicalValues = this.historicalValues .filter((v) => v.startHeight < height) .map((v) => { - if (v.endHeight < height) { + if (v.endHeight && v.endHeight < height) { return v; } @@ -102,13 +101,13 @@ export class SetValueModel { } isMatchData(field?: keyof T, value?: T[keyof T] | T[keyof T][]): boolean { - if (field === undefined && value === undefined) { + if (field === undefined || value === undefined) { return true; } if (Array.isArray(value)) { return value.findIndex((v) => this.isMatchData(field, value)) > -1; } else { - return isEqual(this.getLatest().data[field], value); + return isEqual(this.getLatest()?.data?.[field], value); } } diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts b/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts index 11811e0575..6f8c67f74c 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts +++ b/packages/node-core/src/indexer/storeCache/storeCache.service.spec.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import {EventEmitter2} from '@nestjs/event-emitter'; -import {Op, Sequelize} from 'sequelize'; +import {Sequelize} from 'sequelize'; import {NodeConfig} from '../../configure'; import {StoreCacheService} from './storeCache.service'; @@ -151,7 +151,7 @@ describe('Store Cache Service historical', () => { ); // getOneByField const appleEntity_b5 = await appleModel.getOneByField('field1' as any, 'set apple at block 5'); - expect(appleEntity_b5.field1).toBe('set apple at block 5'); + expect(appleEntity_b5?.field1).toBe('set apple at block 5'); appleModel.set( 'apple-05-smith', { @@ -172,7 +172,7 @@ describe('Store Cache Service historical', () => { limit: 2, offset: 0, }); - expect(appleEntity_b5_records.length).toBe(2); + expect(appleEntity_b5_records?.length).toBe(2); // TODO, getByField with offset and limit // const appleEntity_b5_records_2 = await appleModel.getByField('field1' as any, 'set apple at block 5', null, { diff --git a/packages/node-core/src/indexer/storeCache/storeCache.service.ts b/packages/node-core/src/indexer/storeCache/storeCache.service.ts index c0059eedea..185d8d6c31 100644 --- a/packages/node-core/src/indexer/storeCache/storeCache.service.ts +++ b/packages/node-core/src/indexer/storeCache/storeCache.service.ts @@ -20,13 +20,13 @@ const logger = getLogger('StoreCache'); @Injectable() export class StoreCacheService implements BeforeApplicationShutdown { private cachedModels: Record = {}; - private metadataRepo: MetadataRepo; - private poiRepo: PoiRepo; - private pendingFlush: Promise; - private queuedFlush: Promise; + private metadataRepo?: MetadataRepo; + private poiRepo?: PoiRepo; + private pendingFlush?: Promise; + private queuedFlush?: Promise; private storeCacheThreshold: number; private _historical = true; - private _useCockroachDb: boolean; + private _useCockroachDb?: boolean; private _storeOperationIndex = 0; private _lastFlushedOperationIndex = 0; @@ -98,7 +98,7 @@ export class StoreCacheService implements BeforeApplicationShutdown { const flushToIndex = this._storeOperationIndex; for (let i = this._lastFlushedOperationIndex; i < flushToIndex; i++) { // Flush operation can be a no-op if it doesn't have that index - await Promise.all(relationalModels.map((m) => m.flushOperation(i, tx))); + await Promise.all(relationalModels.map((m) => m.flushOperation?.(i, tx))); } this._lastFlushedOperationIndex = flushToIndex; } @@ -125,7 +125,7 @@ export class StoreCacheService implements BeforeApplicationShutdown { await Promise.all(updatableModels.map((model) => model.flush(tx, blockHeight))); } await tx.commit(); - } catch (e) { + } catch (e: any) { logger.error(e, 'Database transaction failed'); await tx.rollback(); throw e; diff --git a/packages/node-core/src/indexer/storeCache/types.ts b/packages/node-core/src/indexer/storeCache/types.ts index 86ed19f76e..8a620753b6 100644 --- a/packages/node-core/src/indexer/storeCache/types.ts +++ b/packages/node-core/src/indexer/storeCache/types.ts @@ -8,13 +8,13 @@ import {SetValueModel} from './setValueModel'; export type HistoricalModel = {__block_range: any}; export interface ICachedModel { - get: (id: string) => Promise; + get: (id: string) => Promise; // limit always defined from store getByField: ( field: keyof T, value: T[keyof T] | T[keyof T][], - options?: {limit: number; offset?: number} - ) => Promise; + options: {limit: number; offset: number} + ) => Promise; getOneByField: (field: keyof T, value: T[keyof T]) => Promise; set: (id: string, data: T, blockHeight: number) => void; bulkCreate: (data: T[], blockHeight: number) => void; @@ -56,4 +56,4 @@ export type SetValue = { export type SetData = Record>; -export class GetData extends LRUCache {} +export class GetData extends LRUCache {} diff --git a/packages/node-core/src/indexer/testing.service.ts b/packages/node-core/src/indexer/testing.service.ts index 65531db4f7..3b875219f3 100644 --- a/packages/node-core/src/indexer/testing.service.ts +++ b/packages/node-core/src/indexer/testing.service.ts @@ -4,15 +4,6 @@ import {existsSync, readdirSync, statSync} from 'fs'; import path from 'path'; import {Inject, Injectable} from '@nestjs/common'; -import { - NodeConfig, - StoreService, - getLogger, - SandboxOption, - TestSandbox, - IIndexerManager, - ISubqueryProject, -} from '@subql/node-core'; import {SubqlTest} from '@subql/testing/interfaces'; import {DynamicDatasourceCreator, Store} from '@subql/types'; import {getAllEntitiesRelations} from '@subql/utils'; @@ -21,6 +12,11 @@ import {isEqual} from 'lodash'; import Pino from 'pino'; import {CreationAttributes, Model, Sequelize} from 'sequelize'; import {ApiService} from '../api.service'; +import {NodeConfig} from '../configure'; +import {getLogger} from '../logger'; +import {SandboxOption, TestSandbox} from './sandbox'; +import {StoreService} from './store.service'; +import {IIndexerManager, ISubqueryProject} from './types'; const logger = getLogger('subql-testing'); @@ -37,8 +33,8 @@ export abstract class TestingService { private testSandboxes: TestSandbox[]; private failedTestsSummary: { testName: string; - entityId: string; - entityName: string; + entityId?: string; + entityName?: string; failedAttributes: string[]; }[] = []; @@ -53,12 +49,7 @@ export abstract class TestingService { @Inject('ISubqueryProject') protected project: ISubqueryProject, protected readonly apiService: ApiService, protected readonly indexerManager: IIndexerManager - ) {} - - abstract indexBlock(block: B, handler: string): Promise; - - async init() { - await this.indexerManager.start(); + ) { const projectPath = this.project.root; // find all paths to test files const testFiles = this.findAllTestFiles(path.join(projectPath, 'dist/test')); @@ -68,11 +59,16 @@ export abstract class TestingService { const option: SandboxOption = { root: this.project.root, entry: file, - script: null, }; return new TestSandbox(option, this.nodeConfig); }); + } + + abstract indexBlock(block: B, handler: string): Promise; + + async init() { + await this.indexerManager.start(); logger.info(`Found ${this.testSandboxes.length} test files`); @@ -132,9 +128,9 @@ export abstract class TestingService { const [block] = await this.apiService.fetchBlocks([test.blockHeight]); // Init db - const schemas = await this.sequelize.showAllSchemas(undefined); + const schemas = await this.sequelize.showAllSchemas({}); if (!(schemas as unknown as string[]).includes(schema)) { - await this.sequelize.createSchema(`"${schema}"`, undefined); + await this.sequelize.createSchema(`"${schema}"`, {}); } const modelRelations = getAllEntitiesRelations(this.project.schema); @@ -144,17 +140,13 @@ export abstract class TestingService { // Init entities logger.debug('Initializing entities'); - await Promise.all( - test.dependentEntities.map((entity) => { - return entity.save(); - }) - ); + await Promise.all(test.dependentEntities.map((entity) => entity.save?.())); logger.debug('Running handler'); try { await this.indexBlock(block, test.handler); - } catch (e) { + } catch (e: any) { this.totalFailedTests += test.expectedEntities.length; logger.warn(`Test: ${test.name} field due to runtime error`, e); this.failedTestsSummary.push({ @@ -172,7 +164,7 @@ export abstract class TestingService { let failedTests = 0; for (let i = 0; i < test.expectedEntities.length; i++) { const expectedEntity = test.expectedEntities[i]; - const actualEntity = await store.get(expectedEntity._name, expectedEntity.id); + const actualEntity = await store.get(expectedEntity._name!, expectedEntity.id); const attributes = actualEntity as unknown as CreationAttributes; const failedAttributes: string[] = []; let passed = true; @@ -211,9 +203,9 @@ export abstract class TestingService { `${failedTests} failed` )} checks` ); - } catch (e) { + } catch (e: any) { this.totalFailedTests += test.expectedEntities.length; - logger.warn(`Test ${test.name} failed to run`, e); + logger.warn(e, `Test ${test.name} failed to run`); } finally { await this.sequelize.dropSchema(`"${schema}"`, { logging: false, @@ -236,9 +228,8 @@ export abstract class TestingService { return [dsCopy]; } } - - return []; } + return []; } private logFailedTestsSummary() { @@ -246,7 +237,7 @@ export abstract class TestingService { logger.warn(chalk.bold.underline.yellow('Failed tests summary:')); for (const failedTest of this.failedTestsSummary) { let testDetails = - failedTest.entityName || failedTest.entityId + failedTest.entityName && failedTest.entityId ? chalk.bold.red(`\n* ${failedTest.testName}\n\tEntity ${failedTest.entityName}: ${failedTest.entityId}\n`) : chalk.bold.red(`\n* ${failedTest.testName}\n`); for (const attr of failedTest.failedAttributes) { diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index d777e383fb..cdeeaf94ec 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -21,10 +21,15 @@ export interface IProjectNetworkConfig extends ProjectNetworkConfig { chainId: string; } -export interface ISubqueryProject { +export interface ISubqueryProject< + N extends IProjectNetworkConfig = IProjectNetworkConfig, + DS = unknown, + T = unknown, + C = unknown +> { id: string; root: string; - network: Partial; + network: N; dataSources: DS[]; schema: GraphQLSchema; templates: T[]; diff --git a/packages/node-core/src/indexer/worker/worker.builder.ts b/packages/node-core/src/indexer/worker/worker.builder.ts index 297b21611a..2680bce6d3 100644 --- a/packages/node-core/src/indexer/worker/worker.builder.ts +++ b/packages/node-core/src/indexer/worker/worker.builder.ts @@ -38,16 +38,21 @@ type AsyncMethods = Record; abstract class WorkerIO { private responseListeners: ResponseListener = {}; + protected port: workers.MessagePort | workers.Worker; protected abstract getReqId(): number; constructor( - protected port: workers.MessagePort | workers.Worker, + port: workers.MessagePort | workers.Worker | undefined | null, workerFns: string[], private hostFns: AsyncMethods, protected logger: Logger ) { - port.on('message', (message) => this.handleMessage(message)); + if (!port) { + throw new Error('Port not provided to worker. WorkerHost most likely not run from worker thread.'); + } + this.port = port; + this.port.on('message', (message) => this.handleMessage(message)); // Add expected methods to class workerFns.map((fn) => { @@ -123,7 +128,10 @@ abstract class WorkerIO { args, }); } catch (e) { - this.logger.error(e, `Failed to post message, function="${fnName}", args="${JSON.stringify(args)}"`); + this.logger.error( + e as any, + `Failed to post message, function="${String(fnName)}", args="${JSON.stringify(args)}"` + ); reject(e); } }); diff --git a/packages/node-core/src/meta/health.controller.ts b/packages/node-core/src/meta/health.controller.ts index 8f1edf21a7..7404e24c71 100644 --- a/packages/node-core/src/meta/health.controller.ts +++ b/packages/node-core/src/meta/health.controller.ts @@ -15,8 +15,8 @@ export class HealthController { getHealth(): void { try { this.healthService.getHealth(); - } catch (e) { - logger.error(e.message); + } catch (e: any) { + logger.error(e); throw new HttpException( { status: HttpStatus.INTERNAL_SERVER_ERROR, diff --git a/packages/node-core/src/meta/health.service.ts b/packages/node-core/src/meta/health.service.ts index 22aa3b19c2..17893effe3 100644 --- a/packages/node-core/src/meta/health.service.ts +++ b/packages/node-core/src/meta/health.service.ts @@ -20,7 +20,7 @@ export class HealthService { private currentProcessingTimestamp?: number; private blockTime: number; private healthTimeout: number; - private indexerHealthy: boolean; + private indexerHealthy?: boolean; constructor(protected nodeConfig: NodeConfig, private storeService: StoreService) { this.healthTimeout = Math.max(DEFAULT_TIMEOUT, this.nodeConfig.timeout * 1000); diff --git a/packages/node-core/src/profiler.ts b/packages/node-core/src/profiler.ts index ccc8a1256f..ad8d8d67c6 100644 --- a/packages/node-core/src/profiler.ts +++ b/packages/node-core/src/profiler.ts @@ -12,12 +12,12 @@ function isPromise(e: any): boolean { const logger = getLogger('profiler'); -function printCost(start: number, end: number, target: string, method: string): void { - logger.info(`${target}, ${method}, ${end - start} ms`); +function printCost(start: number, end: number, target: string, method: string | symbol): void { + logger.info(`${target}, ${method.toString()}, ${end - start} ms`); } export function profiler(enabled = true): MethodDecorator { - return (target, name: string, descriptor: PropertyDescriptor): void => { + return (target, name: string | symbol, descriptor: PropertyDescriptor): void => { if (enabled && !!descriptor && typeof descriptor.value === 'function') { const orig = descriptor.value; // tslint:disable no-function-expression no-invalid-this diff --git a/packages/node-core/src/utils/autoQueue.ts b/packages/node-core/src/utils/autoQueue.ts index f70c4cd1c8..0e1bf70cc0 100644 --- a/packages/node-core/src/utils/autoQueue.ts +++ b/packages/node-core/src/utils/autoQueue.ts @@ -5,7 +5,7 @@ import {EventEmitter2} from '@nestjs/event-emitter'; export interface IQueue { size: number; - capacity: number; + capacity: number | undefined; freeSpace: number | undefined; flush(): void; @@ -23,7 +23,7 @@ export class Queue implements IQueue { return this.items.length; } - get capacity(): number { + get capacity(): number | undefined { return this._capacity; } @@ -38,7 +38,7 @@ export class Queue implements IQueue { } putMany(items: T[]): void { - if (this.capacity && items.length > this.freeSpace) { + if (this.freeSpace && items.length > this.freeSpace) { throw new Error('Queue exceeds max size'); } this.items.push(...items); @@ -97,7 +97,7 @@ export class AutoQueue implements IQueue { return this.queue.size + this.processingTasks; } - get capacity(): number { + get capacity(): number | undefined { return this.queue.capacity; } @@ -114,7 +114,7 @@ export class AutoQueue implements IQueue { } putMany(tasks: Array>): Promise[] { - if (this.capacity && tasks.length > this.freeSpace) { + if (this.freeSpace && tasks.length > this.freeSpace) { throw new Error('Queue exceeds max size'); } diff --git a/packages/node-core/src/utils/blockSizeBuffer.ts b/packages/node-core/src/utils/blockSizeBuffer.ts index 3bd034867c..c1abf76ae8 100644 --- a/packages/node-core/src/utils/blockSizeBuffer.ts +++ b/packages/node-core/src/utils/blockSizeBuffer.ts @@ -8,7 +8,7 @@ export class BlockSizeBuffer extends Queue { super(capacity); } - average() { + average(): number { if (this.size === 0) { throw new Error('No block sizes to average'); } @@ -17,6 +17,10 @@ export class BlockSizeBuffer extends Queue { for (let i = 0; i < this.size; i++) { sum += this.items[i]; } + + if (!this.capacity) { + throw new Error('Capacity is expected to be defined for block size buffer'); + } return Math.floor(sum / this.capacity); } } diff --git a/packages/node-core/src/utils/fetchHelpers.ts b/packages/node-core/src/utils/fetchHelpers.ts index e2abfe6be8..d1b830169f 100644 --- a/packages/node-core/src/utils/fetchHelpers.ts +++ b/packages/node-core/src/utils/fetchHelpers.ts @@ -22,7 +22,7 @@ export async function retryOnFail( await delay(RETRY_DELAY); return retryOnFail(request, shouldRetry, --retries); } else { - logger.error(e, `Retries failed after ${RETRY_COUNT}`); + logger.error(e as Error, `Retries failed after ${RETRY_COUNT}`); throw e; } } diff --git a/packages/node-core/src/utils/graphql.ts b/packages/node-core/src/utils/graphql.ts index 8a4fad9e50..2922eed1c9 100644 --- a/packages/node-core/src/utils/graphql.ts +++ b/packages/node-core/src/utils/graphql.ts @@ -9,12 +9,19 @@ export function modelsTypeToModelAttributes(modelType: GraphQLModelsType, enums: const fields = modelType.fields; return Object.values(fields).reduce((acc, field) => { const allowNull = field.nullable; + + const type = field.isEnum + ? `${enums.get(field.type)}${field.isArray ? '[]' : ''}` + : field.isArray + ? getTypeByScalarName('Json')?.sequelizeType + : getTypeByScalarName(field.type)?.sequelizeType; + + if (type === undefined) { + throw new Error('Unable to get model type'); + } + const columnOption: ModelAttributeColumnOptions = { - type: field.isEnum - ? `${enums.get(field.type)}${field.isArray ? '[]' : ''}` - : field.isArray - ? getTypeByScalarName('Json').sequelizeType - : getTypeByScalarName(field.type).sequelizeType, + type, comment: field.description, allowNull, primaryKey: field.type === 'ID', @@ -27,11 +34,11 @@ export function modelsTypeToModelAttributes(modelType: GraphQLModelsType, enums: } return dataValue ? BigInt(dataValue) : null; }; - columnOption.set = function (val: unknown) { + columnOption.set = function (val: any) { if (field.isArray) { this.setDataValue( field.name, - (val as unknown[])?.map((v) => v.toString()) + (val as any[])?.map((v) => v.toString()) ); } else { this.setDataValue(field.name, val?.toString()); diff --git a/packages/node-core/src/utils/object.ts b/packages/node-core/src/utils/object.ts index 127aabc10a..5635efcb7f 100644 --- a/packages/node-core/src/utils/object.ts +++ b/packages/node-core/src/utils/object.ts @@ -7,7 +7,7 @@ export function assign( target: TObject, src: TSource1, src2?: TSource2 -): TObject & TSource1 & TSource2 { +): TObject & TSource1 & (TSource2 | undefined) { return assignWith(target, src, src2, (objValue, srcValue) => (isUndefined(srcValue) ? objValue : srcValue)); } diff --git a/packages/node-core/src/utils/project.spec.ts b/packages/node-core/src/utils/project.spec.ts index b511cf44e6..718f7ca7b6 100644 --- a/packages/node-core/src/utils/project.spec.ts +++ b/packages/node-core/src/utils/project.spec.ts @@ -6,6 +6,7 @@ import {cleanedBatchBlocks, transformBypassBlocks} from './project'; describe('bypass logic', () => { it('process bypassBlocks with ranges', () => { let bypassBlocks = transformBypassBlocks([20, 40, '5-10', 20, 140]); + expect(bypassBlocks).toEqual([20, 40, 5, 6, 7, 8, 9, 10, 140]); let currentBlockBatch = [1, 5, 7, 8, 20, 40, 100, 120]; const case_1 = cleanedBatchBlocks(bypassBlocks, currentBlockBatch); diff --git a/packages/node-core/src/utils/project.ts b/packages/node-core/src/utils/project.ts index 916c6d9b25..6ef1a75573 100644 --- a/packages/node-core/src/utils/project.ts +++ b/packages/node-core/src/utils/project.ts @@ -1,7 +1,7 @@ // Copyright 2020-2022 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 -import {isNumber, range, uniq, without} from 'lodash'; +import {isNumber, range, uniq, without, flatten} from 'lodash'; import {QueryTypes, Sequelize} from 'sequelize'; import {NodeConfig} from '../configure/NodeConfig'; import {getLogger} from '../logger'; @@ -35,9 +35,9 @@ export function transformBypassBlocks(bypassBlocks: (number | string)[]): number if (!bypassBlocks?.length) return []; return uniq( - [].concat( - ...bypassBlocks.map((bypassEntry) => { - if (isNumber(bypassEntry)) return bypassEntry; + flatten( + bypassBlocks.map((bypassEntry) => { + if (isNumber(bypassEntry)) return [bypassEntry]; const splitRange = bypassEntry.split('-').map((val) => parseInt(val.trim(), 10)); return range(splitRange[0], splitRange[1] + 1); }) diff --git a/packages/node-core/src/utils/sync-helper.ts b/packages/node-core/src/utils/sync-helper.ts index fd4872a58c..519c9f3762 100644 --- a/packages/node-core/src/utils/sync-helper.ts +++ b/packages/node-core/src/utils/sync-helper.ts @@ -22,7 +22,7 @@ const byTagOrder = (a: [keyof SmartTags, any], b: [keyof SmartTags, any]) => { }; export function smartTags(tags: SmartTags, separator = '\n'): string { - return Object.entries(tags) + return (Object.entries(tags) as [keyof SmartTags, any][]) .sort(byTagOrder) .map(([k, v]) => `@${k} ${v}`) .join(separator); @@ -42,10 +42,6 @@ export function getUniqConstraint(tableName: string, field: string): string { return [tableName, field, 'uindex'].map(underscored).join('_'); } -function getExcludeConstraint(tableName: string): string { - return [tableName, '_id', '_block_range', 'exclude'].map(underscored).join('_'); -} - export function commentConstraintQuery(table: string, constraint: string, comment: string): string { return `COMMENT ON CONSTRAINT ${constraint} ON ${table} IS E'${comment}'`; } @@ -69,9 +65,9 @@ export function addTagsToForeignKeyMap( map.set(tableName, new Map()); } const tableKeys = map.get(tableName); - let foreignKeyTags = tableKeys.get(foreignKey) || ({} as SmartTags); + let foreignKeyTags = tableKeys?.get(foreignKey) || ({} as SmartTags); foreignKeyTags = Object.assign(foreignKeyTags, newTags); - tableKeys.set(foreignKey, foreignKeyTags); + tableKeys?.set(foreignKey, foreignKeyTags); } export const BTREE_GIST_EXTENSION_EXIST_QUERY = `SELECT * FROM pg_extension where extname = 'btree_gist'`; diff --git a/packages/node-core/test/v1.0.0/package.json b/packages/node-core/test/v1.0.0/package.json index 6e24d528c9..f99f6b7b35 100644 --- a/packages/node-core/test/v1.0.0/package.json +++ b/packages/node-core/test/v1.0.0/package.json @@ -21,7 +21,7 @@ "devDependencies": { "@polkadot/api": "^10", "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "latest" }, "resolutions": { diff --git a/packages/node-core/tsconfig.json b/packages/node-core/tsconfig.json index 9a67f344d9..5dc46615a5 100644 --- a/packages/node-core/tsconfig.json +++ b/packages/node-core/tsconfig.json @@ -4,7 +4,8 @@ "tsBuildInfoFile": "dist/.tsbuildinfo", "rootDir": "src", "outDir": "dist", - "noImplicitAny": true + "noImplicitAny": true, + "strict": true }, "references": [{"path": "../common"}, {"path": "../utils"}, {"path": "../testing"}], "include": ["src/**/*"] diff --git a/packages/node/package.json b/packages/node/package.json index 0ca22b2caf..1165cbd2e8 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -48,7 +48,6 @@ "rxjs": "^7.5.2", "sequelize": "6.28.0", "tar": "^6.1.11", - "typescript": "^4.4.4", "vm2": "^3.9.9", "yargs": "^16.2.0" }, diff --git a/packages/node/src/configure/SubqueryProject.ts b/packages/node/src/configure/SubqueryProject.ts index 70d819268b..e411e13173 100644 --- a/packages/node/src/configure/SubqueryProject.ts +++ b/packages/node/src/configure/SubqueryProject.ts @@ -18,7 +18,6 @@ import { ProjectManifestV0_2_1Impl, ProjectManifestV0_3_0Impl, SubstrateDataSource, - FileType, ProjectManifestV1_0_0Impl, SubstrateBlockFilter, isRuntimeDs, @@ -53,11 +52,14 @@ const NOT_SUPPORT = (name: string) => { throw new Error(`Manifest specVersion ${name}() is not supported`); }; +// This is the runtime type after we have mapped genesisHash to chainId and endpoint/dict have been provided when dealing with deployments +type NetworkConfig = SubstrateProjectNetworkConfig & { chainId: string }; + @Injectable() export class SubqueryProject { id: string; root: string; - network: Partial; + network: NetworkConfig; dataSources: SubqlProjectDs[]; schema: GraphQLSchema; templates: SubqlProjectDsTemplate[]; @@ -106,14 +108,7 @@ export class SubqueryProject { } } -export interface SubqueryProjectNetwork { - chainId: string; - endpoint?: string[]; - dictionary?: string; - chaintypes?: FileType; -} - -function processChainId(network: any): SubqueryProjectNetwork { +function processChainId(network: any): NetworkConfig { if (network.chainId && network.genesisHash) { throw new Error('Please only provide one of chainId and genesisHash'); } else if (network.genesisHash && !network.chainId) { @@ -260,7 +255,7 @@ export async function generateTimestampReferenceForBlockFilters( if (isRuntimeDs(ds)) { const startBlock = ds.startBlock ?? 1; let block; - let timestampReference; + let timestampReference: Date; ds.mapping.handlers = await Promise.all( ds.mapping.handlers.map(async (handler) => { diff --git a/packages/node/src/indexer/api.service.spec.ts b/packages/node/src/indexer/api.service.spec.ts index ddcf299009..5938b12bcc 100644 --- a/packages/node/src/indexer/api.service.spec.ts +++ b/packages/node/src/indexer/api.service.spec.ts @@ -59,7 +59,9 @@ function testSubqueryProject(): SubqueryProject { return { network: { endpoint: testNetwork.endpoint, - genesisHash: + // genesisHash: + // '0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe', + chainId: '0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe', }, chainTypes: { diff --git a/packages/node/src/indexer/api.service.test.ts b/packages/node/src/indexer/api.service.test.ts index e15b265939..c0ea2d5504 100644 --- a/packages/node/src/indexer/api.service.test.ts +++ b/packages/node/src/indexer/api.service.test.ts @@ -25,6 +25,7 @@ function testSubqueryProject(endpoint: string[]): SubqueryProject { network: { endpoint, dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`, + chainId: '', }, dataSources: [], id: 'test', diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index d273b3f6ba..0fa3684ef7 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -34,7 +34,7 @@ const logger = getLogger('api'); @Injectable() export class ApiService - extends BaseApiService + extends BaseApiService implements OnApplicationShutdown { private fetchBlocksBatches = SubstrateUtil.fetchBlocksBatches; @@ -43,7 +43,7 @@ export class ApiService networkMeta: NetworkMetadataPayload; constructor( - @Inject('ISubqueryProject') protected project: SubqueryProject, + @Inject('ISubqueryProject') project: SubqueryProject, private connectionPoolService: ConnectionPoolService, private eventEmitter: EventEmitter2, private nodeConfig: NodeConfig, diff --git a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts index 5520b4fea4..d543df42f6 100644 --- a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts @@ -46,21 +46,6 @@ export class BlockDispatcherService @Inject('ISubqueryProject') project: SubqueryProject, dynamicDsService: DynamicDsService, ) { - const fetchBlockBatchesWrapped = async ( - blockNums: number[], - ): Promise => { - const specChanged = await this.runtimeService.specChanged( - blockNums[blockNums.length - 1], - ); - - // If specVersion not changed, a known overallSpecVer will be pass in - // Otherwise use api to fetch runtimes - return this.apiService.fetchBlocks( - blockNums, - specChanged ? undefined : this.runtimeService.parentSpecVersion, - ); - }; - super( nodeConfig, eventEmitter, @@ -71,7 +56,18 @@ export class BlockDispatcherService poiService, project, dynamicDsService, - fetchBlockBatchesWrapped, + async (blockNums: number[]): Promise => { + const specChanged = await this.runtimeService.specChanged( + blockNums[blockNums.length - 1], + ); + + // If specVersion not changed, a known overallSpecVer will be pass in + // Otherwise use api to fetch runtimes + return this.apiService.fetchBlocks( + blockNums, + specChanged ? undefined : this.runtimeService.parentSpecVersion, + ); + }, ); } diff --git a/packages/node/src/indexer/dictionary.service.test.ts b/packages/node/src/indexer/dictionary.service.test.ts index 4c8b19b061..441668deb5 100644 --- a/packages/node/src/indexer/dictionary.service.test.ts +++ b/packages/node/src/indexer/dictionary.service.test.ts @@ -9,7 +9,9 @@ import { DictionaryService } from './dictionary.service'; function testSubqueryProject(): SubqueryProject { return { network: { + endpoint: '', dictionary: `https://api.subquery.network/sq/subquery/polkadot-dictionary`, + chainId: '', }, dataSources: [], id: 'test', diff --git a/packages/node/src/indexer/ds-processor.service.spec.ts b/packages/node/src/indexer/ds-processor.service.spec.ts index 3d52f947c9..9ab2f79c90 100644 --- a/packages/node/src/indexer/ds-processor.service.spec.ts +++ b/packages/node/src/indexer/ds-processor.service.spec.ts @@ -14,7 +14,7 @@ function getTestProject( ): SubqueryProject { return { network: { - genesisHash: '0x', + chainId: '0x', endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, dataSources: [ diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 878da44053..b7a77c34fc 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -13,9 +13,7 @@ import { SmartBatchService, StoreCacheService, } from '@subql/node-core'; -import { Sequelize } from 'sequelize'; - -import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject'; +import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; import { BlockDispatcherService, diff --git a/packages/node/src/indexer/fetch.service.spec.ts b/packages/node/src/indexer/fetch.service.spec.ts index 9a78da2420..381dd7a57f 100644 --- a/packages/node/src/indexer/fetch.service.spec.ts +++ b/packages/node/src/indexer/fetch.service.spec.ts @@ -287,6 +287,7 @@ function mockDictionaryService3(): DictionaryService { function testSubqueryProject(): SubqueryProject { return { network: { + chainId: '0x', endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, chainTypes: { @@ -305,7 +306,8 @@ function testSubqueryProject(): SubqueryProject { function testSubqueryProjectV0_2_0(): SubqueryProject { return { network: { - genesisHash: '0x', + chainId: '0x', + endpoint: [], dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`, }, dataSources: [ @@ -353,7 +355,7 @@ function mockStoreService(): StoreService { getOperationMerkleRoot: () => { return null; }, - } as StoreService; + } as unknown as StoreService; } function mockStoreCache(): StoreCacheService { diff --git a/packages/node/src/indexer/fetch.service.test.ts b/packages/node/src/indexer/fetch.service.test.ts index fe161a169f..c565f634c7 100644 --- a/packages/node/src/indexer/fetch.service.test.ts +++ b/packages/node/src/indexer/fetch.service.test.ts @@ -43,6 +43,7 @@ const HTTP_ENDPOINT = 'https://polkadot.api.onfinality.io/public'; function testSubqueryProject(): SubqueryProject { return { network: { + chainId: '0x', endpoint: [WS_ENDPOINT], }, chainTypes: { diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index ba04c33953..3589830df1 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -278,9 +278,6 @@ export class FetchService implements OnApplicationShutdown { ); if (dictionaryValid) { - this.dictionaryService.setDictionaryStartHeight( - metadata?._metadata?.startHeight, - ); const rawSpecVersions = await this.dictionaryService.getSpecVersionsRaw(); this.runtimeService.setSpecVersionMap(rawSpecVersions); } else { @@ -420,7 +417,10 @@ export class FetchService implements OnApplicationShutdown { : initBlockHeight; }; - if (this.dictionaryService.startHeight > getStartBlockHeight()) { + if ( + this.useDictionary && + this.dictionaryService.startHeight > getStartBlockHeight() + ) { logger.warn( `Dictionary start height ${ this.dictionaryService.startHeight diff --git a/packages/node/src/indexer/indexer.manager.spec.ts b/packages/node/src/indexer/indexer.manager.spec.ts index ef02e756a1..c8c9bacbc3 100644 --- a/packages/node/src/indexer/indexer.manager.spec.ts +++ b/packages/node/src/indexer/indexer.manager.spec.ts @@ -63,6 +63,7 @@ const nodeConfig = new NodeConfig({ function testSubqueryProject_1(): SubqueryProject { return { network: { + chainId: '0x', endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, dataSources: [ @@ -103,7 +104,7 @@ function testSubqueryProject_2(): SubqueryProject { network: { endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`, - genesisHash: + chainId: '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3', }, dataSources: [ @@ -144,7 +145,12 @@ function createIndexerManager( const dynamicDsService = new DynamicDsService(dsProcessorService, project); const storeCache = new StoreCacheService(sequilize, nodeConfig, eventEmitter); - const storeService = new StoreService(sequilize, nodeConfig, storeCache); + const storeService = new StoreService( + sequilize, + nodeConfig, + storeCache, + project, + ); const poiService = new PoiService(storeCache); const mmrService = new MmrService(nodeConfig, storeCache); const unfinalizedBlocksService = new UnfinalizedBlocksService( diff --git a/packages/node/src/indexer/sandbox.service.ts b/packages/node/src/indexer/sandbox.service.ts index ce37256c85..e4607e8df6 100644 --- a/packages/node/src/indexer/sandbox.service.ts +++ b/packages/node/src/indexer/sandbox.service.ts @@ -43,7 +43,7 @@ export class SandboxService { // api: await this.apiService.getPatchedApi(), store, root: this.project.root, - script: ds.mapping.entryScript, + // script: ds.mapping.entryScript, entry, }, this.nodeConfig, diff --git a/packages/node/src/indexer/worker/worker.ts b/packages/node/src/indexer/worker/worker.ts index b38d46c29d..0800fdca5d 100644 --- a/packages/node/src/indexer/worker/worker.ts +++ b/packages/node/src/indexer/worker/worker.ts @@ -58,7 +58,7 @@ async function initWorker(): Promise { } app = await NestFactory.create(WorkerModule, { - logger: new NestLogger(), // TIP: If the worker is crashing comment out this line for better logging + // logger: new NestLogger(), // TIP: If the worker is crashing comment out this line for better logging }); await app.init(); diff --git a/packages/node/src/utils/reindex.ts b/packages/node/src/utils/reindex.ts index f997b71e67..d69ecb04ce 100644 --- a/packages/node/src/utils/reindex.ts +++ b/packages/node/src/utils/reindex.ts @@ -21,10 +21,6 @@ export async function reindex( sequelize: Sequelize, forceCleanService?: ForceCleanService, ): Promise { - if (!storeService.historical) { - logger.warn('Unable to reindex, historical state not enabled'); - return; - } if (!lastProcessedHeight || lastProcessedHeight < targetBlockHeight) { logger.warn( `Skipping reindexing to block ${targetBlockHeight}: current indexing height ${lastProcessedHeight} is behind requested block`, diff --git a/packages/node/test/projectFixture/v0.2.0/package.json b/packages/node/test/projectFixture/v0.2.0/package.json index 9f35fcf772..cd69eeff03 100644 --- a/packages/node/test/projectFixture/v0.2.0/package.json +++ b/packages/node/test/projectFixture/v0.2.0/package.json @@ -21,7 +21,7 @@ "devDependencies": { "@polkadot/api": "^5.7.1", "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "latest" } } diff --git a/packages/node/test/projectFixture/v0.3.0/package.json b/packages/node/test/projectFixture/v0.3.0/package.json index 9f35fcf772..cd69eeff03 100644 --- a/packages/node/test/projectFixture/v0.3.0/package.json +++ b/packages/node/test/projectFixture/v0.3.0/package.json @@ -21,7 +21,7 @@ "devDependencies": { "@polkadot/api": "^5.7.1", "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "latest" } } diff --git a/packages/node/test/projectFixture/v1.0.0/package.json b/packages/node/test/projectFixture/v1.0.0/package.json index 6e24d528c9..f99f6b7b35 100644 --- a/packages/node/test/projectFixture/v1.0.0/package.json +++ b/packages/node/test/projectFixture/v1.0.0/package.json @@ -21,7 +21,7 @@ "devDependencies": { "@polkadot/api": "^10", "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "latest" }, "resolutions": { diff --git a/packages/query/package.json b/packages/query/package.json index 0ee3900204..36702737db 100644 --- a/packages/query/package.json +++ b/packages/query/package.json @@ -66,7 +66,6 @@ "@types/lodash": "^4.14.178", "@types/rimraf": "^3.0.2", "@types/yargs": "^16.0.4", - "nodemon": "^2.0.15", - "typescript": "^4.4.4" + "nodemon": "^2.0.15" } } diff --git a/packages/types/src/interfaces.ts b/packages/types/src/interfaces.ts index 5843023a99..7908dff0a1 100644 --- a/packages/types/src/interfaces.ts +++ b/packages/types/src/interfaces.ts @@ -17,9 +17,9 @@ export type FunctionPropertyNames = { }[keyof T]; export interface Store { - get(entity: string, id: string): Promise; + get(entity: string, id: string): Promise; getByField(entity: string, field: string, value: any, options?: {offset?: number; limit?: number}): Promise; - getOneByField(entity: string, field: string, value: any): Promise; + getOneByField(entity: string, field: string, value: any): Promise; set(entity: string, id: string, data: Entity): Promise; bulkCreate(entity: string, data: Entity[]): Promise; //if fields in provided, only specify fields will be updated diff --git a/packages/types/src/project.ts b/packages/types/src/project.ts index 5526cd82e5..1eb0d65e6c 100644 --- a/packages/types/src/project.ts +++ b/packages/types/src/project.ts @@ -100,7 +100,7 @@ export interface SubstrateNetworkFilter { specName?: string; } -export type SubstrateDatasource = SubstrateRuntimeDatasource | SubstrateCustomDatasource; // | SubstrateBuiltinDataSource; +export type SubstrateDatasource = SubstrateRuntimeDatasource | SubstrateCustomDatasource; export interface FileReference { file: string; diff --git a/packages/validator/fixtures/package.json b/packages/validator/fixtures/package.json index ad8f0d1561..cce3dcc936 100644 --- a/packages/validator/fixtures/package.json +++ b/packages/validator/fixtures/package.json @@ -20,7 +20,7 @@ "license": "Apache-2.0", "devDependencies": { "@subql/types": "latest", - "typescript": "^4.1.3", + "typescript": "^4.9.5", "@subql/cli": "^0.7.3" } } diff --git a/yarn.lock b/yarn.lock index c4050c470a..24e13eab91 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4177,7 +4177,7 @@ __metadata: "@subql/utils": "workspace:*" "@subql/x-merkle-mountain-range": ^2.0.0-0.1.2 "@types/async-lock": ^1 - "@willsoto/nestjs-prometheus": ^4.4.0 + "@willsoto/nestjs-prometheus": ^5.1.1 async-lock: ^1.4.0 async-mutex: ^0.4.0 lodash: ^4.17.21 @@ -4237,7 +4237,6 @@ __metadata: sequelize: 6.28.0 supertest: ^6.2.2 tar: ^6.1.11 - typescript: ^4.4.4 vm2: ^3.9.9 yargs: ^16.2.0 bin: @@ -4284,7 +4283,6 @@ __metadata: rimraf: ^3.0.2 rxjs: ^7.5.2 subscriptions-transport-ws: ^0.11.0 - typescript: ^4.4.4 yargs: ^16.2.0 bin: subql-query: ./bin/run @@ -5633,6 +5631,16 @@ __metadata: languageName: node linkType: hard +"@willsoto/nestjs-prometheus@npm:^5.1.1": + version: 5.1.1 + resolution: "@willsoto/nestjs-prometheus@npm:5.1.1" + peerDependencies: + "@nestjs/common": ^7.0.0 || ^8.0.0 || ^9.0.0 + prom-client: ^13.0.0 || ^14.0.0 + checksum: fa60f20442605c4243bc21b829be80a53ac2a0d1b7c5ab5c816048970ae57486cbba38c9d9707c989608501b27aa9a50acb5046dca76d19900a5719e068f57ae + languageName: node + linkType: hard + "@wry/context@npm:^0.6.0": version: 0.6.1 resolution: "@wry/context@npm:0.6.1" @@ -16766,7 +16774,7 @@ __metadata: ts-loader: ^9.2.6 ts-node: ^10.4.0 tsconfig-paths: ^3.12.0 - typescript: ^4.4.4 + typescript: ^4.9.5 languageName: unknown linkType: soft @@ -17626,13 +17634,13 @@ __metadata: languageName: node linkType: hard -"typescript@npm:^4.4.4": - version: 4.6.4 - resolution: "typescript@npm:4.6.4" +"typescript@npm:^4.9.5": + version: 4.9.5 + resolution: "typescript@npm:4.9.5" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: e7bfcc39cd4571a63a54e5ea21f16b8445268b9900bf55aee0e02ad981be576acc140eba24f1af5e3c1457767c96cea6d12861768fb386cf3ffb34013718631a + checksum: ee000bc26848147ad423b581bd250075662a354d84f0e06eb76d3b892328d8d4440b7487b5a83e851b12b255f55d71835b008a66cbf8f255a11e4400159237db languageName: node linkType: hard @@ -17646,13 +17654,13 @@ __metadata: languageName: node linkType: hard -"typescript@patch:typescript@^4.4.4#~builtin": - version: 4.6.4 - resolution: "typescript@patch:typescript@npm%3A4.6.4#~builtin::version=4.6.4&hash=493e53" +"typescript@patch:typescript@^4.9.5#~builtin": + version: 4.9.5 + resolution: "typescript@patch:typescript@npm%3A4.9.5#~builtin::version=4.9.5&hash=493e53" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: 8cff08bf66d9ecfbf9fcc5edde04a5a7923e6cac3b21d99b4e9a06973bf5bd7f9a83ec7eed24129c1b9e13fd861de8c1070110d4b9ce9f18ab57c6999e9c9a6f + checksum: 2eee5c37cad4390385db5db5a8e81470e42e8f1401b0358d7390095d6f681b410f2c4a0c496c6ff9ebd775423c7785cdace7bcdad76c7bee283df3d9718c0f20 languageName: node linkType: hard