Skip to content

Commit

Permalink
Validate dictionary (#1152)
Browse files Browse the repository at this point in the history
* init

* validate during init

* patch getSpecVersionMap err

* fixed tests

* tidy up

* refactor getspecversion
  • Loading branch information
bz888 authored Jul 4, 2022
1 parent 075a218 commit 89700d7
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 79 deletions.
2 changes: 1 addition & 1 deletion packages/node/src/indexer/dictionary.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ describe('DictionaryService', () => {
const project = testSubqueryProject();
const dictionaryService = new DictionaryService(project);

const specVersions = await dictionaryService.getSpecVersion();
const specVersions = await dictionaryService.getSpecVersions();
console.log(specVersions);
}, 500000);
});
69 changes: 47 additions & 22 deletions packages/node/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export type Dictionary = {
//TODO
// specVersions: number[];
};

export type SpecVersionDictionary = {
_metadata: MetaData;
specVersions: SpecVersion[];
};

const logger = getLogger('dictionary');
const { argv } = getYargsOption();

Expand Down Expand Up @@ -239,40 +245,59 @@ export class DictionaryService implements OnApplicationShutdown {
return buildQuery(vars, nodes);
}

async getSpecVersion(): Promise<SpecVersion[]> {
parseSpecVersions(raw: SpecVersionDictionary): SpecVersion[] {
if (raw === undefined) {
return [];
}
const specVersionBlockHeightSet = new Set<SpecVersion>();
const specVersions = (raw.specVersions as any).nodes;
const _metadata = raw._metadata;

// Add range for -1 specVersions
for (let i = 0; i < specVersions.length - 1; i++) {
specVersionBlockHeightSet.add({
id: specVersions[i].id,
start: Number(specVersions[i].blockHeight),
end: Number(specVersions[i + 1].blockHeight) - 1,
});
}
if (specVersions && specVersions.length >= 0) {
// Add range for the last specVersion
if (_metadata.lastProcessedHeight) {
specVersionBlockHeightSet.add({
id: specVersions[specVersions.length - 1].id,
start: Number(specVersions[specVersions.length - 1].blockHeight),
end: Number(_metadata.lastProcessedHeight),
});
}
}
return Array.from(specVersionBlockHeightSet);
}

async getSpecVersionsRaw(): Promise<SpecVersionDictionary> {
const { query } = this.specVersionQuery();
try {
const resp = await this.client.query({
query: gql(query),
});
const specVersionBlockHeightSet = new Set<SpecVersion>();

const _metadata = resp.data._metadata;
const specVersions = resp.data.specVersions.nodes;
// Add range for -1 specVersions
for (let i = 0; i < resp.data.specVersions.nodes.length - 1; i++) {
specVersionBlockHeightSet.add({
id: specVersions[i].id,
start: Number(specVersions[i].blockHeight),
end: Number(specVersions[i + 1].blockHeight) - 1,
});
}
if (specVersions && specVersions.length >= 0) {
// Add range for the last specVersion
if (_metadata.lastProcessedHeight) {
specVersionBlockHeightSet.add({
id: specVersions[specVersions.length - 1].id,
start: Number(specVersions[specVersions.length - 1].blockHeight),
end: Number(_metadata.lastProcessedHeight),
});
}
}
return Array.from(specVersionBlockHeightSet);
const specVersions = resp.data.specVersions;
return { _metadata, specVersions };
} catch (err) {
logger.warn(err, `failed to fetch specVersion result`);
return undefined;
}
}

async getSpecVersions(): Promise<SpecVersion[]> {
try {
return this.parseSpecVersions(await this.getSpecVersionsRaw());
} catch {
return undefined;
}
}

private specVersionQuery(): GqlQuery {
const nodes: GqlNode[] = [
{
Expand Down
11 changes: 7 additions & 4 deletions packages/node/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,16 @@ function mockDictionaryService(
});
return {
getDictionary: mockDictionary,
getSpecVersion: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryRet),
} as any;
}

function mockDictionaryService1(): DictionaryService {
return {
getDictionary: jest.fn(() => mockDictionaryBatches),
getSpecVersion: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryBatches),
} as any;
}

