Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple endpoints #1551

Merged
merged 14 commits into from
Mar 29, 2023
Merged
6 changes: 4 additions & 2 deletions packages/cli/src/controller/migrate-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
ChainTypes,
loadSubstrateProjectManifest,
} from '@subql/common-substrate';
import {loadTerraProjectManifest, TerraProjectManifestVersioned} from '@subql/common-terra';
import {loadTerraProjectManifest, TerraProjectManifestVersioned, TerraProjectNetworkV0_3_0} from '@subql/common-terra';
import {classToPlain} from 'class-transformer';
import {cli} from 'cli-ux';
import inquirer from 'inquirer';
Expand Down Expand Up @@ -63,7 +63,9 @@ export async function prepare(
if (project.runner.node.name === SUBSTRATE_NODE_NAME) {
cli.action.start('Getting network genesis hash from endpoint for Chain ID');
try {
genesisHash = await getGenesisHash(projectNetwork.endpoint);
genesisHash = await getGenesisHash(
typeof projectNetwork.endpoint === 'string' ? projectNetwork.endpoint : projectNetwork.endpoint[0]
);
} catch (e) {
genesisHash = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import {SubstrateProjectNetworkConfig} from '../../types';
import {ManifestV0_0_1Mapping, ProjectManifestV0_0_1, RuntimeDataSourceV0_0_1} from './types';

export class ProjectNetworkV0_0_1 extends ChainTypes implements SubstrateProjectNetworkConfig {
@IsString()
endpoint: string;
@IsString({each: true})
endpoint: string[];
@IsString()
@IsOptional()
dictionary?: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ export class ProjectNetworkDeploymentV0_2_0 {
}

export class ProjectNetworkV0_2_0 extends ProjectNetworkDeploymentV0_2_0 {
@IsString()
@IsString({each: true})
@IsOptional()
endpoint?: string;
endpoint?: string | string[];
@IsString()
@IsOptional()
dictionary?: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface SubstrateProjectManifestV0_3_0 extends ISubstrateProjectManifes

network: {
genesisHash: string;
endpoint?: string;
endpoint?: string | string[];
dictionary?: string;
chaintypes?: {
file: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ export class ProjectNetworkDeploymentV1_0_0 {
}

export class ProjectNetworkV1_0_0 extends ProjectNetworkDeploymentV1_0_0 {
@IsString()
@IsString({each: true})
@IsOptional()
endpoint?: string;
endpoint?: string | string[];
@IsString()
@IsOptional()
dictionary?: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/project/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface IProjectManifest<D> {
}

export interface ProjectNetworkConfig {
endpoint: string;
endpoint: string | string[];
dictionary?: string;
bypassBlocks?: (number | string)[];
//genesisHash?: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export function validateSemver(current: string, required: string): boolean {
@ValidatorConstraint({name: 'semver', async: false})
export class SemverVersionValidator implements ValidatorConstraintInterface {
validate(value: string | null | undefined): boolean {
if (valid(value, {includePrerelease: false}) === null) {
if (valid(value) === null) {
return validRange(value, {includePrerelease: false}) !== null;
} else {
return prerelease(value) === null;
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/project/versioned/v0_2_0/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface ProjectManifestV0_2_0<D extends object = BaseDataSource> {
};
network: {
genesisHash: string;
endpoint?: string;
endpoint?: string | string[];
dictionary?: string;
chaintypes?: {
file: string;
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/project/versioned/v1_0_0/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface ProjectManifestV1_0_0<T extends object = TemplateBase, D extend
templates?: T[];
network: {
chainId: string;
endpoint?: string;
endpoint?: string | string[];
dictionary?: string;
bypassBlocks?: (number | string)[];
chaintypes?: {
Expand Down
8 changes: 5 additions & 3 deletions packages/node-core/src/configure/NodeConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface IConfig {
readonly blockTime: number;
readonly debug: boolean;
readonly preferRange: boolean;
readonly networkEndpoint?: string;
readonly networkEndpoint?: string[];
readonly networkDictionary?: string;
readonly dictionaryResolver?: string;
readonly outputFmt?: 'json';
Expand Down Expand Up @@ -109,8 +109,10 @@ export class NodeConfig implements IConfig {
return this._config.batchSize;
}

get networkEndpoint(): string | undefined {
return this._config.networkEndpoint;
get networkEndpoints(): string[] | undefined {
return typeof this._config.networkEndpoint === 'string'
? [this._config.networkEndpoint]
: this._config.networkEndpoint;
}

get networkDictionary(): string | undefined {
Expand Down
76 changes: 76 additions & 0 deletions packages/node-core/src/indexer/connectionPool.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import {Injectable, OnApplicationShutdown} from '@nestjs/common';
import {toNumber} from 'lodash';
import {getLogger} from '../logger';

const logger = getLogger('api');

export interface ApiConnection {
apiConnect(): Promise<void>;
apiDisconnect(): Promise<void>;
}

@Injectable()
export class ConnectionPoolService<T extends ApiConnection> implements OnApplicationShutdown {
private allApi: T[] = [];
private connectionPool: Record<number, T> = {};
private taskCounter = 0;

async onApplicationShutdown(): Promise<void> {
await Promise.all(
Object.keys(this.connectionPool)?.map((key) => this.connectionPool[toNumber(key)].apiDisconnect())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use Object.entries here

);
}

addToConnections(api: T): void {
this.allApi.push(api);
this.connectionPool[this.allApi.length - 1] = api;
}

addBatchToConnections(apis: T[]): void {
apis.forEach((api) => this.addToConnections(api));
}

async connectToApi(apiIndex: number): Promise<void> {
await this.allApi[apiIndex].apiConnect();
}

get api(): T {
const index = this.getNextConnectedApiIndex();
if (index === -1) {
throw new Error('No connected api');
}
return this.connectionPool[index];
}

getNextConnectedApiIndex(): number {
if (Object.keys(this.connectionPool).length === 0) {
return -1;
}
const nextIndex = this.taskCounter % Object.keys(this.connectionPool).length;
this.taskCounter++;
return toNumber(Object.keys(this.connectionPool)[nextIndex]);
}

get numConnections(): number {
return Object.keys(this.connectionPool).length;
}

async handleApiDisconnects(apiIndex: number, endpoint: string): Promise<void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe its worth adding in here a check that we have at least some connections. It would be worth testing what happens if they all disconnect (turn off your internet)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it retries to fetch about 5 times and exits after that

logger.warn(`disconnected from ${endpoint}`);
delete this.connectionPool[apiIndex];

try {
logger.debug(`reconnecting to ${endpoint}...`);
await this.connectToApi(apiIndex);
} catch (e) {
logger.error(`unable to reconnect to endpoint ${endpoint}`, e);
return;
}

logger.info(`reconnected to ${endpoint}!`);
this.connectionPool[apiIndex] = this.allApi[apiIndex];
}
}
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ const HAPPY_PATH_CONDITIONS: DictionaryQueryEntry[] = [
const nodeConfig = new NodeConfig({
subquery: 'asdf',
subqueryName: 'asdf',
networkEndpoint: 'wss://polkadot.api.onfinality.io/public-ws',
networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
dictionaryTimeout: 10,
});

Expand Down
3 changes: 2 additions & 1 deletion packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

export * from './benchmark.service';
export * from './connectionPool.service';
export * from './entities';
export * from './PoiBlock';
export * from './types';
Expand All @@ -12,4 +13,4 @@ export * from './mmr.service';
export * from './worker';
export * from './dictionary.service';
export * from './sandbox';
export * from './smartBatch.service';
export * from './smartBatch.service';
11 changes: 11 additions & 0 deletions packages/node-core/src/utils/object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ export function camelCaseObjectKey(object: Record<string, any>): object {
{}
);
}

export function splitArrayByRatio(arr: number[], weights: number[]): number[][] {
const result: number[][] = [];
let start = 0;
for (let i = 0; i < weights.length; i++) {
const end = Math.floor(arr.length * weights[i]) + start;
result.push(arr.slice(start, end));
start = end;
}
return result;
}
14 changes: 7 additions & 7 deletions packages/node/src/configure/SubqueryProject.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('SubqueryProject', () => {
it('convert local 0.2.0 manifest to project object', async () => {
//manually pass the endpoint
const project = await SubqueryProject.create(projectDirV0_2_0, {
endpoint: 'wss://rpc.polkadot.io/public-ws',
endpoint: ['wss://rpc.polkadot.io/public-ws'],
});

expect((project.dataSources[1] as any).processor.file).toMatch(
Expand All @@ -47,7 +47,7 @@ describe('SubqueryProject', () => {
it('convert local 0.3.0 manifest to project object', async () => {
//manually pass the endpoint
const project = await SubqueryProject.create(projectDirV0_3_0, {
endpoint: 'wss://rpc.polkadot.io/public-ws',
endpoint: ['wss://rpc.polkadot.io/public-ws'],
});

expect((project.dataSources[1] as any).processor.file).toMatch(
Expand All @@ -60,7 +60,7 @@ describe('SubqueryProject', () => {
//manually pass the endpoint
const project = await SubqueryProject.create(
deployment,
{ endpoint: 'wss://rpc.polkadot.io/public-ws' },
{ endpoint: ['wss://rpc.polkadot.io/public-ws'] },
{ ipfs: 'http://127.0.0.1:8080' },
);
}, 5000000);
Expand All @@ -77,15 +77,15 @@ describe('SubqueryProject', () => {
},
};
const project = await SubqueryProject.create(projectDirV1_0_0, {
endpoint: 'wss://rpc.polkadot.io/public-ws',
endpoint: ['wss://rpc.polkadot.io/public-ws'],
});

expect(project.runner).toMatchObject(expectedRunner);
}, 500000);

it('check processChainId', async () => {
const project = await SubqueryProject.create(projectDirV1_0_0, {
endpoint: 'wss://rpc.polkadot.io/public-ws',
endpoint: ['wss://rpc.polkadot.io/public-ws'],
});
expect(project.network.chainId).toMatch(
'0x401a1f9dca3da46f5c4091016c8a2f26dcea05865116b286f60f668207d1474b',
Expand All @@ -94,10 +94,10 @@ describe('SubqueryProject', () => {

it('check loadProjectTemplates', async () => {
const project = await SubqueryProject.create(templateProject, {
endpoint: 'wss://moonbeam-alpha.api.onfinality.io/public-ws',
endpoint: ['wss://moonbeam-alpha.api.onfinality.io/public-ws'],
});
const project_v1 = await SubqueryProject.create(projectDirV1_0_0, {
endpoint: 'wss://rpc.polkadot.io/public-ws',
endpoint: ['wss://rpc.polkadot.io/public-ws'],
});
expect(project_v1).not.toContain('template');
expect(project.templates.length).toBe(1);
Expand Down
6 changes: 5 additions & 1 deletion packages/node/src/configure/SubqueryProject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class SubqueryProject {

export interface SubqueryProjectNetwork {
chainId: string;
endpoint?: string;
endpoint?: string[];
dictionary?: string;
chaintypes?: FileType;
}
Expand Down Expand Up @@ -137,6 +137,10 @@ async function loadProjectFromManifestBase(
): Promise<SubqueryProject> {
const root = await getProjectRoot(reader);

if (typeof projectManifest.network.endpoint === 'string') {
projectManifest.network.endpoint = [projectManifest.network.endpoint];
}

const network = processChainId({
...projectManifest.network,
...networkOverrides,
Expand Down
4 changes: 2 additions & 2 deletions packages/node/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class ConfigureModule {
config.subquery,
omitBy<SubstrateProjectNetworkConfig>(
{
endpoint: config.networkEndpoint,
endpoint: config.networkEndpoints,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be

Suggested change
endpoint: config.networkEndpoints,
endpoints: config.networkEndpoints,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's endpoint (type is inherited from @subql/common module)

dictionary: config.networkDictionary,
},
isNil,
Expand Down Expand Up @@ -170,7 +170,7 @@ export class ConfigureModule {
argv.subquery,
omitBy<SubstrateProjectNetworkConfig>(
{
endpoint: config.networkEndpoint,
endpoint: config.networkEndpoints,
dictionary: config.networkDictionary,
},
isNil,
Expand Down
25 changes: 22 additions & 3 deletions packages/node/src/indexer/api.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import { EventEmitter2 } from '@nestjs/event-emitter';
import { ApiPromise, WsProvider } from '@polkadot/api';
import { ProjectNetworkV0_0_1 } from '@subql/common-substrate';
import { ConnectionPoolService, NodeConfig } from '@subql/node-core';
import { GraphQLSchema } from 'graphql';
import { omit } from 'lodash';
import { SubqueryProject } from '../configure/SubqueryProject';
import { ApiService } from './api.service';
import { ApiPromiseConnection } from './apiPromise.connection';

jest.mock('@polkadot/api', () => {
const ApiPromise = jest.fn();
Expand All @@ -23,7 +25,7 @@ jest.mock('@polkadot/api', () => {
});

const testNetwork: ProjectNetworkV0_0_1 = {
endpoint: 'wss://kusama.api.onfinality.io/public-ws',
endpoint: ['wss://kusama.api.onfinality.io/public-ws'],
types: {
TestType: 'u32',
},
Expand All @@ -46,6 +48,13 @@ const testNetwork: ProjectNetworkV0_0_1 = {
typesSpec: { spec3: { TestType6: 'test' } },
};

const nodeConfig = new NodeConfig({
subquery: 'asdf',
subqueryName: 'asdf',
networkEndpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
dictionaryTimeout: 10,
});

function testSubqueryProject(): SubqueryProject {
return {
network: {
Expand All @@ -71,7 +80,12 @@ function testSubqueryProject(): SubqueryProject {
describe('ApiService', () => {
it('read custom types from project manifest', async () => {
const project = testSubqueryProject();
const apiService = new ApiService(project, new EventEmitter2());
const apiService = new ApiService(
project,
new ConnectionPoolService<ApiPromiseConnection>(),
new EventEmitter2(),
nodeConfig,
);
await apiService.init();
const { version } = require('../../package.json');
expect(WsProvider).toHaveBeenCalledWith(testNetwork.endpoint, 2500, {
Expand All @@ -91,7 +105,12 @@ describe('ApiService', () => {
// Now after manifest 1.0.0, will use chainId instead of genesisHash
(project.network as any).chainId = '0x';

const apiService = new ApiService(project, new EventEmitter2());
const apiService = new ApiService(
project,
new ConnectionPoolService<ApiPromiseConnection>(),
new EventEmitter2(),
nodeConfig,
);

await expect(apiService.init()).rejects.toThrow();
});
Expand Down
Loading