Skip to content

Commit

Permalink
Create indexer manager base class
Browse files Browse the repository at this point in the history
  • Loading branch information
stwiname committed Apr 28, 2023
1 parent 32ea1ca commit 5f6c4a3
Show file tree
Hide file tree
Showing 18 changed files with 379 additions and 280 deletions.
2 changes: 1 addition & 1 deletion packages/common/src/project/versioned/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export abstract class ProjectManifestBaseImpl<D extends object> {
}

export interface BaseDataSource<
F = Record<string, unknown>,
F = any,
H extends BaseHandler<F> = BaseHandler<F>,
T extends BaseMapping<F, H> = BaseMapping<F, H>
> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const logger = getLogger('BaseBlockDispatcherService');
export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
blockHash: string;
reindexBlockHeight: number;
reindexBlockHeight: number | null;
};

export interface IBlockDispatcher {
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export abstract class BaseFetchService<
DS extends {startBlock?: number; mapping: {handlers: any}},
B extends IBlockDispatcher,
D extends DictionaryService,
P extends IDSProcessorService
P extends IDSProcessorService<DS>
> implements OnApplicationShutdown
{
private _latestBestHeight?: number;
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ export * from './blockDispatcher';
export * from './testing.service';
export * from './project.service';
export * from './fetch.service';
export * from './indexer.manager';
244 changes: 244 additions & 0 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Copyright 2020-2022 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import {BaseDataSource} from '@subql/common';
import {ApiService} from '../api.service';
import {NodeConfig} from '../configure';
import {getLogger} from '../logger';
import {profilerWrap} from '../profiler';
import {ProcessBlockResponse} from './blockDispatcher';
import {DynamicDsService} from './dynamic-ds.service';
import {IndexerSandbox} from './sandbox';
import {IDSProcessorService, IIndexerManager} from './types';

const logger = getLogger('indexer');

export type HandlerKinds = string[];
export type FilterTypeMap = Record<string, (data: any, filter: any) => boolean>;
export type ProcessorTypeMap<FM extends FilterTypeMap> = {[K in keyof FM]: (data: any) => boolean};
export type HandlerInputTypeMap<FM extends FilterTypeMap> = {[K in keyof FM]: any};

export interface CustomHandler<K extends string = string, F = Record<string, unknown>> {
handler: string;
kind: K;
filter?: F;
}

export abstract class BaseIndexerManager<
A, // Api Type
B, // Block Type
DS extends BaseDataSource,
CDS extends DS, // Custom datasource
FilterMap extends FilterTypeMap,
ProcessorMap extends ProcessorTypeMap<FilterMap>,
HandlerInputMap extends HandlerInputTypeMap<FilterMap>
> implements IIndexerManager<B, DS>
{
abstract start(): Promise<void>;
abstract indexBlock(block: B, datasources: DS[], ...args: any[]): Promise<ProcessBlockResponse>;
abstract getBlockHeight(block: B): number;
abstract getBlockHash(block: B): string;

protected abstract isRuntimeDs(ds: DS): ds is DS;
protected abstract isCustomDs(ds: DS): ds is CDS;

// Uses asSecondLayerHandlerProcessor_1_0_0 in substrate to transfrom from v0.0.0 -> v1.0.0
protected abstract updateCustomProcessor: (processor: any) => any;

protected abstract processUnfinalizedBlocks(block: B): Promise<number | null>;
protected abstract indexBlockData(
block: B,
dataSources: DS[],
getVM: (d: DS) => Promise<IndexerSandbox>
): Promise<void>;

protected abstract baseCustomHandlerFilter(kind: keyof FilterMap, data: any, baseFilter: any): boolean;

constructor(
protected readonly apiService: ApiService,
protected readonly nodeConfig: NodeConfig,
private sandboxService: {getDsProcessor: (ds: DS, api: A) => IndexerSandbox},
private dsProcessorService: IDSProcessorService<CDS>,
private dynamicDsService: DynamicDsService<DS>,
private filterMap: FilterMap,
private processorMap: ProcessorMap
) {
logger.info('indexer manager start');
}

protected async internalIndexBlock(
block: B,
dataSources: DS[],
getApi: () => Promise<A>
): Promise<ProcessBlockResponse> {
let dynamicDsCreated = false;
const blockHeight = this.getBlockHeight(block);

const filteredDataSources = this.filterDataSources(blockHeight, dataSources);

this.assertDataSources(filteredDataSources, blockHeight);

let apiAt: A;
const reindexBlockHeight = await this.processUnfinalizedBlocks(block);

// Only index block if we're not going to reindex
if (!reindexBlockHeight) {
await this.indexBlockData(block, filteredDataSources, async (ds: DS) => {
// Injected runtimeVersion from fetch service might be outdated
apiAt = apiAt ?? (await getApi());

const vm = this.sandboxService.getDsProcessor(ds, apiAt);

// Inject function to create ds into vm
vm.freeze(async (templateName: string, args?: Record<string, unknown>) => {
const newDs = await this.dynamicDsService.createDynamicDatasource({
templateName,
args,
startBlock: blockHeight,
});
// Push the newly created dynamic ds to be processed this block on any future extrinsics/events
filteredDataSources.push(newDs);
dynamicDsCreated = true;
}, 'createDynamicDatasource');

return vm;
});
}

return {
dynamicDsCreated,
blockHash: this.getBlockHash(block),
reindexBlockHeight,
};
}

private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] {
let filteredDs: DS[];

filteredDs = dataSources.filter((ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight);

if (filteredDs.length === 0) {
logger.error(`Did not find any matching datasouces`);
process.exit(1);
}
// perform filter for custom ds
filteredDs = filteredDs.filter((ds) => {
if (this.isCustomDs(ds)) {
return this.dsProcessorService.getDsProcessor(ds).dsFilterProcessor(ds, this.apiService.api);
} else {
return true;
}
});

if (!filteredDs.length) {
logger.error(`Did not find any datasources with associated processor`);
process.exit(1);
}
return filteredDs;
}

private assertDataSources(ds: DS[], blockHeight: number) {
if (!ds.length) {
logger.error(
`Your start block is greater than the current indexed block height in your database. Either change your startBlock (project.yaml) to <= ${blockHeight}
or delete your database and start again from the currently specified startBlock`
);
process.exit(1);
}
}

protected async indexData<K extends keyof FilterMap>(
kind: K,
data: HandlerInputMap[K],
ds: DS,
getVM: (ds: DS) => Promise<IndexerSandbox>
): Promise<void> {
let vm: IndexerSandbox;
if (this.isRuntimeDs(ds)) {
const handlers = ds.mapping.handlers.filter(
(h) => h.kind === kind && this.filterMap[kind](data as any, h.filter)
);

for (const handler of handlers) {
vm = vm! ?? (await getVM(ds));
this.nodeConfig.profiler
? await profilerWrap(vm.securedExec.bind(vm), 'handlerPerformance', handler.handler)(handler.handler, [data])
: await vm.securedExec(handler.handler, [data]);
}
} else if (this.isCustomDs(ds)) {
const handlers = this.filterCustomDsHandlers<K>(ds, data, this.processorMap[kind], (data, baseFilter) => {
// TODO why doesnt this use this.filterMap?
return this.baseCustomHandlerFilter(kind, data, baseFilter);
});

for (const handler of handlers) {
vm = vm! ?? (await getVM(ds));
await this.transformAndExecuteCustomDs(ds, vm, handler, data);
}
}
}

private filterCustomDsHandlers<K extends keyof FilterMap>(
ds: CDS, //SubstrateCustomDataSource<string, SubstrateNetworkFilter>,
data: HandlerInputMap[K],
baseHandlerCheck: ProcessorMap[K],
baseFilter: (data: HandlerInputMap[K], baseFilter: any) => boolean
): CustomHandler[] {
const plugin = this.dsProcessorService.getDsProcessor(ds);

return ds.mapping.handlers
.filter((handler) => {
const processor = plugin.handlerProcessors[handler.kind];
if (baseHandlerCheck(processor)) {
processor.baseFilter;
return baseFilter(data, processor.baseFilter);
}
return false;
})
.filter((handler) => {
const processor = this.updateCustomProcessor(plugin.handlerProcessors[handler.kind]);

try {
return processor.filterProcessor({
filter: handler.filter,
input: data,
ds,
});
} catch (e: any) {
logger.error(e, 'Failed to run ds processer filter.');
throw e;
}
});
}

private async transformAndExecuteCustomDs<K extends keyof FilterMap>(
ds: CDS, //SubstrateCustomDataSource<string, SubstrateNetworkFilter>,
vm: IndexerSandbox,
handler: CustomHandler,
data: HandlerInputMap[K]
): Promise<void> {
const plugin = this.dsProcessorService.getDsProcessor(ds);
const assets = await this.dsProcessorService.getAssets(ds);

const processor = this.updateCustomProcessor(plugin.handlerProcessors[handler.kind]);

const transformedData = await processor
.transformer({
input: data,
ds,
filter: handler.filter,
api: this.apiService.api,
assets,
})
.catch((e: any) => {
logger.error(e, 'Failed to transform data with ds processor.');
throw e;
});

// We can not run this in parallel. the transformed data items may be dependent on one another.
// An example of this is with Acala EVM packing multiple EVM logs into a single Substrate event
for (const _data of transformedData) {
await vm.securedExec(handler.handler, [_data]);
}
}
}
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export abstract class BaseProjectService<DS extends {startBlock?: number}> imple
abstract reindex(targetBlockHeight: number): Promise<void>;

constructor(
private readonly dsProcessorService: IDSProcessorService,
private readonly dsProcessorService: IDSProcessorService<DS>,
protected readonly apiService: ApiService,
private readonly poiService: PoiService,
protected readonly mmrService: MmrService,
Expand Down
14 changes: 13 additions & 1 deletion packages/node-core/src/indexer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ export interface IProjectService<DS> {
getAllDataSources(blockHeight: number): Promise<DS[]>;
}

export interface IDSProcessorService {
// Equivalent to SubstrateDatasourceProcessor
type DsProcessor<DS> = {
kind: string;
validate: (ds: DS, assets: Record<string, string>) => void;
dsFilterProcessor: (ds: DS, api: any) => boolean;
handlerProcessors: Record<string, any>;
};

// Represents the DsProcessorService in the node
// TODO move DsProcessorService to node-core so we can remove this
export interface IDSProcessorService<DS> {
validateProjectCustomDatasources(): Promise<void>;
getDsProcessor(ds: DS): DsProcessor<DS>;
getAssets(ds: DS): Promise<Record<string, string>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import {
BlockDispatcher,
ProcessBlockResponse,
} from '@subql/node-core';
import {
SubqlProjectDs,
SubqueryProject,
} from '../../configure/SubqueryProject';
import { SubstrateDatasource } from '@subql/types';
import { SubqueryProject } from '../../configure/SubqueryProject';
import { ApiService } from '../api.service';
import { DynamicDsService } from '../dynamic-ds.service';
import { IndexerManager } from '../indexer.manager';
Expand All @@ -28,7 +26,7 @@ import { BlockContent } from '../types';
*/
@Injectable()
export class BlockDispatcherService
extends BlockDispatcher<BlockContent, SubqlProjectDs>
extends BlockDispatcher<BlockContent, SubstrateDatasource>
implements OnApplicationShutdown
{
private runtimeService: RuntimeService;
Expand All @@ -38,7 +36,8 @@ export class BlockDispatcherService
nodeConfig: NodeConfig,
private indexerManager: IndexerManager,
eventEmitter: EventEmitter2,
@Inject('IProjectService') projectService: IProjectService<SubqlProjectDs>,
@Inject('IProjectService')
projectService: IProjectService<SubstrateDatasource>,
smartBatchService: SmartBatchService,
storeService: StoreService,
storeCacheService: StoreCacheService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
HostDynamicDS,
WorkerBlockDispatcher,
} from '@subql/node-core';
import { Store } from '@subql/types';
import { Store, SubstrateDatasource } from '@subql/types';
import {
SubqlProjectDs,
SubqueryProject,
Expand All @@ -37,12 +37,12 @@ type IndexerWorker = IIndexerWorker & {

async function createIndexerWorker(
store: Store,
dynamicDsService: IDynamicDsService<SubqlProjectDs>,
dynamicDsService: IDynamicDsService<SubstrateDatasource>,
unfinalizedBlocksService: IUnfinalizedBlocksService,
): Promise<IndexerWorker> {
const indexerWorker = Worker.create<
IInitIndexerWorker,
HostDynamicDS<SubqlProjectDs> & HostStore & HostUnfinalizedBlocks
HostDynamicDS<SubstrateDatasource> & HostStore & HostUnfinalizedBlocks
>(
path.resolve(__dirname, '../../../dist/indexer/worker/worker.js'),
[
Expand Down Expand Up @@ -83,15 +83,16 @@ async function createIndexerWorker(

@Injectable()
export class WorkerBlockDispatcherService
extends WorkerBlockDispatcher<SubqlProjectDs, IndexerWorker>
extends WorkerBlockDispatcher<SubstrateDatasource, IndexerWorker>
implements OnApplicationShutdown
{
private runtimeService: RuntimeService;

constructor(
nodeConfig: NodeConfig,
eventEmitter: EventEmitter2,
@Inject('IProjectService') projectService: IProjectService<SubqlProjectDs>,
@Inject('IProjectService')
projectService: IProjectService<SubstrateDatasource>,
smartBatchService: SmartBatchService,
storeService: StoreService,
storeCacheService: StoreCacheService,
Expand Down
4 changes: 3 additions & 1 deletion packages/node/src/indexer/ds-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ export class DsPluginSandbox extends Sandbox {
}

@Injectable()
export class DsProcessorService implements IDSProcessorService {
export class DsProcessorService
implements IDSProcessorService<SubstrateCustomDatasource>
{
private processorCache: {
[entry: string]: SubstrateDatasourceProcessor<
string,
Expand Down
Loading

0 comments on commit 5f6c4a3

Please sign in to comment.