Expand All @@ -180,7 +182,8 @@ function mockDictionaryService2(): DictionaryService {
function mockDictionaryService3(): DictionaryService {
return {
getDictionary: jest.fn(() => mockDictionaryNoBatches),
getSpecVersion: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersions: jest.fn(() => [{ id: '1', start: 1, end: 29231 }]),
getSpecVersionsRaw: jest.fn(() => mockDictionaryNoBatches),
} as any;
}
function testSubqueryProject(): SubqueryProject {
Expand Down Expand Up @@ -402,7 +405,7 @@ describe('FetchService', () => {
}
});
await loopPromise;
expect(dictionaryValidationSpy).toHaveBeenCalledTimes(1);
expect(dictionaryValidationSpy).toHaveBeenCalledTimes(2);
expect(nextEndBlockHeightSpy).toHaveBeenCalledTimes(1);
//we expect after use the original method, next loop will still use dictionary by default
expect((fetchService as any).useDictionary).toBeTruthy();
Expand Down
29 changes: 8 additions & 21 deletions packages/node/src/indexer/fetch.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ describe('FetchService', () => {
}
});
await loopPromise;
expect(dictionaryValidationSpy).not.toBeCalled();
expect(dictionaryValidationSpy).toBeCalledTimes(1);
expect(nextEndBlockHeightSpy).toBeCalled();
// fetchService.onApplicationShutdown()
// await delay(0.5)
Expand Down Expand Up @@ -216,7 +216,7 @@ describe('FetchService', () => {
}
});
await loopPromise;
expect(dictionaryValidationSpy).not.toBeCalled();
expect(dictionaryValidationSpy).toBeCalledTimes(1);
expect(nextEndBlockHeightSpy).toBeCalled();
// fetchService.onApplicationShutdown()
// await delay(0.5)
Expand Down Expand Up @@ -264,10 +264,8 @@ describe('FetchService', () => {
}
});
await loopPromise;
expect(dictionaryValidationSpy).not.toBeCalled();
expect(dictionaryValidationSpy).toBeCalledTimes(1);
expect(nextEndBlockHeightSpy).toBeCalled();
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

