Skip to content

Commit

Permalink
Runtime fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Apr 20, 2023
1 parent d626a55 commit f692456
Show file tree
Hide file tree
Showing 17 changed files with 67 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Worker = {
};

function initAutoQueue<T>(workers: number | undefined, batchSize: number): AutoQueue<T> {
assert(workers && workers < 0, 'Number of workers must be greater than 0');
assert(workers && workers > 0, 'Number of workers must be greater than 0');
return new AutoQueue(workers * batchSize * 2);
}

Expand Down
12 changes: 9 additions & 3 deletions packages/node-core/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ export class DictionaryService implements OnApplicationShutdown {
});
}

setDictionaryStartHeight(start: number | undefined): void {
private setDictionaryStartHeight(start: number | undefined): void {
// Since not all dictionary has adopt start height, if it is not set, we just consider it is 1.
if (this._startHeight !== undefined) {
return;
Expand Down Expand Up @@ -315,11 +315,14 @@ export class DictionaryService implements OnApplicationShutdown {
}
return buildQuery(vars, nodes);
}
buildDictionaryEntryMap<DS extends {startBlock: number}>(
buildDictionaryEntryMap<DS extends {startBlock?: number}>(
dataSources: Array<DS>,
buildDictionaryQueryEntries: (startBlock: number) => DictionaryQueryEntry[]
): void {
for (const ds of dataSources.sort((a, b) => a.startBlock - b.startBlock)) {
const dsWithStartBlock = (dataSources.filter((ds) => !!ds.startBlock) as (DS & {startBlock: number})[]).sort(
(a, b) => a.startBlock - b.startBlock
);
for (const ds of dsWithStartBlock) {
this.mappedDictionaryQueryEntries.set(ds.startBlock, buildDictionaryQueryEntries(ds.startBlock));
}
}
Expand Down Expand Up @@ -366,6 +369,9 @@ export class DictionaryService implements OnApplicationShutdown {
this.nodeConfig.dictionaryTimeout
);
const _metadata = resp.data._metadata;

this.setDictionaryStartHeight(_metadata.startHeight);

return {_metadata};
} catch (err: any) {
if (JSON.stringify(err).includes(startHeightEscaped)) {
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/store.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export class StoreService {
return this._blockHeight;
}

private get historical(): boolean {
get historical(): boolean {
assert(!!this._historical, new NoInitError());
return this._historical;
}
Expand Down
17 changes: 6 additions & 11 deletions packages/node/src/configure/SubqueryProject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
ProjectManifestV0_2_1Impl,
ProjectManifestV0_3_0Impl,
SubstrateDataSource,
FileType,
ProjectManifestV1_0_0Impl,
SubstrateBlockFilter,
isRuntimeDs,
Expand Down Expand Up @@ -53,11 +52,14 @@ const NOT_SUPPORT = (name: string) => {
throw new Error(`Manifest specVersion ${name}() is not supported`);
};

// This is the runtime type after we have mapped genesisHash to chainId and endpoint/dict have been provided when dealing with deployments
type NetworkConfig = SubstrateProjectNetworkConfig & { chainId: string };

@Injectable()
export class SubqueryProject {
id: string;
root: string;
network: SubstrateProjectNetworkConfig;
network: NetworkConfig;
dataSources: SubqlProjectDs[];
schema: GraphQLSchema;
templates: SubqlProjectDsTemplate[];
Expand Down Expand Up @@ -106,14 +108,7 @@ export class SubqueryProject {
}
}

export interface SubqueryProjectNetwork {
chainId: string;
endpoint?: string[];
dictionary?: string;
chaintypes?: FileType;
}

function processChainId(network: any): SubqueryProjectNetwork {
function processChainId(network: any): NetworkConfig {
if (network.chainId && network.genesisHash) {
throw new Error('Please only provide one of chainId and genesisHash');
} else if (network.genesisHash && !network.chainId) {
Expand Down Expand Up @@ -260,7 +255,7 @@ export async function generateTimestampReferenceForBlockFilters(
if (isRuntimeDs(ds)) {
const startBlock = ds.startBlock ?? 1;
let block;
let timestampReference;
let timestampReference: Date;

ds.mapping.handlers = await Promise.all(
ds.mapping.handlers.map(async (handler) => {
Expand Down
4 changes: 3 additions & 1 deletion packages/node/src/indexer/api.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ function testSubqueryProject(): SubqueryProject {
return {
network: {
endpoint: testNetwork.endpoint,
genesisHash:
// genesisHash:
// '0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe',
chainId:
'0xb0a8d493285c2df73290dfb7e61f870f17b41801197a149ca93654499ea3dafe',
},
chainTypes: {
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/indexer/api.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ function testSubqueryProject(endpoint: string[]): SubqueryProject {
network: {
endpoint,
dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`,
chainId: '',
},
dataSources: [],
id: 'test',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,6 @@ export class BlockDispatcherService
@Inject('ISubqueryProject') project: SubqueryProject,
dynamicDsService: DynamicDsService,
) {
const fetchBlockBatchesWrapped = async (
blockNums: number[],
): Promise<BlockContent[]> => {
const specChanged = await this.runtimeService.specChanged(
blockNums[blockNums.length - 1],
);

// If specVersion not changed, a known overallSpecVer will be pass in
// Otherwise use api to fetch runtimes
return this.apiService.fetchBlocks(
blockNums,
specChanged ? undefined : this.runtimeService.parentSpecVersion,
);
};

super(
nodeConfig,
eventEmitter,
Expand All @@ -71,7 +56,18 @@ export class BlockDispatcherService
poiService,
project,
dynamicDsService,
fetchBlockBatchesWrapped,
async (blockNums: number[]): Promise<BlockContent[]> => {
const specChanged = await this.runtimeService.specChanged(
blockNums[blockNums.length - 1],
);

// If specVersion not changed, a known overallSpecVer will be pass in
// Otherwise use api to fetch runtimes
return this.apiService.fetchBlocks(
blockNums,
specChanged ? undefined : this.runtimeService.parentSpecVersion,
);
},
);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/node/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import { DictionaryService } from './dictionary.service';
function testSubqueryProject(): SubqueryProject {
return {
network: {
endpoint: '',
dictionary: `https://api.subquery.network/sq/subquery/polkadot-dictionary`,
chainId: '',
},
dataSources: [],
id: 'test',
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/indexer/ds-processor.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ function getTestProject(
): SubqueryProject {
return {
network: {
genesisHash: '0x',
chainId: '0x',
endpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
},
dataSources: [
Expand Down
4 changes: 1 addition & 3 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import {
SmartBatchService,
StoreCacheService,
} from '@subql/node-core';
import { Sequelize } from 'sequelize';

import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
import { SubqueryProject } from '../configure/SubqueryProject';
import { ApiService } from './api.service';
import {
BlockDispatcherService,
Expand Down
4 changes: 3 additions & 1 deletion packages/node/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ function mockDictionaryService3(): DictionaryService {
function testSubqueryProject(): SubqueryProject {
return {
network: {
chainId: '0x',
endpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
},
chainTypes: {
Expand All @@ -305,7 +306,8 @@ function testSubqueryProject(): SubqueryProject {
function testSubqueryProjectV0_2_0(): SubqueryProject {
return {
network: {
genesisHash: '0x',
chainId: '0x',
endpoint: [],
dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`,
},
dataSources: [
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/indexer/fetch.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const HTTP_ENDPOINT = 'https://polkadot.api.onfinality.io/public';
function testSubqueryProject(): SubqueryProject {
return {
network: {
chainId: '0x',
endpoint: [WS_ENDPOINT],
},
chainTypes: {
Expand Down
8 changes: 4 additions & 4 deletions packages/node/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,6 @@ export class FetchService implements OnApplicationShutdown {
);

if (dictionaryValid) {
this.dictionaryService.setDictionaryStartHeight(
metadata?._metadata?.startHeight,
);
const rawSpecVersions = await this.dictionaryService.getSpecVersionsRaw();
this.runtimeService.setSpecVersionMap(rawSpecVersions);
} else {
Expand Down Expand Up @@ -420,7 +417,10 @@ export class FetchService implements OnApplicationShutdown {
: initBlockHeight;
};

if (this.dictionaryService.startHeight > getStartBlockHeight()) {
if (
this.useDictionary &&
this.dictionaryService.startHeight > getStartBlockHeight()
) {
logger.warn(
`Dictionary start height ${
this.dictionaryService.startHeight
Expand Down
3 changes: 2 additions & 1 deletion packages/node/src/indexer/indexer.manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const nodeConfig = new NodeConfig({
function testSubqueryProject_1(): SubqueryProject {
return {
network: {
chainId: '0x',
endpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
},
dataSources: [
Expand Down Expand Up @@ -103,7 +104,7 @@ function testSubqueryProject_2(): SubqueryProject {
network: {
endpoint: ['wss://polkadot.api.onfinality.io/public-ws'],
dictionary: `https://api.subquery.network/sq/subquery/dictionary-polkadot`,
genesisHash:
chainId:
'0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3',
},
dataSources: [
Expand Down
20 changes: 18 additions & 2 deletions packages/node/src/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,31 @@ import { WorkerUnfinalizedBlocksService } from './worker/worker.unfinalizedBlock
},
SandboxService,
DsProcessorService,
DynamicDsService,
{
provide: DynamicDsService,
useFactory: () => {
if (isMainThread) {
throw new Error('Expected to be worker thread');
}
return new WorkerDynamicDsService((global as any).host);
},
},
PoiService,
MmrService,
{
provide: 'IProjectService',
useClass: ProjectService,
},
WorkerService,
UnfinalizedBlocksService,
{
provide: UnfinalizedBlocksService,
useFactory: () => {
if (isMainThread) {
throw new Error('Expected to be worker thread');
}
return new WorkerUnfinalizedBlocksService((global as any).host);
},
},
WorkerRuntimeService,
],
exports: [StoreService, MmrService],
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async function initWorker(): Promise<void> {
}

app = await NestFactory.create(WorkerModule, {
logger: new NestLogger(), // TIP: If the worker is crashing comment out this line for better logging
// logger: new NestLogger(), // TIP: If the worker is crashing comment out this line for better logging
});

await app.init();
Expand Down
2 changes: 1 addition & 1 deletion packages/types/src/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export interface SubstrateNetworkFilter {
specName?: string;
}

export type SubstrateDatasource = SubstrateRuntimeDatasource | SubstrateCustomDatasource; // | SubstrateBuiltinDataSource;
export type SubstrateDatasource = SubstrateRuntimeDatasource | SubstrateCustomDatasource;

export interface FileReference {
file: string;
Expand Down

0 comments on commit f692456

Please sign in to comment.