diff --git a/packages/cli/src/controller/migrate-controller.ts b/packages/cli/src/controller/migrate-controller.ts index 6951ecef1d..6cf8d44d8b 100644 --- a/packages/cli/src/controller/migrate-controller.ts +++ b/packages/cli/src/controller/migrate-controller.ts @@ -10,7 +10,7 @@ import { ChainTypes, loadSubstrateProjectManifest, } from '@subql/common-substrate'; -import {loadTerraProjectManifest, TerraProjectManifestVersioned} from '@subql/common-terra'; +import {loadTerraProjectManifest, TerraProjectManifestVersioned, TerraProjectNetworkV0_3_0} from '@subql/common-terra'; import {classToPlain} from 'class-transformer'; import {cli} from 'cli-ux'; import inquirer from 'inquirer'; @@ -63,7 +63,9 @@ export async function prepare( if (project.runner.node.name === SUBSTRATE_NODE_NAME) { cli.action.start('Getting network genesis hash from endpoint for Chain ID'); try { - genesisHash = await getGenesisHash(projectNetwork.endpoint); + genesisHash = await getGenesisHash( + typeof projectNetwork.endpoint === 'string' ? projectNetwork.endpoint : projectNetwork.endpoint[0] + ); } catch (e) { genesisHash = null; } diff --git a/packages/common-substrate/src/project/versioned/v0_0_1/model.ts b/packages/common-substrate/src/project/versioned/v0_0_1/model.ts index d89ba49a3d..bf2c9184ee 100644 --- a/packages/common-substrate/src/project/versioned/v0_0_1/model.ts +++ b/packages/common-substrate/src/project/versioned/v0_0_1/model.ts @@ -25,8 +25,8 @@ import {SubstrateProjectNetworkConfig} from '../../types'; import {ManifestV0_0_1Mapping, ProjectManifestV0_0_1, RuntimeDataSourceV0_0_1} from './types'; export class ProjectNetworkV0_0_1 extends ChainTypes implements SubstrateProjectNetworkConfig { - @IsString() - endpoint: string; + @IsString({each: true}) + endpoint: string[]; @IsString() @IsOptional() dictionary?: string; diff --git a/packages/common-substrate/src/project/versioned/v0_2_0/model.ts b/packages/common-substrate/src/project/versioned/v0_2_0/model.ts index 73e987a95e..825e6c0386 100644 --- a/packages/common-substrate/src/project/versioned/v0_2_0/model.ts +++ b/packages/common-substrate/src/project/versioned/v0_2_0/model.ts @@ -34,9 +34,9 @@ export class ProjectNetworkDeploymentV0_2_0 { } export class ProjectNetworkV0_2_0 extends ProjectNetworkDeploymentV0_2_0 { - @IsString() + @IsString({each: true}) @IsOptional() - endpoint?: string; + endpoint?: string | string[]; @IsString() @IsOptional() dictionary?: string; diff --git a/packages/common-substrate/src/project/versioned/v0_3_0/types.ts b/packages/common-substrate/src/project/versioned/v0_3_0/types.ts index 77a1ca4095..fde977d811 100644 --- a/packages/common-substrate/src/project/versioned/v0_3_0/types.ts +++ b/packages/common-substrate/src/project/versioned/v0_3_0/types.ts @@ -18,7 +18,7 @@ export interface SubstrateProjectManifestV0_3_0 extends ISubstrateProjectManifes network: { genesisHash: string; - endpoint?: string; + endpoint?: string | string[]; dictionary?: string; chaintypes?: { file: string; diff --git a/packages/common-substrate/src/project/versioned/v1_0_0/model.ts b/packages/common-substrate/src/project/versioned/v1_0_0/model.ts index 315a09e402..0936a32d60 100644 --- a/packages/common-substrate/src/project/versioned/v1_0_0/model.ts +++ b/packages/common-substrate/src/project/versioned/v1_0_0/model.ts @@ -74,9 +74,9 @@ export class ProjectNetworkDeploymentV1_0_0 { } export class ProjectNetworkV1_0_0 extends ProjectNetworkDeploymentV1_0_0 { - @IsString() + @IsString({each: true}) @IsOptional() - endpoint?: string; + endpoint?: string | string[]; @IsString() @IsOptional() dictionary?: string; diff --git a/packages/common/src/project/types.ts b/packages/common/src/project/types.ts index 00d7bb3c93..41c9d6e02b 100644 --- a/packages/common/src/project/types.ts +++ b/packages/common/src/project/types.ts @@ -11,7 +11,7 @@ export interface IProjectManifest { } export interface ProjectNetworkConfig { - endpoint: string; + endpoint: string | string[]; dictionary?: string; bypassBlocks?: (number | string)[]; //genesisHash?: string; diff --git a/packages/common/src/project/utils.ts b/packages/common/src/project/utils.ts index 5111d2e131..c9455304a8 100644 --- a/packages/common/src/project/utils.ts +++ b/packages/common/src/project/utils.ts @@ -59,7 +59,7 @@ export function validateSemver(current: string, required: string): boolean { @ValidatorConstraint({name: 'semver', async: false}) export class SemverVersionValidator implements ValidatorConstraintInterface { validate(value: string | null | undefined): boolean { - if (valid(value, {includePrerelease: false}) === null) { + if (valid(value) === null) { return validRange(value, {includePrerelease: false}) !== null; } else { return prerelease(value) === null; diff --git a/packages/common/src/project/versioned/v0_2_0/types.ts b/packages/common/src/project/versioned/v0_2_0/types.ts index e84b134d8f..0926fdd27b 100644 --- a/packages/common/src/project/versioned/v0_2_0/types.ts +++ b/packages/common/src/project/versioned/v0_2_0/types.ts @@ -11,7 +11,7 @@ export interface ProjectManifestV0_2_0 { }; network: { genesisHash: string; - endpoint?: string; + endpoint?: string | string[]; dictionary?: string; chaintypes?: { file: string; diff --git a/packages/common/src/project/versioned/v1_0_0/types.ts b/packages/common/src/project/versioned/v1_0_0/types.ts index 92b8728a5c..095fb46215 100644 --- a/packages/common/src/project/versioned/v1_0_0/types.ts +++ b/packages/common/src/project/versioned/v1_0_0/types.ts @@ -26,7 +26,7 @@ export interface ProjectManifestV1_0_0; + apiDisconnect(): Promise; +} + +@Injectable() +export class ConnectionPoolService implements OnApplicationShutdown { + private allApi: T[] = []; + private connectionPool: Record = {}; + private taskCounter = 0; + + async onApplicationShutdown(): Promise { + await Promise.all( + Object.keys(this.connectionPool)?.map((key) => this.connectionPool[toNumber(key)].apiDisconnect()) + ); + } + + addToConnections(api: T): void { + this.allApi.push(api); + this.connectionPool[this.allApi.length - 1] = api; + } + + addBatchToConnections(apis: T[]): void { + apis.forEach((api) => this.addToConnections(api)); + } + + async connectToApi(apiIndex: number): Promise { + await this.allApi[apiIndex].apiConnect(); + } + + get api(): T { + const index = this.getNextConnectedApiIndex(); + if (index === -1) { + throw new Error('No connected api'); + } + return this.connectionPool[index]; + } + + getNextConnectedApiIndex(): number { + if (Object.keys(this.connectionPool).length === 0) { + return -1; + } + const nextIndex = this.taskCounter % Object.keys(this.connectionPool).length; + this.taskCounter++; + return toNumber(Object.keys(this.connectionPool)[nextIndex]); + } + + get numConnections(): number { + return Object.keys(this.connectionPool).length; + } + + async handleApiDisconnects(apiIndex: number, endpoint: string): Promise { + logger.warn(`disconnected from ${endpoint}`); + delete this.connectionPool[apiIndex]; + + try { + logger.debug(`reconnecting to ${endpoint}...`); + await this.connectToApi(apiIndex); + } catch (e) { + logger.error(`unable to reconnect to endpoint ${endpoint}`, e); + return; + } + + logger.info(`reconnected to ${endpoint}!`); + this.connectionPool[apiIndex] = this.allApi[apiIndex]; + } +} diff --git a/packages/node-core/src/indexer/dictionary.service.test.ts b/packages/node-core/src/indexer/dictionary.service.test.ts index 457d27e8ae..849e5a19c7 100644 --- a/packages/node-core/src/indexer/dictionary.service.test.ts +++ b/packages/node-core/src/indexer/dictionary.service.test.ts @@ -105,7 +105,7 @@ const HAPPY_PATH_CONDITIONS: DictionaryQueryEntry[] = [ const nodeConfig = new NodeConfig({ subquery: 'asdf', subqueryName: 'asdf', - networkEndpoint: 'wss://polkadot.api.onfinality.io/public-ws', + networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionaryTimeout: 10, }); diff --git a/packages/node-core/src/indexer/index.ts b/packages/node-core/src/indexer/index.ts index 008b63fe00..cb1a30fb18 100644 --- a/packages/node-core/src/indexer/index.ts +++ b/packages/node-core/src/indexer/index.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 export * from './benchmark.service'; +export * from './connectionPool.service'; export * from './entities'; export * from './PoiBlock'; export * from './types'; @@ -12,4 +13,4 @@ export * from './mmr.service'; export * from './worker'; export * from './dictionary.service'; export * from './sandbox'; -export * from './smartBatch.service'; \ No newline at end of file +export * from './smartBatch.service'; diff --git a/packages/node-core/src/utils/object.ts b/packages/node-core/src/utils/object.ts index 579e39dedf..127aabc10a 100644 --- a/packages/node-core/src/utils/object.ts +++ b/packages/node-core/src/utils/object.ts @@ -20,3 +20,14 @@ export function camelCaseObjectKey(object: Record): object { {} ); } + +export function splitArrayByRatio(arr: number[], weights: number[]): number[][] { + const result: number[][] = []; + let start = 0; + for (let i = 0; i < weights.length; i++) { + const end = Math.floor(arr.length * weights[i]) + start; + result.push(arr.slice(start, end)); + start = end; + } + return result; +} diff --git a/packages/node/src/configure/SubqueryProject.spec.ts b/packages/node/src/configure/SubqueryProject.spec.ts index 8f23d1fbff..87641f9e9f 100644 --- a/packages/node/src/configure/SubqueryProject.spec.ts +++ b/packages/node/src/configure/SubqueryProject.spec.ts @@ -36,7 +36,7 @@ describe('SubqueryProject', () => { it('convert local 0.2.0 manifest to project object', async () => { //manually pass the endpoint const project = await SubqueryProject.create(projectDirV0_2_0, { - endpoint: 'wss://rpc.polkadot.io/public-ws', + endpoint: ['wss://rpc.polkadot.io/public-ws'], }); expect((project.dataSources[1] as any).processor.file).toMatch( @@ -47,7 +47,7 @@ describe('SubqueryProject', () => { it('convert local 0.3.0 manifest to project object', async () => { //manually pass the endpoint const project = await SubqueryProject.create(projectDirV0_3_0, { - endpoint: 'wss://rpc.polkadot.io/public-ws', + endpoint: ['wss://rpc.polkadot.io/public-ws'], }); expect((project.dataSources[1] as any).processor.file).toMatch( @@ -60,7 +60,7 @@ describe('SubqueryProject', () => { //manually pass the endpoint const project = await SubqueryProject.create( deployment, - { endpoint: 'wss://rpc.polkadot.io/public-ws' }, + { endpoint: ['wss://rpc.polkadot.io/public-ws'] }, { ipfs: 'http://127.0.0.1:8080' }, ); }, 5000000); @@ -77,7 +77,7 @@ describe('SubqueryProject', () => { }, }; const project = await SubqueryProject.create(projectDirV1_0_0, { - endpoint: 'wss://rpc.polkadot.io/public-ws', + endpoint: ['wss://rpc.polkadot.io/public-ws'], }); expect(project.runner).toMatchObject(expectedRunner); @@ -85,7 +85,7 @@ describe('SubqueryProject', () => { it('check processChainId', async () => { const project = await SubqueryProject.create(projectDirV1_0_0, { - endpoint: 'wss://rpc.polkadot.io/public-ws', + endpoint: ['wss://rpc.polkadot.io/public-ws'], }); expect(project.network.chainId).toMatch( '0x401a1f9dca3da46f5c4091016c8a2f26dcea05865116b286f60f668207d1474b', @@ -94,10 +94,10 @@ describe('SubqueryProject', () => { it('check loadProjectTemplates', async () => { const project = await SubqueryProject.create(templateProject, { - endpoint: 'wss://moonbeam-alpha.api.onfinality.io/public-ws', + endpoint: ['wss://moonbeam-alpha.api.onfinality.io/public-ws'], }); const project_v1 = await SubqueryProject.create(projectDirV1_0_0, { - endpoint: 'wss://rpc.polkadot.io/public-ws', + endpoint: ['wss://rpc.polkadot.io/public-ws'], }); expect(project_v1).not.toContain('template'); expect(project.templates.length).toBe(1); diff --git a/packages/node/src/configure/SubqueryProject.ts b/packages/node/src/configure/SubqueryProject.ts index f6e13f67dd..70d819268b 100644 --- a/packages/node/src/configure/SubqueryProject.ts +++ b/packages/node/src/configure/SubqueryProject.ts @@ -108,7 +108,7 @@ export class SubqueryProject { export interface SubqueryProjectNetwork { chainId: string; - endpoint?: string; + endpoint?: string[]; dictionary?: string; chaintypes?: FileType; } @@ -137,6 +137,10 @@ async function loadProjectFromManifestBase( ): Promise { const root = await getProjectRoot(reader); + if (typeof projectManifest.network.endpoint === 'string') { + projectManifest.network.endpoint = [projectManifest.network.endpoint]; + } + const network = processChainId({ ...projectManifest.network, ...networkOverrides, diff --git a/packages/node/src/configure/configure.module.ts b/packages/node/src/configure/configure.module.ts index 2df4d75492..8a8d2da8c5 100644 --- a/packages/node/src/configure/configure.module.ts +++ b/packages/node/src/configure/configure.module.ts @@ -108,7 +108,7 @@ export class ConfigureModule { config.subquery, omitBy( { - endpoint: config.networkEndpoint, + endpoint: config.networkEndpoints, dictionary: config.networkDictionary, }, isNil, @@ -170,7 +170,7 @@ export class ConfigureModule { argv.subquery, omitBy( { - endpoint: config.networkEndpoint, + endpoint: config.networkEndpoints, dictionary: config.networkDictionary, }, isNil, diff --git a/packages/node/src/indexer/api.service.spec.ts b/packages/node/src/indexer/api.service.spec.ts index 4360f7af54..a179c09f89 100644 --- a/packages/node/src/indexer/api.service.spec.ts +++ b/packages/node/src/indexer/api.service.spec.ts @@ -4,10 +4,12 @@ import { EventEmitter2 } from '@nestjs/event-emitter'; import { ApiPromise, WsProvider } from '@polkadot/api'; import { ProjectNetworkV0_0_1 } from '@subql/common-substrate'; +import { ConnectionPoolService, NodeConfig } from '@subql/node-core'; import { GraphQLSchema } from 'graphql'; import { omit } from 'lodash'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; +import { ApiPromiseConnection } from './apiPromise.connection'; jest.mock('@polkadot/api', () => { const ApiPromise = jest.fn(); @@ -23,7 +25,7 @@ jest.mock('@polkadot/api', () => { }); const testNetwork: ProjectNetworkV0_0_1 = { - endpoint: 'wss://kusama.api.onfinality.io/public-ws', + endpoint: ['wss://kusama.api.onfinality.io/public-ws'], types: { TestType: 'u32', }, @@ -46,6 +48,13 @@ const testNetwork: ProjectNetworkV0_0_1 = { typesSpec: { spec3: { TestType6: 'test' } }, }; +const nodeConfig = new NodeConfig({ + subquery: 'asdf', + subqueryName: 'asdf', + networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], + dictionaryTimeout: 10, +}); + function testSubqueryProject(): SubqueryProject { return { network: { @@ -71,7 +80,12 @@ function testSubqueryProject(): SubqueryProject { describe('ApiService', () => { it('read custom types from project manifest', async () => { const project = testSubqueryProject(); - const apiService = new ApiService(project, new EventEmitter2()); + const apiService = new ApiService( + project, + new ConnectionPoolService(), + new EventEmitter2(), + nodeConfig, + ); await apiService.init(); const { version } = require('../../package.json'); expect(WsProvider).toHaveBeenCalledWith(testNetwork.endpoint, 2500, { @@ -91,7 +105,12 @@ describe('ApiService', () => { // Now after manifest 1.0.0, will use chainId instead of genesisHash (project.network as any).chainId = '0x'; - const apiService = new ApiService(project, new EventEmitter2()); + const apiService = new ApiService( + project, + new ConnectionPoolService(), + new EventEmitter2(), + nodeConfig, + ); await expect(apiService.init()).rejects.toThrow(); }); diff --git a/packages/node/src/indexer/api.service.test.ts b/packages/node/src/indexer/api.service.test.ts index d1f9a20fe1..f357884052 100644 --- a/packages/node/src/indexer/api.service.test.ts +++ b/packages/node/src/indexer/api.service.test.ts @@ -20,7 +20,7 @@ const TEST_BLOCKHASH = const TEST_BLOCKNUMBER = 6721189; // kusama -function testSubqueryProject(endpoint: string): SubqueryProject { +function testSubqueryProject(endpoint: string[]): SubqueryProject { return { network: { endpoint, @@ -54,7 +54,7 @@ describe('ApiService', () => { providers: [ { provide: 'ISubqueryProject', - useFactory: () => testSubqueryProject(endpoint), + useFactory: () => testSubqueryProject([endpoint]), }, ApiService, ], @@ -70,7 +70,7 @@ describe('ApiService', () => { it('can instantiate api', async () => { const apiService = await prepareApiService(); - const api = apiService.getApi(); + const api = apiService.api; const apiAt = await api.at(TEST_BLOCKHASH); apiAt.registry; @@ -82,7 +82,7 @@ describe('ApiService', () => { it('api query is locked at specified block', async () => { const apiService = await prepareApiService(); - const api = apiService.getApi(); + const api = apiService.api; const blockhash = await api.rpc.chain.getBlockHash(2); const validators = await api.query.session.validators.at(blockhash); const block = await api.rpc.chain.getBlock(blockhash); @@ -102,7 +102,7 @@ describe('ApiService', () => { it('api query input is double map', async () => { const apiService = await prepareApiService(); - const api = apiService.getApi(); + const api = apiService.api; const blockhash = await api.rpc.chain.getBlockHash(6721189); const block = await api.rpc.chain.getBlock(blockhash); const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; @@ -128,7 +128,7 @@ describe('ApiService', () => { const apiService = await prepareApiService( 'wss://polkadot.api.onfinality.io/public-ws', ); - const api = apiService.getApi(); + const api = apiService.api; const blockhash = await api.rpc.chain.getBlockHash(6721195); const block = await api.rpc.chain.getBlock(blockhash); const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; @@ -157,7 +157,7 @@ describe('ApiService', () => { const apiService = await prepareApiService( 'wss://polkadot.api.onfinality.io/public-ws', ); - const api = apiService.getApi(); + const api = apiService.api; const blockhash = await api.rpc.chain.getBlockHash(5661443); const block = await api.rpc.chain.getBlock(blockhash); const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; @@ -178,7 +178,7 @@ describe('ApiService', () => { it.skip('api consts is swapped to the specified block', async () => { const apiService = await prepareApiService(); - const api = apiService.getApi(); + const api = apiService.api; // upgrade at 4401242 that maxNominatorRewardedPerValidator changed from 256 to 128 let blockhash: BlockHash; const currentMaxNRPV = @@ -203,7 +203,7 @@ describe('ApiService', () => { // it('.tx.*.*, .derive.*.* are removed', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // const multiResults = await Promise.all([ // await api.query.system.account.at(TEST_BLOCKHASH, account1), // await api.query.system.account.at(TEST_BLOCKHASH, account2), @@ -227,7 +227,7 @@ describe('ApiService', () => { // // it.skip('xxx.xxx.multi with input parameter is a double map', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // const patchedApi = await apiService.getPatchedApi( // TEST_BLOCKHASH, // TEST_BLOCKNUMBER, @@ -254,7 +254,7 @@ describe('ApiService', () => { // it('api.queryMulti', async () => { // const account = 'E7ncQKp4xayUoUdpraxBjT7NzLoayLJA4TuPcKKboBkJ5GH'; // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // const patchedApi = await apiService.getPatchedApi( // TEST_BLOCKHASH, // TEST_BLOCKNUMBER, @@ -288,7 +288,7 @@ describe('ApiService', () => { // // it.skip('api.rx.queryMulti is not supported', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // const patchedApi = await apiService.getPatchedApi( // TEST_BLOCKHASH, // TEST_BLOCKNUMBER, @@ -303,7 +303,7 @@ describe('ApiService', () => { // // it('support .entries', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // const patchedApi = await apiService.getPatchedApi( // TEST_BLOCKHASH, // TEST_BLOCKNUMBER, @@ -318,7 +318,7 @@ describe('ApiService', () => { // // it('support historic api rpc', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // // const blockhash = await api.rpc.chain.getBlockHash(4401242); // const patchedApi = await apiService.getPatchedApi(blockhash, 4401242); @@ -337,7 +337,7 @@ describe('ApiService', () => { // // it('successful set block hash when continuous call api.xxx.xxx.at ', async () => { // const apiService = await prepareApiService(); - // const api = apiService.getApi(); + // const api = apiService.api; // // const blockhash1 = await api.rpc.chain.getBlockHash(1378036); // let patchedApi = await apiService.getPatchedApi(blockhash1, 1378036); @@ -359,7 +359,7 @@ describe('ApiService', () => { // it('support http provider', async () => { const apiService = await prepareApiService(HTTP_ENDPOINT); - const api = apiService.getApi(); + const api = apiService.api; const blockhash = await api.rpc.chain.getBlockHash(1); const block = await api.rpc.chain.getBlock(blockhash); const mockBlock = wrapBlock(block, []) as unknown as SubstrateBlock; @@ -377,7 +377,7 @@ describe('ApiService', () => { const apiService = await prepareApiService( 'wss://moonbeam-alpha.api.onfinality.io/public-ws', ); - const api = apiService.getApi(); + const api = apiService.api; const blockNumber = 1545235; const blockhash = await api.rpc.chain.getBlockHash(blockNumber); diff --git a/packages/node/src/indexer/api.service.ts b/packages/node/src/indexer/api.service.ts index 341e2bff9d..e435e5f4d3 100644 --- a/packages/node/src/indexer/api.service.ts +++ b/packages/node/src/indexer/api.service.ts @@ -3,48 +3,62 @@ import { Inject, Injectable, OnApplicationShutdown } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; -import { ApiPromise, WsProvider } from '@polkadot/api'; -import { ApiOptions, RpcMethodResult } from '@polkadot/api/types'; +import { ApiPromise } from '@polkadot/api'; +import { RpcMethodResult } from '@polkadot/api/types'; import { RuntimeVersion } from '@polkadot/types/interfaces'; import { AnyFunction, DefinitionRpcExt } from '@polkadot/types/types'; import { IndexerEvent, NetworkMetadataPayload, getLogger, + NodeConfig, + profilerWrap, + ConnectionPoolService, } from '@subql/node-core'; import { SubstrateBlock } from '@subql/types'; import { SubqueryProject } from '../configure/SubqueryProject'; -import { ApiAt } from './types'; -import { HttpProvider } from './x-provider/http'; - -// eslint-disable-next-line @typescript-eslint/no-var-requires -const { version: packageVersion } = require('../../package.json'); +import * as SubstrateUtil from '../utils/substrate'; +import { ApiPromiseConnection } from './apiPromise.connection'; +import { ApiAt, BlockContent } from './types'; const NOT_SUPPORT = (name: string) => () => { throw new Error(`${name}() is not supported`); }; // https://github.com/polkadot-js/api/blob/12750bc83d8d7f01957896a80a7ba948ba3690b7/packages/rpc-provider/src/ws/index.ts#L43 -const RETRY_DELAY = 2_500; +const MAX_RECONNECT_ATTEMPTS = 5; const TIMEOUT = 90 * 1000; const logger = getLogger('api'); @Injectable() export class ApiService implements OnApplicationShutdown { - private api: ApiPromise; + private fetchBlocksBatches = SubstrateUtil.fetchBlocksBatches; private currentBlockHash: string; private currentBlockNumber: number; - private apiOption: ApiOptions; networkMeta: NetworkMetadataPayload; constructor( @Inject('ISubqueryProject') protected project: SubqueryProject, + private connectionPoolService: ConnectionPoolService, private eventEmitter: EventEmitter2, + private nodeConfig: NodeConfig, ) {} async onApplicationShutdown(): Promise { - await Promise.all([this.api?.disconnect()]); + await this.connectionPoolService.onApplicationShutdown(); + } + + private metadataMismatchError( + metadata: string, + expected: string, + actual: string, + ): Error { + return Error( + `Value of ${metadata} does not match across all endpoints\n + Expected: ${expected} + Actual: ${actual}`, + ); } async init(): Promise { @@ -57,63 +71,89 @@ export class ApiService implements OnApplicationShutdown { process.exit(1); } - let provider: WsProvider | HttpProvider; - let throwOnConnect = false; - - const headers = { - 'User-Agent': `SubQuery-Node ${packageVersion}`, - }; - if (network.endpoint.startsWith('ws')) { - provider = new WsProvider( - network.endpoint, - RETRY_DELAY, - headers, - TIMEOUT, + if (this.nodeConfig?.profiler) { + this.fetchBlocksBatches = profilerWrap( + SubstrateUtil.fetchBlocksBatches, + 'SubstrateUtil', + 'fetchBlocksBatches', ); - } else if (network.endpoint.startsWith('http')) { - provider = new HttpProvider(network.endpoint, headers); - throwOnConnect = true; } - this.apiOption = { - provider, - throwOnConnect, - noInitWarn: true, - ...chainTypes, - }; - this.api = await ApiPromise.create(this.apiOption); - - this.eventEmitter.emit(IndexerEvent.ApiConnected, { value: 1 }); - this.api.on('connected', () => { - this.eventEmitter.emit(IndexerEvent.ApiConnected, { value: 1 }); - }); - this.api.on('disconnected', () => { - this.eventEmitter.emit(IndexerEvent.ApiConnected, { value: 0 }); - }); - - this.networkMeta = { - chain: this.api.runtimeChain.toString(), - specName: this.api.runtimeVersion.specName.toString(), - genesisHash: this.api.genesisHash.toString(), - }; - - if (network.chainId && network.chainId !== this.networkMeta.genesisHash) { - const err = new Error( - `Network chainId doesn't match expected genesisHash. Your SubQuery project is expecting to index data from "${ - network.chainId ?? network.genesisHash - }", however the endpoint that you are connecting to is different("${ - this.networkMeta.genesisHash - }). Please check that the RPC endpoint is actually for your desired network or update the genesisHash.`, - ); - logger.error(err, err.message); - throw err; - } + const connections: ApiPromiseConnection[] = []; + + await Promise.all( + network.endpoint.map(async (endpoint, i) => { + const connection = await ApiPromiseConnection.create(endpoint, { + chainTypes, + }); + const api = connection.api; + + this.eventEmitter.emit(IndexerEvent.ApiConnected, { + value: 1, + apiIndex: i, + endpoint: endpoint, + }); + + api.on('connected', () => { + this.eventEmitter.emit(IndexerEvent.ApiConnected, { + value: 1, + apiIndex: i, + endpoint: endpoint, + }); + }); + api.on('disconnected', () => { + this.eventEmitter.emit(IndexerEvent.ApiConnected, { + value: 0, + apiIndex: i, + endpoint: endpoint, + }); + this.connectionPoolService.handleApiDisconnects(i, endpoint); + }); + + if (!this.networkMeta) { + this.networkMeta = { + chain: api.runtimeChain.toString(), + specName: api.runtimeVersion.specName.toString(), + genesisHash: api.genesisHash.toString(), + }; + + if ( + network.chainId && + network.chainId !== this.networkMeta.genesisHash + ) { + const err = new Error( + `Network chainId doesn't match expected genesisHash. Your SubQuery project is expecting to index data from "${ + network.chainId ?? network.genesisHash + }", however the endpoint that you are connecting to is different("${ + this.networkMeta.genesisHash + }). Please check that the RPC endpoint is actually for your desired network or update the genesisHash.`, + ); + logger.error(err, err.message); + throw err; + } + } else { + const genesisHash = api.genesisHash.toString(); + if (this.networkMeta.genesisHash !== genesisHash) { + throw this.metadataMismatchError( + 'Genesis Hash', + this.networkMeta.genesisHash, + genesisHash, + ); + } + } + + logger.info(`Connected to ${endpoint} successfully`); + + connections.push(connection); + }), + ); + this.connectionPoolService.addBatchToConnections(connections); return this; } - getApi(): ApiPromise { - return this.api; + get api(): ApiPromise { + return this.connectionPoolService.api.api; } async getPatchedApi( @@ -123,11 +163,12 @@ export class ApiService implements OnApplicationShutdown { this.currentBlockHash = block.block.hash.toString(); this.currentBlockNumber = block.block.header.number.toNumber(); - const apiAt = (await this.api.at( + const api = this.api; + const apiAt = (await api.at( this.currentBlockHash, runtimeVersion, )) as ApiAt; - this.patchApiRpc(this.api, apiAt); + this.patchApiRpc(api, apiAt); return apiAt; } @@ -211,4 +252,55 @@ export class ApiService implements OnApplicationShutdown { return `api.rpc.${ext?.section ?? '*'}.${ext?.method ?? '*'}`; } + + private async fetchBlocksFromFirstAvailableEndpoint( + batch: number[], + overallSpecVer?: number, + ): Promise { + let reconnectAttempts = 0; + while (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { + try { + const blocks = await this.fetchBlocksBatches( + this.api, + batch, + overallSpecVer, + ); + return blocks; + } catch (e) { + logger.error(e, 'Failed to fetch blocks'); + reconnectAttempts++; + } + } + throw new Error( + `Maximum number of retries (${MAX_RECONNECT_ATTEMPTS}) reached.`, + ); + } + + async fetchBlocks( + blockNums: number[], + overallSpecVer?: number, + ): Promise { + const api = this.api; + try { + const blocks = await this.fetchBlocksBatches( + api, + blockNums, + overallSpecVer, + ); + return blocks; + } catch (e) { + logger.error( + e, + `Failed to fetch blocks ${blockNums[0]}...${ + blockNums[blockNums.length - 1] + }`, + ); + + const blocks = await this.fetchBlocksFromFirstAvailableEndpoint( + blockNums, + overallSpecVer, + ); + return blocks; + } + } } diff --git a/packages/node/src/indexer/apiPromise.connection.ts b/packages/node/src/indexer/apiPromise.connection.ts index 7e90950beb..1552570a9c 100644 --- a/packages/node/src/indexer/apiPromise.connection.ts +++ b/packages/node/src/indexer/apiPromise.connection.ts @@ -1,11 +1,58 @@ // Copyright 2020-2022 OnFinality Limited authors & contributors // SPDX-License-Identifier: Apache-2.0 -import { getLogger } from '@subql/node-core'; +import { ApiPromise, WsProvider } from '@polkadot/api'; +import { RegisteredTypes } from '@polkadot/types/types'; +import { ApiConnection } from '@subql/node-core'; +import { HttpProvider } from './x-provider/http'; -const logger = getLogger('connection'); +// eslint-disable-next-line @typescript-eslint/no-var-requires +const { version: packageVersion } = require('../../package.json'); + +const RETRY_DELAY = 2_500; + +export class ApiPromiseConnection implements ApiConnection { + constructor(private _api: ApiPromise) {} + + static async create( + endpoint: string, + args: { chainTypes: RegisteredTypes }, + ): Promise { + let provider: WsProvider | HttpProvider; + let throwOnConnect = false; + + const headers = { + 'User-Agent': `SubQuery-Node ${packageVersion}`, + }; + + if (endpoint.startsWith('ws')) { + provider = new WsProvider(endpoint, RETRY_DELAY, headers); + } else if (endpoint.startsWith('http')) { + provider = new HttpProvider(endpoint, headers); + throwOnConnect = true; + } + + const apiOption = { + provider, + throwOnConnect, + noInitWarn: true, + ...args.chainTypes, + }; + const api = await ApiPromise.create(apiOption); + return new ApiPromiseConnection(api); + } + + get api(): ApiPromise { + return this._api; + } + + async apiConnect(): Promise { + await this._api.connect(); + } + async apiDisconnect(): Promise { + await this._api.disconnect(); + } -export class ApiPromiseConnection { static handleError(e: Error): Error { let formatted_error: Error; if (e.message.startsWith(`No response received from RPC endpoint in`)) { diff --git a/packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts index b50775a367..7aa114f1d4 100644 --- a/packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -4,7 +4,13 @@ import assert from 'assert'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { hexToU8a, u8aEq } from '@polkadot/util'; -import { getLogger, IndexerEvent, IQueue, NodeConfig, SmartBatchService } from '@subql/node-core'; +import { + getLogger, + IndexerEvent, + IQueue, + NodeConfig, + SmartBatchService, +} from '@subql/node-core'; import { ProjectService } from '../project.service'; import { RuntimeService } from '../runtime/runtimeService'; diff --git a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts index a55aa13866..f1a2630e03 100644 --- a/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts +++ b/packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts @@ -9,7 +9,6 @@ import { NodeConfig, IndexerEvent, delay, - profilerWrap, AutoQueue, Queue, waitForBatchSize, @@ -17,11 +16,11 @@ import { SmartBatchService, } from '@subql/node-core'; import { last } from 'lodash'; -import * as SubstrateUtil from '../../utils/substrate'; import { ApiService } from '../api.service'; import { IndexerManager } from '../indexer.manager'; import { ProjectService } from '../project.service'; import { RuntimeService } from '../runtime/runtimeService'; +import { BlockContent } from '../types'; import { BaseBlockDispatcher } from './base-block-dispatcher'; const logger = getLogger('BlockDispatcherService'); @@ -39,7 +38,6 @@ export class BlockDispatcherService private fetching = false; private isShutdown = false; // private getRuntimeVersion: GetRuntimeVersion; - private fetchBlocksBatches = SubstrateUtil.fetchBlocksBatches; constructor( private apiService: ApiService, @@ -57,13 +55,6 @@ export class BlockDispatcherService smartBatchService, ); this.processQueue = new AutoQueue(nodeConfig.batchSize * 3); - if (this.nodeConfig.profiler) { - this.fetchBlocksBatches = profilerWrap( - SubstrateUtil.fetchBlocksBatches, - 'SubstrateUtil', - 'fetchBlocksBatches', - ); - } } // eslint-disable-next-line @typescript-eslint/require-await @@ -168,8 +159,7 @@ export class BlockDispatcherService await memoryLock.waitForUnlock(); } - const blocks = await this.fetchBlocksBatches( - this.apiService.getApi(), + const blocks = await this.apiService.fetchBlocks( blockNums, specChanged ? undefined : this.runtimeService.parentSpecVersion, ); @@ -182,6 +172,7 @@ export class BlockDispatcherService bufferedHeight > this._latestBufferedHeight || this.queue.peek() < Math.min(...blockNums) ) { + logger.info(`${this.queue.peek()} - ${Math.min(...blockNums)}`); logger.info(`Queue was reset for new DS, discarding fetched blocks`); continue; } diff --git a/packages/node/src/indexer/dictionary.service.test.ts b/packages/node/src/indexer/dictionary.service.test.ts index 796535dc32..4c8b19b061 100644 --- a/packages/node/src/indexer/dictionary.service.test.ts +++ b/packages/node/src/indexer/dictionary.service.test.ts @@ -21,7 +21,7 @@ function testSubqueryProject(): SubqueryProject { const nodeConfig = new NodeConfig({ subquery: 'asdf', subqueryName: 'asdf', - networkEndpoint: 'wss://polkadot.api.onfinality.io/public-ws', + networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionaryTimeout: 10, }); diff --git a/packages/node/src/indexer/ds-processor.service.spec.ts b/packages/node/src/indexer/ds-processor.service.spec.ts index f2f3838d0c..3d52f947c9 100644 --- a/packages/node/src/indexer/ds-processor.service.spec.ts +++ b/packages/node/src/indexer/ds-processor.service.spec.ts @@ -15,7 +15,7 @@ function getTestProject( return { network: { genesisHash: '0x', - endpoint: 'wss://polkadot.api.onfinality.io/public-ws', + endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, dataSources: [ { diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index be79c40619..fef8d9bb3f 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -9,7 +9,8 @@ import { StoreService, PoiService, NodeConfig, - SmartBatchService + ConnectionPoolService, + SmartBatchService, } from '@subql/node-core'; import { SubqueryProject } from '../configure/SubqueryProject'; @@ -33,6 +34,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service'; StoreService, ApiService, IndexerManager, + ConnectionPoolService, { provide: SmartBatchService, useFactory: (nodeConfig: NodeConfig) => { diff --git a/packages/node/src/indexer/fetch.service.spec.ts b/packages/node/src/indexer/fetch.service.spec.ts index cda09bdcf6..91b75466e9 100644 --- a/packages/node/src/indexer/fetch.service.spec.ts +++ b/packages/node/src/indexer/fetch.service.spec.ts @@ -15,7 +15,7 @@ import { NodeConfig, Dictionary, MetadataRepo, - SmartBatchService + SmartBatchService, } from '@subql/node-core'; import { GraphQLSchema } from 'graphql'; import { difference, range } from 'lodash'; @@ -40,7 +40,7 @@ jest.mock('../utils/substrate', () => const nodeConfig = new NodeConfig({ subquery: 'asdf', subqueryName: 'asdf', - networkEndpoint: 'wss://polkadot.api.onfinality.io/public-ws', + networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionaryTimeout: 10, }); @@ -275,7 +275,7 @@ function mockDictionaryService3(): DictionaryService { function testSubqueryProject(): SubqueryProject { return { network: { - endpoint: 'wss://polkadot.api.onfinality.io/public-ws', + endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, chainTypes: { types: { @@ -417,10 +417,8 @@ describe('FetchService', () => { project, ); const pendingInit = fetchService.init(1); - expect( - apiService.getApi().rpc.chain.getFinalizedHead, - ).toHaveBeenCalledTimes(1); - expect(apiService.getApi().rpc.chain.getHeader).toHaveBeenCalledTimes(1); + expect(apiService.api.rpc.chain.getFinalizedHead).toHaveBeenCalledTimes(1); + expect(apiService.api.rpc.chain.getHeader).toHaveBeenCalledTimes(1); await pendingInit; fetchService.onApplicationShutdown(); }); diff --git a/packages/node/src/indexer/fetch.service.test.ts b/packages/node/src/indexer/fetch.service.test.ts index 2a69e8912a..129fc2f05a 100644 --- a/packages/node/src/indexer/fetch.service.test.ts +++ b/packages/node/src/indexer/fetch.service.test.ts @@ -17,7 +17,7 @@ import { NodeConfig, PoiService, StoreService, - SmartBatchService + SmartBatchService, } from '@subql/node-core'; import { GraphQLSchema } from 'graphql'; import { Sequelize } from 'sequelize'; @@ -41,7 +41,7 @@ const HTTP_ENDPOINT = 'https://polkadot.api.onfinality.io/public'; function testSubqueryProject(): SubqueryProject { return { network: { - endpoint: WS_ENDPOINT, + endpoint: [WS_ENDPOINT], }, chainTypes: { types: { @@ -97,7 +97,7 @@ jest.setTimeout(200000); const nodeConfig = new NodeConfig({ subquery: 'asdf', subqueryName: 'asdf', - networkEndpoint: WS_ENDPOINT, + networkEndpoint: [WS_ENDPOINT], dictionaryTimeout: 10, batchSize: 5, }); @@ -502,7 +502,7 @@ describe('FetchService', () => { const indexerManager = mockIndexerManager(); //set dictionary to different network //set to a kusama network and use polkadot dictionary - project.network.endpoint = 'wss://kusama.api.onfinality.io/public-ws'; + project.network.endpoint = ['wss://kusama.api.onfinality.io/public-ws']; project.network.dictionary = 'https://api.subquery.network/sq/subquery/polkadot-dictionary'; project.dataSources = [ @@ -728,8 +728,9 @@ describe('FetchService', () => { await runtimeService.getSpecVersion(10337859); const specVersionMap = (runtimeService as any).specVersionMap; // If the last finalized block specVersion are same, we expect it will update the specVersion map - const latestSpecVersion = - await fetchService.api.rpc.state.getRuntimeVersion(); + const latestSpecVersion = await fetchService + .api() + .rpc.state.getRuntimeVersion(); // This should be match if dictionary is fully synced expect(Number(specVersionMap[specVersionMap.length - 1].id)).toBe( latestSpecVersion.specVersion.toNumber(), @@ -791,7 +792,7 @@ describe('FetchService', () => { const project = testSubqueryProject(); project.dataSources[0].startBlock = 3467085; - project.network.endpoint = 'wss://karura-rpc-0.aca-api.network'; + project.network.endpoint = ['wss://karura-rpc-0.aca-api.network']; const indexerManager = mockIndexerManager(); diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index c6a339dbf3..e6b4d6eb64 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -118,8 +118,8 @@ export class FetchService implements OnApplicationShutdown { this.isShutdown = true; } - get api(): ApiPromise { - return this.apiService.getApi(); + api(): ApiPromise { + return this.apiService.api; } async syncDynamicDatascourcesFromMeta(): Promise { @@ -136,7 +136,7 @@ export class FetchService implements OnApplicationShutdown { isRuntimeDataSourceV0_2_0(ds) || !(ds as RuntimeDataSourceV0_0_1).filter?.specName || (ds as RuntimeDataSourceV0_0_1).filter.specName === - this.api.runtimeVersion.specName.toString(), + this.api().runtimeVersion.specName.toString(), ); // Only run the ds that is equal or less than startBlock @@ -242,8 +242,8 @@ export class FetchService implements OnApplicationShutdown { this.project.network.bypassBlocks, ).filter((blk) => blk >= startHeight); } - if (this.api) { - const CHAIN_INTERVAL = calcInterval(this.api) + if (this.api()) { + const CHAIN_INTERVAL = calcInterval(this.api()) .muln(INTERVAL_PERCENT) .toNumber(); @@ -323,8 +323,10 @@ export class FetchService implements OnApplicationShutdown { return; } try { - const finalizedHash = await this.api.rpc.chain.getFinalizedHead(); - const finalizedHeader = await this.api.rpc.chain.getHeader(finalizedHash); + const finalizedHash = await this.api().rpc.chain.getFinalizedHead(); + const finalizedHeader = await this.api().rpc.chain.getHeader( + finalizedHash, + ); this.unfinalizedBlocksService.registerFinalizedBlock(finalizedHeader); const currentFinalizedHeight = finalizedHeader.number.toNumber(); if (this.latestFinalizedHeight !== currentFinalizedHeight) { @@ -346,7 +348,7 @@ export class FetchService implements OnApplicationShutdown { return; } try { - const bestHeader = await this.api.rpc.chain.getHeader(); + const bestHeader = await this.api().rpc.chain.getHeader(); const currentBestHeight = bestHeader.number.toNumber(); if (this.latestBestHeight !== currentBestHeight) { this.latestBestHeight = currentBestHeight; @@ -606,7 +608,7 @@ export class FetchService implements OnApplicationShutdown { if (dictionary !== undefined) { const { _metadata: metaData } = dictionary; - if (metaData.genesisHash !== this.api.genesisHash.toString()) { + if (metaData.genesisHash !== this.api().genesisHash.toString()) { logger.error( 'The dictionary that you have specified does not match the chain you are indexing, it will be ignored. Please update your project manifest to reference the correct dictionary', ); diff --git a/packages/node/src/indexer/indexer.manager.spec.ts b/packages/node/src/indexer/indexer.manager.spec.ts index e1bd0d1497..8681d715aa 100644 --- a/packages/node/src/indexer/indexer.manager.spec.ts +++ b/packages/node/src/indexer/indexer.manager.spec.ts @@ -11,11 +11,13 @@ import { PoiService, MmrService, NodeConfig, + ConnectionPoolService, } from '@subql/node-core'; import { GraphQLSchema } from 'graphql'; import { Sequelize } from 'sequelize'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; +import { ApiPromiseConnection } from './apiPromise.connection'; import { DsProcessorService } from './ds-processor.service'; import { DynamicDsService } from './dynamic-ds.service'; import { IndexerManager } from './indexer.manager'; @@ -54,13 +56,13 @@ jest.setTimeout(200000); const nodeConfig = new NodeConfig({ subquery: 'asdf', subqueryName: 'asdf', - networkEndpoint: 'wss://polkadot.api.onfinality.io/public-ws', + networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }); function testSubqueryProject_1(): SubqueryProject { return { network: { - endpoint: 'wss://polkadot.api.onfinality.io/public-ws', + endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], }, dataSources: [ { @@ -98,7 +100,7 @@ function testSubqueryProject_1(): SubqueryProject { function testSubqueryProject_2(): SubqueryProject { return { network: { - endpoint: 'wss://polkadot.api.onfinality.io/public-ws', + endpoint: ['wss://polkadot.api.onfinality.io/public-ws'], dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`, genesisHash: '0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3', @@ -126,11 +128,17 @@ function testSubqueryProject_2(): SubqueryProject { function createIndexerManager( project: SubqueryProject, + connectionPoolService: ConnectionPoolService, nodeConfig: NodeConfig, ): IndexerManager { const sequilize = new Sequelize(); const eventEmitter = new EventEmitter2(); - const apiService = new ApiService(project, eventEmitter); + const apiService = new ApiService( + project, + connectionPoolService, + eventEmitter, + nodeConfig, + ); const dsProcessorService = new DsProcessorService(project, nodeConfig); const dynamicDsService = new DynamicDsService(dsProcessorService, project); @@ -188,14 +196,22 @@ describe('IndexerManager', () => { }); xit('should be able to start the manager (v0.0.1)', async () => { - indexerManager = createIndexerManager(testSubqueryProject_1(), nodeConfig); + indexerManager = createIndexerManager( + testSubqueryProject_1(), + new ConnectionPoolService(), + nodeConfig, + ); await expect(indexerManager.start()).resolves.toBe(undefined); expect(Object.keys((indexerManager as any).vms).length).toBe(1); }); xit('should be able to start the manager (v0.2.0)', async () => { - indexerManager = createIndexerManager(testSubqueryProject_2(), nodeConfig); + indexerManager = createIndexerManager( + testSubqueryProject_2(), + new ConnectionPoolService(), + nodeConfig, + ); await expect(indexerManager.start()).resolves.toBe(undefined); expect(Object.keys((indexerManager as any).vms).length).toBe(1); diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index fd5f300bb9..a065d2da19 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -70,8 +70,6 @@ export class IndexerManager { private projectService: ProjectService, ) { logger.info('indexer manager start'); - - this.api = this.apiService.getApi(); } @profiler(yargsOptions.argv.profiler) @@ -83,6 +81,7 @@ export class IndexerManager { operationHash: Uint8Array; reindexBlockHeight: number; }> { + this.api = this.apiService.api; const { block } = blockContent; let dynamicDsCreated = false; let reindexBlockHeight = null; diff --git a/packages/node/src/indexer/indexer.module.ts b/packages/node/src/indexer/indexer.module.ts index e0c1a5d01a..d2f783a7eb 100644 --- a/packages/node/src/indexer/indexer.module.ts +++ b/packages/node/src/indexer/indexer.module.ts @@ -3,9 +3,16 @@ import { Module } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; -import { StoreService, PoiService, MmrService } from '@subql/node-core'; +import { + StoreService, + PoiService, + MmrService, + NodeConfig, + ConnectionPoolService, +} from '@subql/node-core'; import { SubqueryProject } from '../configure/SubqueryProject'; import { ApiService } from './api.service'; +import { ApiPromiseConnection } from './apiPromise.connection'; import { DsProcessorService } from './ds-processor.service'; import { DynamicDsService } from './dynamic-ds.service'; import { IndexerManager } from './indexer.manager'; @@ -19,17 +26,30 @@ import { WorkerService } from './worker/worker.service'; providers: [ IndexerManager, StoreService, + ConnectionPoolService, { provide: ApiService, useFactory: async ( project: SubqueryProject, + connectionPoolService: ConnectionPoolService, eventEmitter: EventEmitter2, + nodeConfig: NodeConfig, ) => { - const apiService = new ApiService(project, eventEmitter); + const apiService = new ApiService( + project, + connectionPoolService, + eventEmitter, + nodeConfig, + ); await apiService.init(); return apiService; }, - inject: ['ISubqueryProject', EventEmitter2], + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], }, SandboxService, DsProcessorService, diff --git a/packages/node/src/indexer/project.service.ts b/packages/node/src/indexer/project.service.ts index 6da398d200..bfec95d316 100644 --- a/packages/node/src/indexer/project.service.ts +++ b/packages/node/src/indexer/project.service.ts @@ -98,7 +98,7 @@ export class ProjectService { // Do extra work on main thread to setup stuff this.project.dataSources = await generateTimestampReferenceForBlockFilters( this.project.dataSources, - this.apiService.getApi(), + this.apiService.api, ); if (isMainThread) { this._schema = await this.ensureProject(); @@ -383,7 +383,7 @@ export class ProjectService { (ds) => !ds.filter?.specName || ds.filter.specName === - this.apiService.getApi().runtimeVersion.specName.toString(), + this.apiService.api.runtimeVersion.specName.toString(), ); } diff --git a/packages/node/src/indexer/runtime/base-runtime.service.ts b/packages/node/src/indexer/runtime/base-runtime.service.ts index 48b7fd1f22..9091e0d326 100644 --- a/packages/node/src/indexer/runtime/base-runtime.service.ts +++ b/packages/node/src/indexer/runtime/base-runtime.service.ts @@ -49,7 +49,7 @@ export abstract class BaseRuntimeService { } get api(): ApiPromise { - return this.apiService.getApi(); + return this.apiService.api; } getSpecFromMap( diff --git a/packages/node/src/indexer/sandbox.service.ts b/packages/node/src/indexer/sandbox.service.ts index 191e6633c1..b33e3090e6 100644 --- a/packages/node/src/indexer/sandbox.service.ts +++ b/packages/node/src/indexer/sandbox.service.ts @@ -41,7 +41,7 @@ export class SandboxService { } processor.freeze(api, 'api'); if (this.nodeConfig.unsafe) { - processor.freeze(this.apiService.getApi(), 'unsafeApi'); + processor.freeze(this.apiService.api, 'unsafeApi'); } return processor; } diff --git a/packages/node/src/indexer/unfinalizedBlocks.service.ts b/packages/node/src/indexer/unfinalizedBlocks.service.ts index 2f87414bc6..4db736f507 100644 --- a/packages/node/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node/src/indexer/unfinalizedBlocks.service.ts @@ -77,7 +77,7 @@ export class UnfinalizedBlocksService { } private get api(): ApiPromise { - return this.apiService.getApi(); + return this.apiService.api; } private get finalizedBlockNumber(): number { diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index 6d3db4d243..70702c332d 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -4,7 +4,6 @@ import { threadId } from 'node:worker_threads'; import { Injectable } from '@nestjs/common'; import { NodeConfig, getLogger, AutoQueue, memoryLock } from '@subql/node-core'; -import { fetchBlocksBatches } from '../../utils/substrate'; import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary.service'; import { IndexerManager } from '../indexer.manager'; @@ -73,8 +72,7 @@ export class WorkerService { logger.debug(`memory lock wait time: ${end - start}ms`); } - const [block] = await fetchBlocksBatches( - this.apiService.getApi(), + const [block] = await this.apiService.fetchBlocks( [height], specChanged ? undefined diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index 7aa5ae768a..6033e2ccc5 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -326,6 +326,7 @@ export async function fetchBlocksBatches( blockArray: number[], overallSpecVer?: number, ): Promise { + //await api.disconnect(); const blocks = await fetchBlocksArray(api, blockArray); const blockHashs = blocks.map((b) => b.block.header.hash); const parentBlockHashs = blocks.map((b) => b.block.header.parentHash); diff --git a/packages/query/src/configure/config.ts b/packages/query/src/configure/config.ts index 9ea73c1564..a5aca82069 100644 --- a/packages/query/src/configure/config.ts +++ b/packages/query/src/configure/config.ts @@ -7,6 +7,6 @@ export class Config { constructor(private readonly store: Record) {} get(key: string, defaultValue?: T): T { - return process.env[key.toUpperCase()] ?? get(this.store, key, defaultValue); + return (process.env[key.toUpperCase()] as unknown as T) ?? get(this.store, key, defaultValue); } } diff --git a/yarn.lock b/yarn.lock index f431547ed6..46a6708907 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3987,6 +3987,7 @@ __metadata: oclif: ^2.4.4 rimraf: ^3.0.2 simple-git: ^3.12.0 + terser-webpack-plugin: ^5.3.7 ts-loader: ^9.2.6 tslib: ^2.3.1 typechain: 8.1.1 @@ -4195,7 +4196,6 @@ __metadata: prom-client: ^14.0.1 sequelize: 6.28.0 source-map: ^0.7.4 - terser-webpack-plugin: ^5.3.7 vm2: ^3.9.9 yargs: ^16.2.0 languageName: unknown