Skip to content

Commit

Permalink
Rebase with main
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Nov 13, 2024
1 parent d7f7c6e commit ad60722
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 163 deletions.
60 changes: 28 additions & 32 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,38 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { Inject, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { SchedulerRegistry } from '@nestjs/schedule';
import { BaseDataSource } from '@subql/types-core';
import { range } from 'lodash';
import { IBlockchainService } from '../blockchain.service';
import { NodeConfig } from '../configure';
import { IndexerEvent } from '../events';
import { getLogger } from '../logger';
import { delay, filterBypassBlocks, getModulos, waitForBatchSize } from '../utils';
import { IBlockDispatcher } from './blockDispatcher';
import { mergeNumAndBlocksToNums } from './dictionary';
import { DictionaryService } from './dictionary/dictionary.service';
import { mergeNumAndBlocks } from './dictionary/utils';
import { StoreCacheService } from './storeCache';
import { BypassBlocks, IBlock, IProjectService } from './types';
import { IUnfinalizedBlocksServiceUtil } from './unfinalizedBlocks.service';
import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {range} from 'lodash';
import {IBlockchainService} from '../blockchain.service';
import {NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {delay, filterBypassBlocks, getModulos, waitForBatchSize} from '../utils';
import {IBlockDispatcher} from './blockDispatcher';
import {mergeNumAndBlocksToNums} from './dictionary';
import {DictionaryService} from './dictionary/dictionary.service';
import {mergeNumAndBlocks} from './dictionary/utils';
import {StoreCacheService} from './storeCache';
import {BypassBlocks, IBlock, IProjectService} from './types';
import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';

const logger = getLogger('FetchService');

export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlockDispatcher<FB>, FB>
implements OnApplicationShutdown {
@Injectable()
export class FetchService<DS extends BaseDataSource, B extends IBlockDispatcher<FB>, FB>
implements OnApplicationShutdown
{
private _latestBestHeight?: number;
private _latestFinalizedHeight?: number;
private isShutdown = false;

protected abstract initBlockDispatcher(): Promise<void>;

// Gets called just before the loop is started
// Used by substrate to init runtime service and get runtime version data from the dictionary
protected abstract preLoopHook(data: { startHeight: number }): Promise<void>;

constructor(
private nodeConfig: NodeConfig,
protected projectService: IProjectService<DS>,
protected blockDispatcher: B,
@Inject('IProjectService') protected projectService: IProjectService<DS>,
@Inject('IBlockDispatcher') protected blockDispatcher: B,
protected dictionaryService: DictionaryService<DS, FB>,
private eventEmitter: EventEmitter2,
private schedulerRegistry: SchedulerRegistry,
Expand Down Expand Up @@ -220,7 +216,7 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
continue;
}
if (dictionary) {
const { batchBlocks } = dictionary;
const {batchBlocks} = dictionary;
// the last block returned from batch should have max height in this batch
const mergedBlocks = mergeNumAndBlocks(
this.getModuloBlocks(startBlockHeight, dictionary.lastBufferedHeight),
Expand Down Expand Up @@ -252,7 +248,7 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
// get all modulo numbers with a specific block ranges
private getModuloBlocks(startHeight: number, endHeight: number): number[] {
// Find relevant ds
const { endHeight: rangeEndHeight, value: relevantDS } = this.getRelevantDsDetails(startHeight);
const {endHeight: rangeEndHeight, value: relevantDS} = this.getRelevantDsDetails(startHeight);
const moduloNumbers = this.getModulos(relevantDS);
// no modulos in the filters been found in current ds
if (!moduloNumbers.length) return [];
Expand Down Expand Up @@ -283,10 +279,10 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
return !!handlers.length && moduloNumbers.length === handlers.length;
}

private getRelevantDsDetails(startBlockHeight: number): { endHeight: number | undefined; value: DS[] } {
private getRelevantDsDetails(startBlockHeight: number): {endHeight: number | undefined; value: DS[]} {
const details = this.projectService.getDataSourcesMap().getDetails(startBlockHeight);
assert(details, `Datasources not found for height ${startBlockHeight}`);
return { endHeight: details.endHeight, value: details.value };
return {endHeight: details.endHeight, value: details.value};
}

// Enqueue block sequentially
Expand All @@ -296,7 +292,7 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
latestHeight: number
): Promise<void> {
// End height from current dataSource
const { endHeight, value: relevantDs } = this.getRelevantDsDetails(startBlockHeight);
const {endHeight, value: relevantDs} = this.getRelevantDsDetails(startBlockHeight);
// Estimated range end height
const estRangeEndHeight = Math.min(
endHeight ?? Number.MAX_SAFE_INTEGER,
Expand Down
Loading

0 comments on commit ad60722

Please sign in to comment.