Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing speChanged logic with getRuntime #1421

Merged
merged 7 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ScheduleModule } from '@nestjs/schedule';
import { DbModule } from '@subql/node-core';
import { ConfigureModule } from './configure/configure.module';
import { FetchModule } from './indexer/fetch.module';
import { RuntimeService } from './indexer/runtimeService';
import { MetaModule } from './meta/meta.module';

@Module({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@

import assert from 'assert';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import { hexToU8a, u8aEq } from '@polkadot/util';
import { getLogger, IndexerEvent, IQueue, NodeConfig } from '@subql/node-core';
import { SubstrateBlock } from '@subql/types';
import { ProjectService } from '../project.service';
import { RuntimeService } from '../runtimeService';

const logger = getLogger('BaseBlockDispatcherService');

type GetRuntimeVersion = (block: SubstrateBlock) => Promise<RuntimeVersion>;

export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
operationHash: Uint8Array;
Expand All @@ -21,8 +18,8 @@ export type ProcessBlockResponse = {

export interface IBlockDispatcher {
init(
runtimeVersionGetter: GetRuntimeVersion,
onDynamicDsCreated: (height: number) => Promise<void>,
runtimeService?: RuntimeService,
): Promise<void>;

enqueueBlocks(heights: number[]): void;
Expand Down Expand Up @@ -56,11 +53,11 @@ export abstract class BaseBlockDispatcher<Q extends IQueue>
protected eventEmitter: EventEmitter2,
protected projectService: ProjectService,
protected queue: Q,
protected runtimeService?: RuntimeService,
) {}

abstract enqueueBlocks(heights: number[]): void;
abstract init(
runtimeVersionGetter: GetRuntimeVersion,
onDynamicDsCreated: (height: number) => Promise<void>,
): Promise<void>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { RuntimeVersion } from '@polkadot/types/interfaces';
import {
getLogger,
NodeConfig,
Expand All @@ -13,16 +12,14 @@ import {
AutoQueue,
Queue,
} from '@subql/node-core';
import { SubstrateBlock } from '@subql/types';
import { last } from 'lodash';
import * as SubstrateUtil from '../../utils/substrate';
import { ApiService } from '../api.service';
import { IndexerManager } from '../indexer.manager';
import { ProjectService } from '../project.service';
import { RuntimeService } from '../runtimeService';
import { BaseBlockDispatcher } from './base-block-dispatcher';

type GetRuntimeVersion = (block: SubstrateBlock) => Promise<RuntimeVersion>;

const logger = getLogger('BlockDispatcherService');

/**
Expand All @@ -37,7 +34,7 @@ export class BlockDispatcherService

private fetching = false;
private isShutdown = false;
private getRuntimeVersion: GetRuntimeVersion;
// private getRuntimeVersion: GetRuntimeVersion;
private fetchBlocksBatches = SubstrateUtil.fetchBlocksBatches;

constructor(
Expand Down Expand Up @@ -66,13 +63,13 @@ export class BlockDispatcherService

// eslint-disable-next-line @typescript-eslint/require-await
async init(
runtimeVersionGetter: GetRuntimeVersion,
onDynamicDsCreated: (height: number) => Promise<void>,
runtimeService?: RuntimeService,
): Promise<void> {
this.getRuntimeVersion = runtimeVersionGetter;
this.onDynamicDsCreated = onDynamicDsCreated;
const blockAmount = await this.projectService.getProcessedBlockCount();
this.setProcessedBlockCount(blockAmount ?? 0);
this.runtimeService = runtimeService;
}

onApplicationShutdown(): void {
Expand Down Expand Up @@ -136,9 +133,16 @@ export class BlockDispatcherService
}], total ${blockNums.length} blocks`,
);

const specChanged = await this.runtimeService.specChanged(
blockNums[blockNums.length - 1],
);

// If specVersion not changed, a known overallSpecVer will be pass in
// Otherwise use api to fetch runtimes
const blocks = await this.fetchBlocksBatches(
this.apiService.getApi(),
blockNums,
specChanged ? undefined : this.runtimeService.parentSpecVersion,
);

if (bufferedHeight > this._latestBufferedHeight) {
Expand All @@ -148,10 +152,12 @@ export class BlockDispatcherService
const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
try {
const runtimeVersion = await this.getRuntimeVersion(block.block);
const runtimeVersion = await this.runtimeService.getRuntimeVersion(
block.block,
);

this.preProcessBlock(height);

// Inject runtimeVersion here to enhance api.at preparation
const processBlockResponse = await this.indexerManager.indexBlock(
block,
runtimeVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { SubstrateBlock } from '@subql/types';
import chalk from 'chalk';
import { last } from 'lodash';
import { ProjectService } from '../project.service';
import { RuntimeService } from '../runtimeService';
import {
FetchBlock,
ProcessBlock,
Expand All @@ -31,8 +32,6 @@ import { BaseBlockDispatcher } from './base-block-dispatcher';

const logger = getLogger('WorkerBlockDispatcherService');

type GetRuntimeVersion = (block: SubstrateBlock) => Promise<RuntimeVersion>;

type IIndexerWorker = {
processBlock: ProcessBlock;
fetchBlock: FetchBlock;
Expand Down Expand Up @@ -76,7 +75,6 @@ export class WorkerBlockDispatcherService
{
private workers: IndexerWorker[];
private numWorkers: number;
private getRuntimeVersion: GetRuntimeVersion;

private taskCounter = 0;
private isShutdown = false;
Expand All @@ -97,8 +95,8 @@ export class WorkerBlockDispatcherService
}

async init(
runtimeVersionGetter: GetRuntimeVersion,
onDynamicDsCreated: (height: number) => Promise<void>,
runtimeService?: RuntimeService,
): Promise<void> {
if (this.nodeConfig.unfinalizedBlocks) {
throw new Error(
Expand All @@ -110,11 +108,11 @@ export class WorkerBlockDispatcherService
new Array(this.numWorkers).fill(0).map(() => createIndexerWorker()),
);

this.getRuntimeVersion = runtimeVersionGetter;
this.onDynamicDsCreated = onDynamicDsCreated;

const blockAmount = await this.projectService.getProcessedBlockCount();
this.setProcessedBlockCount(blockAmount ?? 0);
this.runtimeService = runtimeService;
}

async onApplicationShutdown(): Promise<void> {
Expand Down Expand Up @@ -194,7 +192,7 @@ export class WorkerBlockDispatcherService
}

if (result) {
const runtimeVersion = await this.getRuntimeVersion({
const runtimeVersion = await this.runtimeService.getRuntimeVersion({
specVersion: result.specVersion,
block: {
header: {
Expand Down
2 changes: 2 additions & 0 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { DynamicDsService } from './dynamic-ds.service';
import { FetchService } from './fetch.service';
import { IndexerManager } from './indexer.manager';
import { ProjectService } from './project.service';
import { RuntimeService } from './runtimeService';
import { SandboxService } from './sandbox.service';
import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';

Expand Down Expand Up @@ -69,6 +70,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
MmrService,
ProjectService,
UnfinalizedBlocksService,
RuntimeService,
],
exports: [StoreService, MmrService],
})
Expand Down
64 changes: 55 additions & 9 deletions packages/node/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { DynamicDsService } from './dynamic-ds.service';
import { FetchService } from './fetch.service';
import { IndexerManager } from './indexer.manager';
import { ProjectService } from './project.service';
import { RuntimeService } from './runtimeService';
import { BlockContent } from './types';
import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';

Expand Down Expand Up @@ -187,6 +188,38 @@ const mockDictionaryBatches: Dictionary = {
batchBlocks: [14000, 14200, 14300, 14500, 14600, 14700, 14800, 14900],
};

const mockDictionarySpecVersions = {
_metadata: {
lastProcessedHeight: 15000,
lastProcessedTimestamp: 123124151,
targetHeight: 16000,
chain: 'Polkadot',
specName: 'polkadot',
genesisHash: '0x12345',
indexerHealthy: true,
indexerNodeVersion: '0.16.1',
queryNodeVersion: '0.6.0',
rowCountEstimate: [{ table: '', estimate: 0 }],
},
specVersions: {
nodes: [
{ id: '0', blockHeight: 1 },
{ id: '1', blockHeight: 29232 },
{ id: '5', blockHeight: 188837 },
{ id: '6', blockHeight: 199406 },
{ id: '7', blockHeight: 214265 },
{ id: '8', blockHeight: 244359 },
{ id: '9', blockHeight: 303080 },
{ id: '10', blockHeight: 314202 },
{ id: '11', blockHeight: 342401 },
{ id: '12', blockHeight: 443964 },
{ id: '13', blockHeight: 528471 },
{ id: '14', blockHeight: 687752 },
{ id: '15', blockHeight: 746086 },
],
},
};

function mockDictionaryService(
cb?: (mock: jest.Mock) => void,
): DictionaryService {
Expand All @@ -199,26 +232,29 @@ function mockDictionaryService(
return {
getDictionary: mockDictionary,
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryRet),
getSpecVersionsRaw: jest.fn(() => mockDictionarySpecVersions),
buildDictionaryEntryMap: jest.fn(),
getDictionaryQueryEntries: jest.fn(() => []),
getDictionaryQueryEntries: jest.fn(() => [{}, {}, {}]),
scopedDictionaryEntries: jest.fn(() => mockDictionaryNoBatches),
} as any;
}

function mockDictionaryService1(): DictionaryService {
return {
getDictionary: jest.fn(() => mockDictionaryBatches),
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryBatches),
getSpecVersionsRaw: jest.fn(() => mockDictionarySpecVersions),
buildDictionaryEntryMap: jest.fn(),
getDictionaryQueryEntries: jest.fn(() => []),
getDictionaryQueryEntries: jest.fn(() => [{}, {}]),
scopedDictionaryEntries: jest.fn(() => mockDictionaryBatches),
} as any;
}

function mockDictionaryService2(): DictionaryService {
return {
getDictionary: jest.fn(() => undefined),
buildDictionaryEntryMap: jest.fn(),
getSpecVersions: jest.fn(() => mockDictionarySpecVersions),
getDictionaryQueryEntries: jest.fn(() => []),
} as any;
}
Expand All @@ -227,9 +263,10 @@ function mockDictionaryService3(): DictionaryService {
return {
getDictionary: jest.fn(() => mockDictionaryNoBatches),
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryNoBatches),
getSpecVersionsRaw: jest.fn(() => mockDictionarySpecVersions),
buildDictionaryEntryMap: jest.fn(),
getDictionaryQueryEntries: jest.fn(() => []),
scopedDictionaryEntries: jest.fn(() => mockDictionaryNoBatches),
getDictionaryQueryEntries: jest.fn(() => [{}, {}]),
} as any;
}

Expand Down Expand Up @@ -334,6 +371,7 @@ async function createFetchService(
unfinalizedBlocksService,
eventEmitter,
new SchedulerRegistry(),
new RuntimeService(apiService, dictionaryService),
);
}

Expand Down Expand Up @@ -437,7 +475,8 @@ describe('FetchService', () => {
project,
batchSize,
);
fetchService.prefetchMeta = jest.fn();

(fetchService as any).runtimeService.prefetchMeta = jest.fn();

const pendingCondition = new Promise((resolve) => {
// eslint-disable-next-line @typescript-eslint/require-await
Expand All @@ -462,7 +501,8 @@ describe('FetchService', () => {
fetchService.onApplicationShutdown();
}, 500000);

it("skip use dictionary once if dictionary 's lastProcessedHeight < startBlockHeight", async () => {
// skip this test, we are using dictionaryValidation method with startHeight, rather than use local useDictionary
it.skip("skip use dictionary once if dictionary 's lastProcessedHeight < startBlockHeight", async () => {
const batchSize = 20;
project.network.dictionary =
'https://api.subquery.network/sq/subquery/dictionary-polkadot';
Expand Down Expand Up @@ -527,6 +567,7 @@ describe('FetchService', () => {
unfinalizedBlocksService,
eventEmitter,
schedulerRegistry,
new RuntimeService(apiService, dictionaryService),
);

const nextEndBlockHeightSpy = jest.spyOn(
Expand All @@ -538,6 +579,7 @@ describe('FetchService', () => {
`dictionaryValidation`,
);
await fetchService.init(1000);

(fetchService as any).latestFinalizedHeight = 1005;
blockDispatcher.latestBufferedHeight = undefined;
// (fetchService as any).latestProcessedHeight = undefined;
Expand Down Expand Up @@ -620,14 +662,17 @@ describe('FetchService', () => {
unfinalizedBlocksService,
eventEmitter,
schedulerRegistry,
new RuntimeService(apiService, dictionaryService),
);
await fetchService.init(1000);
const nextEndBlockHeightSpy = jest.spyOn(
fetchService as any,
`nextEndBlockHeight`,
);
fetchService.prefetchMeta = jest.fn();

(fetchService as any).latestFinalizedHeight = 16000;
const runtimeService = (fetchService as any).runtimeService;
runtimeService.prefetchMeta = jest.fn();
blockDispatcher.latestBufferedHeight = undefined;
// (fetchService as any).latestProcessedHeight = undefined;
// const loopPromise = fetchService.startLoop(1000);
Expand Down Expand Up @@ -706,6 +751,7 @@ describe('FetchService', () => {
unfinalizedBlocksService,
eventEmitter,
schedulerRegistry,
new RuntimeService(apiService, dictionaryService),
);
const nextEndBlockHeightSpy = jest.spyOn(
fetchService as any,
Expand Down
Loading