Skip to content

Commit

Permalink
fix: double accounting (#138)
Browse files Browse the repository at this point in the history
* fix: double accounting

* fix: test

* fix: eventloop (#139)
  • Loading branch information
vgorkavenko authored Mar 21, 2023
1 parent 48b1ce8 commit d62f767
Show file tree
Hide file tree
Showing 17 changed files with 89 additions and 50 deletions.
13 changes: 13 additions & 0 deletions src/common/functions/allSettled.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// wait for all promises to resolve and throws if any error occurs
export async function allSettled(values: Promise<any>[]): Promise<any[]> {
const results = await Promise.allSettled(values);
const failed = results.filter((r: PromiseSettledResult<any>) => r.status == 'rejected');
if (failed.length > 0) {
throw new global.AggregateError(
failed.map((r: PromiseRejectedResult) => r.reason),
failed.flatMap((r: any) => Array.from(r.reason.message, r.reason.stack || '')).join('\n'),
);
}

return results.map((r: PromiseFulfilledResult<any>) => r.value);
}
3 changes: 2 additions & 1 deletion src/duty/attestation/attestation.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class AttestationMetrics {
this.logger.log('Calculating attestation metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([
await allSettled([
this.perfectAttestationsLastEpoch(),
this.missedAttestationsLastEpoch(),
this.highIncDelayAttestationsLastEpoch(),
Expand Down
8 changes: 7 additions & 1 deletion src/duty/attestation/attestation.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { unblock } from 'common/functions/unblock';
import { PrometheusService } from 'common/prometheus';

import { SummaryService } from '../summary';
Expand Down Expand Up @@ -42,6 +43,8 @@ export class AttestationRewards {
Math.trunc(perfect.source * epochMeta.state.base_reward * 32 * sourceParticipation) +
Math.trunc(perfect.target * epochMeta.state.base_reward * 32 * targetParticipation) +
Math.trunc(perfect.head * epochMeta.state.base_reward * 32 * headParticipation);
const maxBatchSize = 10000;
let index = 0;
for (const v of this.summary.epoch(epoch).values()) {
// Calculate attestation rewards from previous epoch
const pv = this.summary.epoch(epoch - 1).get(v.val_id);
Expand Down Expand Up @@ -88,7 +91,10 @@ export class AttestationRewards {
att_missed_reward,
att_penalty,
});
index++;
if (index % maxBatchSize == 0) {
await unblock();
}
}
return true;
}
}
5 changes: 3 additions & 2 deletions src/duty/attestation/attestation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { streamArray } from 'stream-json/streamers/StreamArray';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
Expand Down Expand Up @@ -67,7 +68,7 @@ export class AttestationService {

protected async processAttestation(epoch: Epoch, attestation: SlotAttestation, committee: number[]) {
const attestationFlags = { source: [], target: [], head: [] };
const [canonHead, canonTarget, canonSource] = await Promise.all([
const [canonHead, canonTarget, canonSource] = await allSettled([
this.getCanonSlotRoot(attestation.slot),
this.getCanonSlotRoot(attestation.target_epoch * this.slotsInEpoch),
this.getCanonSlotRoot(attestation.source_epoch * this.slotsInEpoch),
Expand Down Expand Up @@ -177,7 +178,7 @@ export class AttestationService {
}).finally(() => pipeline.destroy());
};

await Promise.all([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]);
await allSettled([processCommittees(this.processedEpoch - 1), processCommittees(this.processedEpoch)]);
return committees;
}
}
5 changes: 3 additions & 2 deletions src/duty/duty.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { ClickhouseService } from '../storage';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class DutyMetrics {
@TrackTask('calc-all-duties-metrics')
public async calculate(epoch: Epoch, possibleHighRewardValidators: string[]): Promise<any> {
this.logger.log('Calculating duties metrics of user validators');
await Promise.all([
await allSettled([
this.withPossibleHighReward(epoch, possibleHighRewardValidators),
this.stateMetrics.calculate(epoch),
this.withdrawalsMetrics.calculate(epoch),
Expand All @@ -45,7 +46,7 @@ export class DutyMetrics {
}

private async withPossibleHighReward(epoch: Epoch, possibleHighRewardValidators: string[]): Promise<void> {
await Promise.all([
await allSettled([
this.attestationMetrics.calculate(epoch, possibleHighRewardValidators),
this.proposeMetrics.calculate(epoch, possibleHighRewardValidators),
this.syncMetrics.calculate(epoch, possibleHighRewardValidators),
Expand Down
3 changes: 2 additions & 1 deletion src/duty/duty.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask } from 'common/prometheus';

import { AttestationRewards } from './attestation';
Expand All @@ -26,6 +27,6 @@ export class DutyRewards {
// todo: 'Slashed' case
// todo: 'Inactivity leak' case
this.logger.log('Calculate rewards for all duties');
await Promise.all([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]);
await allSettled([this.attestationRewards.calculate(epoch), this.syncRewards.calculate(epoch), this.proposerRewards.calculate(epoch)]);
}
}
24 changes: 13 additions & 11 deletions src/duty/duty.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ConfigService } from 'common/config';
import { BlockHeaderResponse, ConsensusProviderService } from 'common/eth-providers';
import { BlockCacheService } from 'common/eth-providers/consensus-provider/block-cache';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
Expand Down Expand Up @@ -41,18 +42,19 @@ export class DutyService {
) {}

public async checkAndWrite({ epoch, stateSlot }: { epoch: Epoch; stateSlot: Slot }): Promise<string[]> {
// Prefetch will be done before main checks because duty by state requests are heavy
// and while we wait for their responses we fetch blocks and headers.
// If for some reason prefetch task will be slower than duty by state requests,
// blocks and headers will be fetched inside tasks of checks
const [, , possibleHighRewardVals] = await Promise.all([
this.prefetch(epoch),
const [, , possibleHighRewardVals] = await allSettled([
// Prefetch will be done before main checks because duty by state requests are heavy
// and while we wait for their responses we fetch blocks and headers.
// If for some reason prefetch task will be slower than duty by state requests,
// blocks and headers will be fetched inside tasks of checks
// so, it can be optional when failed
this.prefetch(epoch).catch(() => undefined),
this.checkAll(epoch, stateSlot),
// optional task to get possible high reward validators for head epoch
// Optional task to get possible high reward validators for head epoch
// it's nice to have but not critical
this.getPossibleHighRewardValidators().catch(() => []),
]);
await Promise.all([this.writeEpochMeta(epoch), this.writeSummary(epoch)]);
await allSettled([this.writeEpochMeta(epoch), this.writeSummary(epoch)]);
this.summary.clear();
await this.storage.updateEpochProcessing({ epoch, is_stored: true });
return possibleHighRewardVals;
Expand All @@ -62,7 +64,7 @@ export class DutyService {
protected async checkAll(epoch: Epoch, stateSlot: Slot): Promise<any> {
this.summary.clear();
this.logger.log('Checking duties of validators');
await Promise.all([
await allSettled([
this.state.check(epoch, stateSlot),
this.attestation.check(epoch, stateSlot),
this.sync.check(epoch, stateSlot),
Expand All @@ -85,7 +87,7 @@ export class DutyService {
const toFetch = slots.map((s) => [this.clClient.getBlockHeader(s), this.clClient.getBlockInfo(s)]).flat();
while (toFetch.length > 0) {
const chunk = toFetch.splice(0, 32);
await Promise.all(chunk);
await allSettled(chunk);
}
}

Expand All @@ -95,7 +97,7 @@ export class DutyService {
const headEpoch = Math.trunc(actualSlotHeader.header.message.slot / this.config.get('FETCH_INTERVAL_SLOTS'));
this.logger.log('Getting possible high reward validator indexes');
const propDependentRoot = await this.clClient.getDutyDependentRoot(headEpoch);
const [sync, prop] = await Promise.all([
const [sync, prop] = await allSettled([
this.clClient.getSyncCommitteeInfo('finalized', headEpoch),
this.clClient.getCanonicalProposerDuties(headEpoch, propDependentRoot),
]);
Expand Down
3 changes: 2 additions & 1 deletion src/duty/propose/propose.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand All @@ -24,7 +25,7 @@ export class ProposeMetrics {
this.logger.log('Calculating propose metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]);
await allSettled([this.goodProposes(), this.missProposes(), this.highRewardMissProposes(possibleHighRewardValidators)]);
}

private async goodProposes() {
Expand Down
3 changes: 2 additions & 1 deletion src/duty/state/state.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { Owner, PrometheusService, PrometheusValStatus, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { LidoSourceService } from 'common/validators-registry/lido-source';
Expand All @@ -29,7 +30,7 @@ export class StateMetrics {
this.logger.log('Calculating state metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([
await allSettled([
this.operatorsIdentifies(),
this.nosStats(),
this.userValidatorsStats(),
Expand Down
45 changes: 25 additions & 20 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import { chain } from 'stream-chain';
import { parser } from 'stream-json';
import { pick } from 'stream-json/filters/Pick';
import { streamArray } from 'stream-json/streamers/StreamArray';
import { batch } from 'stream-json/utils/Batch';

import { ConfigService } from 'common/config';
import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { bigNumberSqrt } from 'common/functions/bigNumberSqrt';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';

import { bigNumberSqrt } from '../../common/functions/bigNumberSqrt';
import { SummaryService } from '../summary';

@Injectable()
Expand All @@ -37,27 +39,30 @@ export class StateService {
this.logger.log('Processing all validators state');
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray()]);
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 10000 })]);
await new Promise((resolve, reject) => {
pipeline.on('data', async (data) => {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
pipeline.on('data', async (batch) => {
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
}
}
await unblock();
});
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
Expand Down
3 changes: 2 additions & 1 deletion src/duty/summary/summary.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand Down Expand Up @@ -32,7 +33,7 @@ export class SummaryMetrics {
this.logger.log('Calculating propose metrics');
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();
await Promise.all([this.userRewards(), this.avgChainRewards(), this.common()]);
await allSettled([this.userRewards(), this.avgChainRewards(), this.common()]);
}

private async common() {
Expand Down
5 changes: 3 additions & 2 deletions src/duty/sync/sync.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setOtherOperatorsMetric, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage';
Expand All @@ -28,7 +29,7 @@ export class SyncMetrics {
this.processedEpoch = epoch;
this.operators = await this.registryService.getOperators();

await Promise.all([
await allSettled([
this.userAvgSyncPercent(),
this.otherAvgSyncPercent(),
this.operatorAvgSyncPercents(),
Expand All @@ -55,7 +56,7 @@ export class SyncMetrics {

private async syncParticipation(possibleHighRewardValidators: string[]) {
const chainAvgSyncPercent = await this.chainAvgSyncPercent();
await Promise.all([
await allSettled([
this.goodSyncParticipationLastEpoch(chainAvgSyncPercent),
this.badSyncParticipationLastEpoch(chainAvgSyncPercent),
this.badSyncParticipationLastNEpoch(chainAvgSyncPercent),
Expand Down
3 changes: 1 addition & 2 deletions src/duty/sync/sync.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class SyncRewards {
protected readonly summary: SummaryService,
) {}

public calculate(epoch: Epoch) {
public async calculate(epoch: Epoch) {
const epochMeta = this.summary.epoch(epoch).getMeta();
let sync_earned_reward = 0;
let sync_missed_reward = 0;
Expand All @@ -30,6 +30,5 @@ export class SyncRewards {

this.summary.epoch(epoch).set({ epoch, val_id: v.val_id, sync_earned_reward, sync_penalty, sync_missed_reward });
}
return true;
}
}
3 changes: 2 additions & 1 deletion src/duty/withdrawal/withdrawals.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { PrometheusService, TrackTask, setUserOperatorsMetric } from 'common/prometheus';
import { RegistryService, RegistrySourceOperator } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand All @@ -29,7 +30,7 @@ export class WithdrawalsMetrics {
this.logger.log('Calculating withdrawals metrics');
this.processedEpoch = epoch;
this.operators = this.registryService.getOperators();
await Promise.all([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]);
await allSettled([this.userNodeOperatorsWithdrawalsStats(), this.otherChainWithdrawalsStats()]);
}

private async userNodeOperatorsWithdrawalsStats() {
Expand Down
5 changes: 3 additions & 2 deletions src/duty/withdrawal/withdrawals.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { ConfigService } from 'common/config';
import { BlockInfoResponse, ConsensusProviderService } from 'common/eth-providers';
import { Epoch } from 'common/eth-providers/consensus-provider/types';
import { allSettled } from 'common/functions/allSettled';
import { range } from 'common/functions/range';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';

import { range } from '../../common/functions/range';
import { SummaryService } from '../summary';

@Injectable()
Expand All @@ -30,7 +31,7 @@ export class WithdrawalsService {
const firstSlotInEpoch = epoch * slotsInEpoch;
const slots: number[] = range(firstSlotInEpoch, firstSlotInEpoch + slotsInEpoch);
const toFetch = slots.map((s) => this.clClient.getBlockInfo(s));
const blocks = (await Promise.all(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[];
const blocks = (await allSettled(toFetch)).filter((b) => b != undefined) as BlockInfoResponse[];
for (const block of blocks) {
const withdrawals = block.message.body.execution_payload.withdrawals ?? [];
for (const withdrawal of withdrawals) {
Expand Down
Loading

0 comments on commit d62f767

Please sign in to comment.