Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync 20221117 #14

Merged
merged 50 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
00ceded
fix: incomplete ds data in same block (#1370)
hariu-starfish Oct 28, 2022
7fa5f60
[SKIP CI] Prerelease
hariu-starfish Oct 28, 2022
ba29e3a
Best Block (#1308)
jiqiang90 Oct 28, 2022
b0f0bbc
[SKIP CI] Prerelease
jiqiang90 Oct 28, 2022
226d37b
Fix bugs with unfinalized blocks (#1374)
stwiname Oct 31, 2022
3692d52
[SKIP CI] Prerelease
stwiname Oct 31, 2022
cd6f25c
[release] 20221028 (#1372)
stwiname Oct 31, 2022
41f6b6a
hot fix tests (#1360)
bz888 Nov 1, 2022
fe4896e
[SKIP CI] Prerelease
bz888 Nov 1, 2022
18e9b12
add ethereum to CLI and Validator (#1378)
guplersaxanoid Nov 3, 2022
db52b2c
Imporve dictionary query (#1371)
bz888 Nov 3, 2022
0338def
[SKIP CI] Prerelease
guplersaxanoid Nov 3, 2022
a7b85b0
Enable for better inheritance of generated entity modeld (#1377)
filo87 Nov 3, 2022
f36bd9f
[SKIP CI] Prerelease
filo87 Nov 3, 2022
3d239b4
fix comments issue with new package (#1380)
bz888 Nov 6, 2022
9c634a0
[SKIP CI] Prerelease
bz888 Nov 6, 2022
d8da906
fix logic with reindex and unfinalized height and dynamic ds (#1382)
jiqiang90 Nov 7, 2022
0d030d3
update polkadot to 9.7.1 (#1384)
jiqiang90 Nov 7, 2022
edf1d9f
[release] 20221107
jiqiang90 Nov 7, 2022
5e96a41
Fix remove alter table (#1387)
jiqiang90 Nov 7, 2022
1b38857
[SKIP CI] Prerelease
jiqiang90 Nov 7, 2022
f3841f0
[release] 20221108 (#1388)
jiqiang90 Nov 7, 2022
2034ef0
fix missing sequelize sync (#1389)
jiqiang90 Nov 8, 2022
dcf3b3c
[SKIP CI] Prerelease
jiqiang90 Nov 8, 2022
0ddc4fc
[release] 20221108 patch (#1390)
jiqiang90 Nov 8, 2022
601ae9e
reindex bind (#1391)
jiqiang90 Nov 8, 2022
87db442
[SKIP CI] Prerelease
jiqiang90 Nov 8, 2022
83fd8e5
[release] 20221109 (#1393)
jiqiang90 Nov 8, 2022
5deb01f
Handle fetch errors, then retry (#1386)
bz888 Nov 9, 2022
f778a61
[SKIP CI] Prerelease
bz888 Nov 9, 2022
11e3cdf
fix (#1395)
bz888 Nov 9, 2022
b594b20
[SKIP CI] Prerelease
bz888 Nov 9, 2022
6dd654e
[release] 20221109 node-core (#1394)
jiqiang90 Nov 9, 2022
f3d401b
Fix tests hanging (#1396)
stwiname Nov 9, 2022
7536945
[SKIP CI] Prerelease
stwiname Nov 9, 2022
569bf4c
Add distinct query plugin (#1274)
stwiname Nov 10, 2022
2cb84da
[SKIP CI] Prerelease
stwiname Nov 10, 2022
46f99d5
Add query distinct dependencies (#1398)
jiqiang90 Nov 11, 2022
6d69c33
[SKIP CI] Prerelease
jiqiang90 Nov 11, 2022
5803e7c
Break block dispatcher file up and move common code to base class (#1…
stwiname Nov 13, 2022
2574b44
[SKIP CI] Prerelease
stwiname Nov 13, 2022
642f039
Hot schema trigger (#1401)
bz888 Nov 14, 2022
1941e9d
[SKIP CI] Prerelease
bz888 Nov 14, 2022
d75f882
[release] 20221115 (#1402)
bz888 Nov 15, 2022
c6b5cbc
fix hot schema (#1404)
bz888 Nov 15, 2022
585ceb3
[SKIP CI] Prerelease
bz888 Nov 15, 2022
a37c9d3
[release] 20221115 (#1408)
bz888 Nov 15, 2022
1dab465
fix fetchblock for works (#1410)
bz888 Nov 16, 2022
53c2270
[SKIP CI] Prerelease
bz888 Nov 16, 2022
2ac0d74
Merge remote-tracking branch 'origin/main' into HEAD
stwiname Nov 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"typescript": "^4.4.4"
},
"resolutions": {
"@polkadot/api": "9.5.2",
"@polkadot/api": "9.7.1",
"@polkadot/util": "10.1.11",
"@terra-money/terra.js": "^3.0.11",
"node-fetch": "2.6.7"
Expand Down
2 changes: 1 addition & 1 deletion packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"@nestjs/schedule": "^1.0.2",
"@subql/common": "latest",
"@subql/common-ethereum": "workspace:*",
"@subql/node-core": "1.3.3",
"@subql/node-core": "1.4.2-0",
"@subql/types-ethereum": "workspace:*",
"@subql/utils": "latest",
"@subql/x-merkle-mountain-range": "2.0.0-0.1.2",
Expand Down
146 changes: 146 additions & 0 deletions packages/node/src/indexer/blockDispatcher/base-block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import assert from 'assert';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { hexToU8a, u8aEq } from '@polkadot/util';
import { getLogger, IndexerEvent, IQueue, NodeConfig } from '@subql/node-core';
import { ProjectService } from '../project.service';

const logger = getLogger('BaseBlockDispatcherService');

export type ProcessBlockResponse = {
dynamicDsCreated: boolean;
operationHash: Uint8Array;
reindexBlockHeight: number;
};

export interface IBlockDispatcher {
init(onDynamicDsCreated: (height: number) => Promise<void>): Promise<void>;

enqueueBlocks(heights: number[]): void;

queueSize: number;
freeSize: number;
latestBufferedHeight: number | undefined;

// Remove all enqueued blocks, used when a dynamic ds is created
flushQueue(height: number): void;
rewind(height: number): Promise<void>;
}

const NULL_MERKEL_ROOT = hexToU8a('0x00');

function isNullMerkelRoot(operationHash: Uint8Array): boolean {
return u8aEq(operationHash, NULL_MERKEL_ROOT);
}

export abstract class BaseBlockDispatcher<Q extends IQueue>
implements IBlockDispatcher
{
protected _latestBufferedHeight: number;
protected _processedBlockCount: number;
protected latestProcessedHeight: number;
protected currentProcessingHeight: number;
protected onDynamicDsCreated: (height: number) => Promise<void>;

constructor(
protected nodeConfig: NodeConfig,
protected eventEmitter: EventEmitter2,
protected projectService: ProjectService,
protected queue: Q,
) {}

abstract enqueueBlocks(heights: number[]): void;
abstract init(
onDynamicDsCreated: (height: number) => Promise<void>,
): Promise<void>;

get queueSize(): number {
return this.queue.size;
}

get freeSize(): number {
return this.queue.freeSpace;
}

get latestBufferedHeight(): number {
return this._latestBufferedHeight;
}

set latestBufferedHeight(height: number) {
this.eventEmitter.emit(IndexerEvent.BlocknumberQueueSize, {
value: this.queueSize,
});
this._latestBufferedHeight = height;
}

protected setProcessedBlockCount(processedBlockCount: number): void {
this._processedBlockCount = processedBlockCount;
this.eventEmitter.emit(IndexerEvent.BlockProcessedCount, {
processedBlockCount,
timestamp: Date.now(),
});
}

// Compare it with current indexing number, if last corrected is already indexed
// rewind, also flush queued blocks, drop current indexing transaction, set last processed to correct block too
// if rollback is greater than current index flush queue only
async rewind(lastCorrectHeight: number): Promise<void> {
if (lastCorrectHeight <= this.currentProcessingHeight) {
logger.info(
`Found last verified block at height ${lastCorrectHeight}, rewinding...`,
);
await this.projectService.reindex(lastCorrectHeight);
this.latestProcessedHeight = lastCorrectHeight;
logger.info(`Successful rewind to block ${lastCorrectHeight}!`);
}
this.flushQueue(lastCorrectHeight);
logger.info(`Queued blocks flushed!`); //Also last buffered height reset, next fetching should start after lastCorrectHeight
}

flushQueue(height: number): void {
this.latestBufferedHeight = height;
this.queue.flush();
}

// Is called directly before a block is processed
protected preProcessBlock(height: number): void {
this.currentProcessingHeight = height;
this.eventEmitter.emit(IndexerEvent.BlockProcessing, {
height,
timestamp: Date.now(),
});
}

// Is called directly after a block is processed
protected async postProcessBlock(
height: number,
processBlockResponse: ProcessBlockResponse,
): Promise<void> {
const { dynamicDsCreated, operationHash, reindexBlockHeight } =
processBlockResponse;
if (reindexBlockHeight !== null && reindexBlockHeight !== undefined) {
await this.rewind(reindexBlockHeight);
this.latestProcessedHeight = reindexBlockHeight;
} else {
if (this.nodeConfig.proofOfIndex && !isNullMerkelRoot(operationHash)) {
if (!this.projectService.blockOffset) {
// Which means during project init, it has not found offset and set value
await this.projectService.upsertMetadataBlockOffset(height - 1);
}
void this.projectService.setBlockOffset(height - 1);
}
if (dynamicDsCreated) {
await this.onDynamicDsCreated(height);
}
assert(
!this.latestProcessedHeight || height > this.latestProcessedHeight,
`Block processed out of order. Height: ${height}. Latest: ${this.latestProcessedHeight}`,
);
// In memory _processedBlockCount increase, db metadata increase BlockCount in indexer.manager
this.setProcessedBlockCount(this._processedBlockCount + 1);
this.latestProcessedHeight = height;
}
}
}
179 changes: 179 additions & 0 deletions packages/node/src/indexer/blockDispatcher/block-dispatcher.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import {
ApiService,
getLogger,
NodeConfig,
IndexerEvent,
delay,
profilerWrap,
AutoQueue,
Queue,
} from '@subql/node-core';
import { last } from 'lodash';
import { IndexerManager } from '../indexer.manager';
import { ProjectService } from '../project.service';
import { BaseBlockDispatcher } from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

/**
* @description Intended to behave the same as WorkerBlockDispatcherService but doesn't use worker threads or any parallel processing
*/
@Injectable()
export class BlockDispatcherService
extends BaseBlockDispatcher<Queue<number>>
implements OnApplicationShutdown
{
private processQueue: AutoQueue<void>;

private fetching = false;
private isShutdown = false;
private fetchBlocksBatches: ApiService['api']['fetchBlocks'];

constructor(
private apiService: ApiService,
nodeConfig: NodeConfig,
private indexerManager: IndexerManager,
eventEmitter: EventEmitter2,
projectService: ProjectService,
) {
super(
nodeConfig,
eventEmitter,
projectService,
new Queue(nodeConfig.batchSize * 3),
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3);

const fetchBlocks = this.apiService.api.fetchBlocks.bind(
this.apiService.api,
);
if (this.nodeConfig.profiler) {
this.fetchBlocksBatches = profilerWrap(
fetchBlocks,
'EthereumUtil',
'fetchBlocksBatches',
);
} else {
this.fetchBlocksBatches = fetchBlocks;
}
}

// eslint-disable-next-line @typescript-eslint/require-await
async init(
onDynamicDsCreated: (height: number) => Promise<void>,
): Promise<void> {
this.onDynamicDsCreated = onDynamicDsCreated;
const blockAmount = await this.projectService.getProcessedBlockCount();
this.setProcessedBlockCount(blockAmount ?? 0);
}

onApplicationShutdown(): void {
this.isShutdown = true;
this.processQueue.abort();
}

enqueueBlocks(heights: number[]): void {
if (!heights.length) return;

logger.info(
`Enqueing blocks ${heights[0]}...${last(heights)}, total ${
heights.length
} blocks`,
);

this.queue.putMany(heights);
this.latestBufferedHeight = last(heights);

void this.fetchBlocksFromQueue().catch((e) => {
logger.error(e, 'Failed to fetch blocks from queue');
if (!this.isShutdown) {
process.exit(1);
}
});
}

flushQueue(height: number): void {
super.flushQueue(height);
this.processQueue.flush();
}

private async fetchBlocksFromQueue(): Promise<void> {
if (this.fetching || this.isShutdown) return;
// Process queue is full, no point in fetching more blocks
// if (this.processQueue.freeSpace < this.nodeConfig.batchSize) return;

this.fetching = true;

try {
while (!this.isShutdown) {
const blockNums = this.queue.takeMany(
Math.min(this.nodeConfig.batchSize, this.processQueue.freeSpace),
);
// Used to compare before and after as a way to check if queue was flushed
const bufferedHeight = this._latestBufferedHeight;

// Queue is empty
if (!blockNums.length) {
// The process queue might be full so no block nums were taken, wait and try again
if (this.queue.size) {
await delay(1);
continue;
}
break;
}

logger.info(
`fetch block [${blockNums[0]},${
blockNums[blockNums.length - 1]
}], total ${blockNums.length} blocks`,
);

const blocks = await this.fetchBlocksBatches(blockNums);

if (bufferedHeight > this._latestBufferedHeight) {
logger.debug(`Queue was reset for new DS, discarding fetched blocks`);
continue;
}
const blockTasks = blocks.map((block) => async () => {
const height = block.block.block.header.number.toNumber();
try {
this.preProcessBlock(height);

const processBlockResponse = await this.indexerManager.indexBlock(
block,
);

await this.postProcessBlock(height, processBlockResponse);
} catch (e) {
if (this.isShutdown) {
return;
}
logger.error(
e,
`failed to index block at height ${height} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`,
);
throw e;
}
});

// There can be enough of a delay after fetching blocks that shutdown could now be true
if (this.isShutdown) break;

this.processQueue.putMany(blockTasks);

this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
value: this.processQueue.size,
});
}
} finally {
this.fetching = false;
}
}
}
12 changes: 12 additions & 0 deletions packages/node/src/indexer/blockDispatcher/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0

import { IBlockDispatcher } from './base-block-dispatcher';
import { BlockDispatcherService } from './block-dispatcher.service';
import { WorkerBlockDispatcherService } from './worker-block-dispatcher.service';

export {
IBlockDispatcher,
BlockDispatcherService,
WorkerBlockDispatcherService,
};
Loading