it('not use dictionary if one of the handler filter module or method is not defined', async () => {
Expand Down Expand Up @@ -320,12 +318,11 @@ describe('FetchService', () => {
}
});
await loopPromise;
expect(dictionaryValidationSpy).not.toBeCalled();
expect(dictionaryValidationSpy).toBeCalledTimes(1);
expect(nextEndBlockHeightSpy).toBeCalled();
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

// at init
it('set useDictionary to false if dictionary metadata not match with the api', async () => {
const batchSize = 5;
const project = testSubqueryProject();
Expand Down Expand Up @@ -375,8 +372,8 @@ describe('FetchService', () => {
await loopPromise;
expect(dictionaryValidationSpy).toBeCalledTimes(1);
expect(nextEndBlockHeightSpy).toBeCalled();
// fetchService.onApplicationShutdown()
// await delay(0.5)
expect(dictionaryValidationSpy).toReturnWith(false);
expect((fetchService as any).specVersionMap.length).toBe(0);
}, 500000);

it('use dictionary and specVersionMap to get block specVersion', async () => {
Expand Down Expand Up @@ -412,8 +409,6 @@ describe('FetchService', () => {
const getSpecFromMapSpy = jest.spyOn(fetchService, 'getSpecFromMap');
const specVersion = await fetchService.getSpecVersion(8638105);
expect(getSpecFromMapSpy).toBeCalledTimes(1);
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

it('use api to get block specVersion when blockHeight out of specVersionMap', async () => {
Expand Down Expand Up @@ -456,8 +451,6 @@ describe('FetchService', () => {
expect(getSpecFromMapSpy).toBeCalledTimes(1);
// this large blockHeight should be thrown
expect(getSpecFromApiSpy).toBeCalledTimes(1);
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

it('only fetch SpecVersion from dictionary once', async () => {
Expand Down Expand Up @@ -488,16 +481,14 @@ describe('FetchService', () => {

fetchService = await createFetchService(project, batchSize);
const dictionaryService = (fetchService as any).dictionaryService;
const getSpecVersionSpy = jest.spyOn(dictionaryService, 'getSpecVersion');
const getSpecVersionSpy = jest.spyOn(dictionaryService, 'getSpecVersions');

await fetchService.init();

await fetchService.getSpecVersion(8638105);
await fetchService.getSpecVersion(8638200);

expect(getSpecVersionSpy).toBeCalledTimes(1);
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

it('update specVersionMap once when specVersion map is out', async () => {
Expand Down Expand Up @@ -542,8 +533,6 @@ describe('FetchService', () => {
expect(Number(specVersionMap[specVersionMap.length - 1].id)).toBe(
latestSpecVersion.specVersion.toNumber(),
);
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);

it('prefetch meta for different specVersion range', async () => {
Expand Down Expand Up @@ -588,7 +577,5 @@ describe('FetchService', () => {
await fetchService.prefetchMeta(9738720); // in 9180
// Should be called 91151,9170,9180
expect(getPrefechMetaSpy).toBeCalledTimes(3);
// fetchService.onApplicationShutdown()
// await delay(0.5)
}, 500000);
});
67 changes: 36 additions & 31 deletions packages/node/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
SubstrateCustomHandler,
} from '@subql/types';

import { MetaData } from '@subql/utils';
import { isUndefined, range, sortBy, template, uniqBy } from 'lodash';
import { NodeConfig } from '../configure/NodeConfig';
import { SubqlProjectDs, SubqueryProject } from '../configure/SubqueryProject';
Expand All @@ -39,11 +40,7 @@ import { calcInterval } from '../utils/substrate';
import { getYargsOption } from '../yargs';
import { ApiService } from './api.service';
import { BlockedQueue } from './BlockedQueue';
import {
Dictionary,
DictionaryService,
SpecVersion,
} from './dictionary.service';
import { DictionaryService, SpecVersion } from './dictionary.service';
import { DsProcessorService } from './ds-processor.service';
import { DynamicDsService } from './dynamic-ds.service';
import { IndexerEvent } from './events';
Expand Down Expand Up @@ -292,11 +289,6 @@ export class FetchService implements OnApplicationShutdown {
!!this.project.network.dictionary;
}

addInterval(name: string, milliseconds: number, handler: () => void): void {
const interval = setInterval(handler.bind(this), milliseconds);
this.schedulerRegistry.addInterval(name, interval);
}

async init(): Promise<void> {
if (this.api) {
const CHAIN_INTERVAL = calcInterval(this.api)
Expand Down Expand Up @@ -325,8 +317,13 @@ export class FetchService implements OnApplicationShutdown {
await this.getFinalizedBlockHead();
await this.getBestBlockHead();

if (this.useDictionary) {
const specVersionResponse = await this.dictionaryService.getSpecVersion();
const validChecker = this.dictionaryValidation(
await this.dictionaryService.getSpecVersionsRaw(),
);

if (this.useDictionary && validChecker) {
const specVersionResponse =
await this.dictionaryService.getSpecVersions();
if (specVersionResponse !== undefined) {
this.specVersionMap = specVersionResponse;
}
Expand Down Expand Up @@ -562,7 +559,7 @@ export class FetchService implements OnApplicationShutdown {
// Assume dictionary is synced
if (blockHeight + SPEC_VERSION_BLOCK_GAP < this.latestFinalizedHeight) {
const response = this.useDictionary
? await this.dictionaryService.getSpecVersion()
? await this.dictionaryService.getSpecVersions()
: undefined;
if (response !== undefined) {
this.specVersionMap = response;
Expand Down Expand Up @@ -643,26 +640,34 @@ export class FetchService implements OnApplicationShutdown {
}

private dictionaryValidation(
{ _metadata: metaData }: Dictionary,
startBlockHeight: number,
dictionary: { _metadata: MetaData },
startBlockHeight?: number,
): boolean {
if (metaData.genesisHash !== this.api.genesisHash.toString()) {
logger.warn(`Dictionary is disabled since now`);
this.useDictionary = false;
this.eventEmitter.emit(IndexerEvent.UsingDictionary, {
value: Number(this.useDictionary),
});
this.eventEmitter.emit(IndexerEvent.SkipDictionary);
return false;
}
if (metaData.lastProcessedHeight < startBlockHeight) {
logger.warn(
`Dictionary indexed block is behind current indexing block height`,
);
this.eventEmitter.emit(IndexerEvent.SkipDictionary);
return false;
if (dictionary !== undefined) {
const { _metadata: metaData } = dictionary;

if (metaData.genesisHash !== this.api.genesisHash.toString()) {
logger.warn(`Dictionary is disabled since now`);
this.useDictionary = false;
this.eventEmitter.emit(IndexerEvent.UsingDictionary, {
value: Number(this.useDictionary),
});
this.eventEmitter.emit(IndexerEvent.SkipDictionary);
return false;
}

if (startBlockHeight !== undefined) {
if (metaData.lastProcessedHeight < startBlockHeight) {
logger.warn(
`Dictionary indexed block is behind current indexing block height`,
);
this.eventEmitter.emit(IndexerEvent.SkipDictionary);
return false;
}
}
return true;
}
return true;
return false;
}

private setLatestBufferedHeight(height: number): void {
Expand Down

0 comments on commit 89700d7

Please sign in to comment.