From 0886d751c52a9fc9b306bb26a328c0408bbfcd7d Mon Sep 17 00:00:00 2001 From: g11tech Date: Tue, 13 Sep 2022 22:25:49 +0530 Subject: [PATCH] Make batch sizes dynamic for eth1 fetch of blocks/logs (#4532) * Fix getBlocksByNumber batch concurrency * Make the blocks and logs fetching dynamic * add to eth1 dashboard * add tests to validate dynamic mechanism --- dashboards/lodestar_execution_engine.json | 18 +++- .../src/eth1/eth1DepositDataTracker.ts | 67 ++++++++++--- .../src/eth1/provider/eth1Provider.ts | 60 ++---------- .../src/metrics/metrics/lodestar.ts | 8 ++ .../unit/eth1/eth1DepositDataTracker.test.ts | 93 +++++++++++++++++++ 5 files changed, 182 insertions(+), 64 deletions(-) create mode 100644 packages/beacon-node/test/unit/eth1/eth1DepositDataTracker.test.ts diff --git a/dashboards/lodestar_execution_engine.json b/dashboards/lodestar_execution_engine.json index 6de47b9a99d6..be1cb057c79f 100644 --- a/dashboards/lodestar_execution_engine.json +++ b/dashboards/lodestar_execution_engine.json @@ -1001,9 +1001,25 @@ "interval": "", "legendFormat": "eth1_follow_distance_dynamic", "refId": "A" + }, + { + "exemplar": false, + "expr": "lodestar_eth1_blocks_batch_size_dynamic", + "hide": false, + "interval": "", + "legendFormat": "eth1_blocks_batch_size_dynamic", + "refId": "B" + }, + { + "exemplar": false, + "expr": "lodestar_eth1_logs_batch_size_dynamic", + "hide": false, + "interval": "", + "legendFormat": "eth1_logs_batch_size_dynamic", + "refId": "C" } ], - "title": "Eth1 Follow Distance Dynamic", + "title": "Eth1 Dynamic Stats", "type": "timeseries" }, { diff --git a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts index 2f7cdf5e434b..d3f86cfa93a8 100644 --- a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts +++ b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts @@ -1,7 +1,8 @@ import {phase0, ssz} from "@lodestar/types"; import {IChainForkConfig} from "@lodestar/config"; import {BeaconStateAllForks, becomesNewEth1Data} from "@lodestar/state-transition"; -import {ErrorAborted, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils"; +import {ErrorAborted, TimeoutError, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils"; + import {IBeaconDb} from "../db/index.js"; import {IMetrics} from "../metrics/index.js"; import {Eth1DepositsCache} from "./eth1DepositsCache.js"; @@ -12,9 +13,14 @@ import {Eth1DataAndDeposits, IEth1Provider} from "./interface.js"; import {Eth1Options} from "./options.js"; import {HttpRpcError} from "./provider/jsonRpcHttpClient.js"; import {parseEth1Block} from "./provider/eth1Provider.js"; +import {isJsonRpcTruncatedError} from "./provider/utils.js"; const MAX_BLOCKS_PER_BLOCK_QUERY = 1000; +const MIN_BLOCKS_PER_BLOCK_QUERY = 10; + const MAX_BLOCKS_PER_LOG_QUERY = 1000; +const MIN_BLOCKS_PER_LOG_QUERY = 10; + /** Eth1 blocks happen every 14s approx, not need to update too often once synced */ const AUTO_UPDATE_PERIOD_MS = 60 * 1000; /** Prevent infinite loops */ @@ -53,7 +59,13 @@ export class Eth1DepositDataTracker { private depositsCache: Eth1DepositsCache; private eth1DataCache: Eth1DataCache; private lastProcessedDepositBlockNumber: number | null = null; + + /** Dynamically adjusted follow distance */ private eth1FollowDistance: number; + /** Dynamically adusted batch size to fetch deposit logs */ + private eth1GetBlocksBatchSizeDynamic = MAX_BLOCKS_PER_BLOCK_QUERY; + /** Dynamically adusted batch size to fetch deposit logs */ + private eth1GetLogsBatchSizeDynamic = MAX_BLOCKS_PER_LOG_QUERY; private readonly forcedEth1DataVote: phase0.Eth1Data | null; constructor( @@ -81,16 +93,20 @@ export class Eth1DepositDataTracker { if (metrics) { // Set constant value once metrics?.eth1.eth1FollowDistanceSecondsConfig.set(config.SECONDS_PER_ETH1_BLOCK * config.ETH1_FOLLOW_DISTANCE); - metrics.eth1.eth1FollowDistanceDynamic.addCollect(() => - metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance) - ); + metrics.eth1.eth1FollowDistanceDynamic.addCollect(() => { + metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance); + metrics.eth1.eth1GetBlocksBatchSizeDynamic.set(this.eth1GetBlocksBatchSizeDynamic); + metrics.eth1.eth1GetLogsBatchSizeDynamic.set(this.eth1GetLogsBatchSizeDynamic); + }); } - this.runAutoUpdate().catch((e: Error) => { - if (!(e instanceof ErrorAborted)) { - this.logger.error("Error on eth1 loop", {}, e); - } - }); + if (opts.enabled) { + this.runAutoUpdate().catch((e: Error) => { + if (!(e instanceof ErrorAborted)) { + this.logger.error("Error on eth1 loop", {}, e); + } + }); + } } /** @@ -202,9 +218,22 @@ export class Eth1DepositDataTracker { // The DB may contain deposits from a different chain making lastProcessedDepositBlockNumber > current chain tip // The Math.min() fixes those rare scenarios where fromBlock > toBlock const fromBlock = Math.min(remoteFollowBlock, this.getFromBlockToFetch(lastProcessedDepositBlockNumber)); - const toBlock = Math.min(remoteFollowBlock, fromBlock + MAX_BLOCKS_PER_LOG_QUERY - 1); + const toBlock = Math.min(remoteFollowBlock, fromBlock + this.eth1GetLogsBatchSizeDynamic - 1); + + let depositEvents; + try { + depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock); + this.eth1GetLogsBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_LOG_QUERY, this.eth1GetLogsBatchSizeDynamic * 2); + } catch (e) { + if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) { + this.eth1GetLogsBatchSizeDynamic = Math.max( + MIN_BLOCKS_PER_LOG_QUERY, + Math.floor(this.eth1GetLogsBatchSizeDynamic / 2) + ); + } + throw e; + } - const depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock); this.logger.verbose("Fetched deposits", {depositCount: depositEvents.length, fromBlock, toBlock}); this.metrics?.eth1.depositEventsFetched.inc(depositEvents.length); @@ -253,11 +282,23 @@ export class Eth1DepositDataTracker { ); const toBlock = Math.min( remoteFollowBlock, - fromBlock + MAX_BLOCKS_PER_BLOCK_QUERY - 1, // Block range is inclusive + fromBlock + this.eth1GetBlocksBatchSizeDynamic - 1, // Block range is inclusive lastProcessedDepositBlockNumber ); - const blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock); + let blocksRaw; + try { + blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock); + this.eth1GetBlocksBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_BLOCK_QUERY, this.eth1GetBlocksBatchSizeDynamic * 2); + } catch (e) { + if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) { + this.eth1GetBlocksBatchSizeDynamic = Math.max( + MIN_BLOCKS_PER_BLOCK_QUERY, + Math.floor(this.eth1GetBlocksBatchSizeDynamic / 2) + ); + } + throw e; + } const blocks = blocksRaw.map(parseEth1Block); this.logger.verbose("Fetched eth1 blocks", {blockCount: blocks.length, fromBlock, toBlock}); diff --git a/packages/beacon-node/src/eth1/provider/eth1Provider.ts b/packages/beacon-node/src/eth1/provider/eth1Provider.ts index b066b3026348..eeef0fdb3859 100644 --- a/packages/beacon-node/src/eth1/provider/eth1Provider.ts +++ b/packages/beacon-node/src/eth1/provider/eth1Provider.ts @@ -1,9 +1,8 @@ import {toHexString} from "@chainsafe/ssz"; import {phase0} from "@lodestar/types"; import {IChainConfig} from "@lodestar/config"; -import {fromHex, retry} from "@lodestar/utils"; +import {fromHex} from "@lodestar/utils"; -import {chunkifyInclusiveRange} from "../../util/chunkify.js"; import {linspace} from "../../util/numpy.js"; import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js"; import {Eth1Block, IEth1Provider} from "../interface.js"; @@ -77,32 +76,12 @@ export class Eth1Provider implements IEth1Provider { } async getDepositEvents(fromBlock: number, toBlock: number): Promise { - const logsRawArr = await retry( - (attempt) => { - // Large log requests can return with code 200 but truncated, with broken JSON - // This retry will split a given block range into smaller ranges exponentially - // The underlying http client should handle network errors and retry - const chunkCount = 2 ** (attempt - 1); - const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount); - return Promise.all( - blockRanges.map(([from, to]) => { - const options = { - fromBlock: from, - toBlock: to, - address: this.depositContractAddress, - topics: depositEventTopics, - }; - return this.getLogs(options); - }) - ); - }, - { - retries: 3, - retryDelay: 3000, - shouldRetry: isJsonRpcTruncatedError, - } - ); - + const logsRawArr = await this.getLogs({ + fromBlock, + toBlock, + address: this.depositContractAddress, + topics: depositEventTopics, + }); return logsRawArr.flat(1).map((log) => parseDepositLog(log)); } @@ -111,29 +90,10 @@ export class Eth1Provider implements IEth1Provider { */ async getBlocksByNumber(fromBlock: number, toBlock: number): Promise { const method = "eth_getBlockByNumber"; - const blocksArr = await retry( - (attempt) => { - // Large batch requests can return with code 200 but truncated, with broken JSON - // This retry will split a given block range into smaller ranges exponentially - // The underlying http client should handle network errors and retry - const chunkCount = 2 ** (attempt - 1); - const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount); - return Promise.all( - blockRanges.map(([from, to]) => - this.rpc.fetchBatch( - linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})), - getBlocksByNumberOpts - ) - ) - ); - }, - { - retries: 3, - retryDelay: 3000, - shouldRetry: isJsonRpcTruncatedError, - } + const blocksArr = await this.rpc.fetchBatch( + linspace(fromBlock, toBlock).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})), + getBlocksByNumberOpts ); - const blocks: EthJsonRpcBlockRaw[] = []; for (const block of blocksArr.flat(1)) { if (block) blocks.push(block); diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 4c83bb798ebe..79e17e394d43 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -1078,6 +1078,14 @@ export function createLodestarMetrics( name: "lodestar_eth1_follow_distance_dynamic", help: "Eth1 dynamic follow distance changed by the deposit tracker if blocks are slow", }), + eth1GetBlocksBatchSizeDynamic: register.gauge({ + name: "lodestar_eth1_blocks_batch_size_dynamic", + help: "Dynamic batch size to fetch blocks", + }), + eth1GetLogsBatchSizeDynamic: register.gauge({ + name: "lodestar_eth1_logs_batch_size_dynamic", + help: "Dynamic batch size to fetch deposit logs", + }), // Merge Search info eth1MergeStatus: register.gauge({ diff --git a/packages/beacon-node/test/unit/eth1/eth1DepositDataTracker.test.ts b/packages/beacon-node/test/unit/eth1/eth1DepositDataTracker.test.ts new file mode 100644 index 000000000000..28e32cdbf744 --- /dev/null +++ b/packages/beacon-node/test/unit/eth1/eth1DepositDataTracker.test.ts @@ -0,0 +1,93 @@ +import {expect} from "chai"; +import sinon from "sinon"; +import {config} from "@lodestar/config/default"; +import {TimeoutError} from "@lodestar/utils"; + +import {Eth1DepositDataTracker} from "../../../src/eth1/eth1DepositDataTracker.js"; +import {Eth1Provider} from "../../../src/eth1/provider/eth1Provider.js"; +import {testLogger} from "../../utils/logger.js"; +import {defaultEth1Options} from "../../../src/eth1/options.js"; +import {BeaconDb} from "../../../src/db/beacon.js"; + +describe("Eth1DepositDataTracker", function () { + const sandbox = sinon.createSandbox(); + const controller = new AbortController(); + + const logger = testLogger(); + const opts = {...defaultEth1Options, enabled: false}; + const signal = controller.signal; + const eth1Provider = new Eth1Provider(config, opts, signal, null); + const db = sinon.createStubInstance(BeaconDb); + + const eth1DepositDataTracker = new Eth1DepositDataTracker( + opts, + {config, db, logger, signal, metrics: null}, + eth1Provider + ); + sinon + .stub( + (eth1DepositDataTracker as never) as { + getLastProcessedDepositBlockNumber: typeof eth1DepositDataTracker["getLastProcessedDepositBlockNumber"]; + }, + "getLastProcessedDepositBlockNumber" + ) + .resolves(0); + + sinon.stub(eth1DepositDataTracker["eth1DataCache"], "getHighestCachedBlockNumber").resolves(0); + sinon.stub(eth1DepositDataTracker["eth1DataCache"], "add").resolves(void 0); + + sinon.stub(eth1DepositDataTracker["depositsCache"], "getEth1DataForBlocks").resolves([]); + sinon.stub(eth1DepositDataTracker["depositsCache"], "add").resolves(void 0); + sinon.stub(eth1DepositDataTracker["depositsCache"], "getLowestDepositEventBlockNumber").resolves(0); + + const getBlocksByNumberStub = sinon.stub(eth1Provider, "getBlocksByNumber"); + const getDepositEventsStub = sinon.stub(eth1Provider, "getDepositEvents"); + + after(() => { + sandbox.restore(); + }); + + it("Should dynamically adjust blocks batch size", async function () { + let expectedSize = 1000; + expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize); + + // If there are timeerrors or parse errors then batch size should reduce + getBlocksByNumberStub.throws(new TimeoutError("timeout error")); + for (let i = 0; i < 10; i++) { + expectedSize = Math.max(Math.floor(expectedSize / 2), 10); + await eth1DepositDataTracker["updateBlockCache"](3000).catch((_e) => void 0); + expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize); + } + expect(expectedSize).to.be.equal(10); + + getBlocksByNumberStub.resolves([]); + for (let i = 0; i < 10; i++) { + expectedSize = Math.min(expectedSize * 2, 1000); + await eth1DepositDataTracker["updateBlockCache"](3000); + expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize); + } + expect(expectedSize).to.be.equal(1000); + }); + + it("Should dynamically adjust logs batch size", async function () { + let expectedSize = 1000; + expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize); + + // If there are timeerrors or parse errors then batch size should reduce + getDepositEventsStub.throws(new TimeoutError("timeout error")); + for (let i = 0; i < 10; i++) { + expectedSize = Math.max(Math.floor(expectedSize / 2), 10); + await eth1DepositDataTracker["updateDepositCache"](3000).catch((_e) => void 0); + expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize); + } + expect(expectedSize).to.be.equal(10); + + getDepositEventsStub.resolves([]); + for (let i = 0; i < 10; i++) { + expectedSize = Math.min(expectedSize * 2, 1000); + await eth1DepositDataTracker["updateDepositCache"](3000); + expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize); + } + expect(expectedSize).to.be.equal(1000); + }); +});