Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Integrate new contract interface, process batches #58

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/update-feeds/dapi-data-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import { type DapiDataRegistry, DapiDataRegistry__factory } from '../../typechai
export const getDapiDataRegistry = (address: string, provider: ethers.providers.StaticJsonRpcProvider) =>
DapiDataRegistry__factory.connect(address, provider);

export type ReadDapisResponse = Awaited<ReturnType<DapiDataRegistry['readDapis']>>;
export type ReadDapiResponse = Awaited<ReturnType<DapiDataRegistry['readDapiWithIndex']>>;
94 changes: 43 additions & 51 deletions src/update-feeds/update-feeds.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { go } from '@api3/promise-utils';
import { ethers } from 'ethers';
import { range, size } from 'lodash';
import { chunk, range, size } from 'lodash';

import type { Chain } from '../config/schema';
import { logger } from '../logger';
import { getState } from '../state';
import { isFulfilled, sleep } from '../utils';
import { sleep } from '../utils';

import { getDapiDataRegistry, type ReadDapisResponse } from './dapi-data-registry';
import { getDapiDataRegistry, type ReadDapiResponse } from './dapi-data-registry';

export const startUpdateFeedLoops = async () => {
const state = getState();
Expand All @@ -34,65 +34,57 @@ export const startUpdateFeedLoops = async () => {
})
);
};

export const runUpdateFeed = async (providerName: string, chain: Chain, chainId: string) => {
const { dataFeedBatchSize, dataFeedUpdateInterval, providers, contracts } = chain;
// https://github.com/api3dao/dapi-management/pull/3/files#diff-b6941851ebc92dc9691bbf0cb701fe9c4595cb78488c3bb92ad6e4b917719f4fR374
// TODO baseLogContext, actually test this against local node, add delay to reduce
export const runUpdateFeed = async (providerName: string, chain: Chain, _chainId: string) => {
// const { dataFeedBatchSize, dataFeedUpdateInterval, providers, contracts } = chain;
const { providers, contracts } = chain;
// TODO: Consider adding a start timestamp (as ID) to the logs to identify batches from this runUpdateFeed tick.
const baseLogContext = { chainId, providerName };
// const baseLogContext = { chainId, providerName };

// Create a provider and connect it to the DapiDataRegistry contract.
const provider = new ethers.providers.StaticJsonRpcProvider(providers[providerName]);
const dapiDataRegistry = getDapiDataRegistry(contracts.DapiDataRegistry, provider);
const voidSigner = new ethers.VoidSigner(ethers.constants.AddressZero, provider);

// TODO split out batch size to constant
const dapiTuples = await chunk(range(10_000), 10).reduce<
Promise<{ responses: ReadDapiResponse[]; endOfBatch: boolean }>
>(
async (accu, dapiIndexBatch) => {
const resolvedAccu = await accu;
if (resolvedAccu.endOfBatch) {
return accu;
}

logger.debug(`Fetching first batch of dAPIs batches`, baseLogContext);
const firstBatchStartTime = Date.now();
const goFirstBatch = await go(async () => {
// TODO: Use multicall to fetch this in a single RPC call.
return {
batch: await dapiDataRegistry.readDapis(0, dataFeedBatchSize),
// eslint-disable-next-line unicorn/no-await-expression-member
totalDapisCount: (await dapiDataRegistry.dapisCount()).toNumber(),
};
});
if (!goFirstBatch.success) {
logger.error(`Failed to get first active dAPIs batch`, goFirstBatch.error, baseLogContext);
return;
}
const { batch: firstBatch, totalDapisCount: totalCount } = goFirstBatch.data;
const processFirstBatchPromise = processBatch(firstBatch);

// Calculate the stagger time between the rest of the batches.
const batchesCount = totalCount / dataFeedBatchSize;
const staggerTime = batchesCount <= 1 ? 0 : (dataFeedUpdateInterval / batchesCount) * 1000;

// Wait the remaining stagger time required after fetching the first batch.
const firstBatchDuration = Date.now() - firstBatchStartTime;
await sleep(Math.max(0, staggerTime - firstBatchDuration));

// Fetch the rest of the batches in parallel in a staggered way.
logger.debug('Fetching batches of active dAPIs', { batchesCount, staggerTime, ...baseLogContext });
const otherBatches = await Promise.allSettled(
range(1, batchesCount).map(async (batchIndex) => {
await sleep((batchIndex - 1) * staggerTime);
const readBatch = dapiIndexBatch.map((index) =>
dapiDataRegistry.interface.encodeFunctionData('readDapiWithIndex', [index])
);

// Read beacon batch onchain values
const goDatafeedsTryMulticall = await go(
async () => dapiDataRegistry.connect(voidSigner).callStatic.tryMulticall(readBatch),
{
onAttemptError: (goError) =>
logger.warn(`Failed attempt to read beacon data using multicall.`, { error: goError.error }),
}
);

if (!goDatafeedsTryMulticall.success) {
logger.warn(`Unable to read beacon data using multicall.`, { error: goDatafeedsTryMulticall.error });
return { ...resolvedAccu, endOfBatch: true };
}

logger.debug(`Fetching batch of active dAPIs`, { batchIndex, ...baseLogContext });
return dapiDataRegistry.readDapis(batchIndex * dataFeedBatchSize, dataFeedBatchSize);
})
return { responses: [...resolvedAccu.responses], endOfBatch: false };
},
Promise.resolve({ responses: [], endOfBatch: false })
);
for (const batch of otherBatches.filter((batch) => !isFulfilled(batch))) {
logger.error(`Failed to get active dAPIs batch`, (batch as PromiseRejectedResult).reason, baseLogContext);
}
const processOtherBatchesPromises = otherBatches
.filter((result) => isFulfilled(result))
.map(async (result) => processBatch((result as PromiseFulfilledResult<ReadDapisResponse>).value));

// Wait for all the batches to be processed.
//
await Promise.allSettled(dapiTuples.responses.map(async (element) => processDApi(element)));
// Carried over from previous code
// TODO: Consider returning some information (success/error) and log some statistics (e.g. how many dAPIs were
// updated, etc...).
await Promise.all([processFirstBatchPromise, ...processOtherBatchesPromises]);
};

export const processBatch = async (_batch: ReadDapisResponse) => {
export const processDApi = async (_dAPI: ReadDapiResponse) => {
// TODO: Implement.
};
Loading