feat: head working mode
vgorkavenko committed Jul 12, 2023
1 parent eba5ce2 commit 71fb8a8
Showing 6 changed files with 64 additions and 22 deletions.
36 changes: 35 additions & 1 deletion README.md
@@ -4,11 +4,41 @@ Consensus layer validators monitoring bot, that fetches Lido or Custom Users Node Operators keys
from Execution layer and checks their performance in Consensus
layer by: balance delta, attestations, proposes, sync committee participation.

Bot uses finalized state (2 epochs back from HEAD) for fetching validator info,
Bot has two separate working modes, `head` and `finalized`, for fetching validator info,
writes data to **Clickhouse**, displays aggregates by **Grafana**
dashboard, alerts about bad performance by **Prometheus + Alertmanager** and
routes notifications to Discord channel via **alertmanager-discord**.

## Working modes

You can switch working mode by providing `WORKING_MODE` environment variable with one of the following values:

### `finalized`
Default working mode. Fetches validator info from the `finalized` epoch (2 epochs back from `head`).
It is more stable and reliable because all data is already finalized.

**Pros**:
* No errors due to reorgs
* Fewer rewards calculation errors
* Accurate data in alerts and dashboard

**Cons**:
* Processing lags by 2 epochs, so critical alerts are also delayed by 2 epochs
* If finality is delayed, the app stops monitoring and waits for finality

### `head`
Alternative working mode. Fetches validator info from the `head` (non-finalized) epoch.
It is less stable and reliable because the data is not finalized yet, and reorgs can cause calculation errors.

**Pros**:
* Less processing delay, so critical alerts arrive sooner
* If finality is delayed, the app keeps monitoring instead of waiting for finality

**Cons**:
* Errors due to reorgs
* More rewards calculation errors
* Possibly inaccurate data in alerts and dashboard
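
For a sense of scale, the 2-epoch lag of `finalized` mode translates to roughly 13 minutes of extra alert latency. A quick sketch, assuming mainnet protocol constants (32 slots per epoch, 12 seconds per slot — these are assumptions here, not values read from this repo):

```typescript
// Extra alert latency introduced by `finalized` mode.
// Assumed mainnet constants (not taken from this repo's config):
const SLOTS_PER_EPOCH = 32;
const SECONDS_PER_SLOT = 12;
const FINALITY_LAG_EPOCHS = 2; // `finalized` trails `head` by 2 epochs

const extraDelaySeconds = FINALITY_LAG_EPOCHS * SLOTS_PER_EPOCH * SECONDS_PER_SLOT;
// 2 * 32 * 12 = 768 seconds, i.e. ~12.8 minutes of additional delay
```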

## Dashboards

There are three dashboards in Grafana:
@@ -106,6 +136,10 @@ If you want to implement your own source, it must match [RegistrySource interfac
* **Required:** false
* **Default:** json
---
`WORKING_MODE` - Application working mode (finalized or head)
* **Required:** false
* **Default:** finalized
---
`DB_HOST` - Clickhouse server host
* **Required:** true
---
8 changes: 8 additions & 0 deletions src/common/config/env.validation.ts
@@ -31,6 +31,11 @@ export enum ValidatorRegistrySource {
KeysAPI = 'keysapi',
}

export enum WorkingMode {
Finalized = 'finalized',
Head = 'head',
}

const toBoolean = (value: any): boolean => {
if (typeof value === 'boolean') {
return value;
@@ -277,6 +282,9 @@ export class EnvironmentVariables {
@IsObject()
@Transform(({ value }) => JSON.parse(value), { toClassOnly: true })
public CRITICAL_ALERTS_ALERTMANAGER_LABELS = {};

@IsEnum(WorkingMode)
public WORKING_MODE = WorkingMode.Finalized;
}

export function validate(config: Record<string, unknown>) {
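The new `WorkingMode` enum can be exercised without the class-validator decorators; this decorator-free sketch shows a check equivalent in spirit to `@IsEnum(WorkingMode)` (the `isWorkingMode` helper is illustrative, not part of the repo):

```typescript
// Mirror of the WorkingMode enum added in env.validation.ts.
enum WorkingMode {
  Finalized = 'finalized',
  Head = 'head',
}

// Decorator-free check, equivalent in spirit to @IsEnum(WorkingMode):
// accept only the two known string values.
function isWorkingMode(value: string): value is WorkingMode {
  return (Object.values(WorkingMode) as string[]).includes(value);
}
```

With the `@IsEnum(WorkingMode)` decorator and the `WorkingMode.Finalized` initializer shown in the diff, an unset `WORKING_MODE` falls back to `finalized`, preserving the old behavior.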
5 changes: 3 additions & 2 deletions src/common/consensus-provider/consensus-provider.service.ts
@@ -116,9 +116,10 @@ export class ConsensusProviderService {
);
}

public async getFinalizedBlockHeader(processingState: EpochProcessingState): Promise<BlockHeaderResponse | void> {
public async getLatestBlockHeader(processingState: EpochProcessingState): Promise<BlockHeaderResponse | void> {
const latestFrom = this.config.get('WORKING_MODE');
return await this.retryRequest<BlockHeaderResponse>(
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders('finalized')),
async (apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders(latestFrom)),
{
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
useFallbackOnResolved: (r) => {
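The renamed `getLatestBlockHeader` feeds `WORKING_MODE` straight into the header endpoint, which works because both mode values are valid Beacon API block ids. The repo's `endpoints.beaconHeaders` builder is not shown in this diff, so this sketch assumes the standard Beacon API path shape `/eth/v1/beacon/headers/{block_id}`:

```typescript
// WORKING_MODE doubles as a Beacon API block id: 'finalized' and 'head'
// are both valid {block_id} values for the headers endpoint.
type WorkingMode = 'finalized' | 'head';

function beaconHeadersPath(latestFrom: WorkingMode): string {
  // Standard Beacon API shape (an assumption; the repo's builder is not shown)
  return `/eth/v1/beacon/headers/${latestFrom}`;
}
```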
2 changes: 1 addition & 1 deletion src/common/functions/allSettled.ts
@@ -1,5 +1,5 @@
// wait for all promises to resolve and throws if any error occurs
export async function allSettled(values: Promise<any>[]): Promise<any[]> {
export async function allSettled(values: Promise<any>[] | any[]): Promise<any[]> {
const results = await Promise.allSettled(values);
const failed = results.filter((r: PromiseSettledResult<any>) => r.status == 'rejected');
if (failed.length > 0) {
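The widened `allSettled` signature works because `Promise.allSettled` accepts plain values as well as promises; non-promise entries settle as fulfilled. A self-contained sketch (the body after the `failed` check is truncated in this diff, so the error message and return value here are assumptions):

```typescript
// Widened allSettled: accepts promises or plain values, throws if any
// entry rejects, otherwise returns the fulfilled values in order.
async function allSettled(values: Promise<any>[] | any[]): Promise<any[]> {
  const results = await Promise.allSettled(values);
  const failed = results.filter((r) => r.status === 'rejected');
  if (failed.length > 0) {
    // Error message is an assumption; the original body is truncated in the diff
    throw new Error(`${failed.length} of ${results.length} tasks failed`);
  }
  return results.map((r) => (r as PromiseFulfilledResult<any>).value);
}
```

This is what lets the duty service pass a plain `[]` in place of the optional task without changing the call shape.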
5 changes: 3 additions & 2 deletions src/duty/duty.service.ts
@@ -1,7 +1,7 @@
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';

import { ConfigService } from 'common/config';
import { ConfigService, WorkingMode } from 'common/config';
import { BlockHeaderResponse, ConsensusProviderService } from 'common/consensus-provider';
import { BlockCacheService } from 'common/consensus-provider/block-cache';
import { Epoch, Slot } from 'common/consensus-provider/types';
@@ -42,6 +42,7 @@ export class DutyService {
) {}

public async checkAndWrite({ epoch, stateSlot }: { epoch: Epoch; stateSlot: Slot }): Promise<string[]> {
const workingMode = this.config.get('WORKING_MODE');
const [, , possibleHighRewardVals] = await allSettled([
// Prefetch will be done before main checks because duty by state requests are heavy
// and while we wait for their responses we fetch blocks and headers.
@@ -52,7 +53,7 @@
this.checkAll(epoch, stateSlot),
// Optional task to get possible high reward validators for head epoch
// it's nice to have but not critical
this.getPossibleHighRewardValidators().catch(() => []),
workingMode == WorkingMode.Finalized ? this.getPossibleHighRewardValidators().catch(() => []) : [],
]);
await allSettled([this.writeEpochMeta(epoch), this.writeSummary(epoch)]);
this.summary.clear();
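The change to `checkAndWrite` swaps the optional high-reward lookup for a plain `[]` in head mode, keeping the `allSettled` result tuple stable. The pattern can be factored as a small helper (`optionalTask` is an illustrative name, not in the repo):

```typescript
// Illustrative helper for the head-mode pattern in checkAndWrite:
// run a nice-to-have task only when enabled, swallow its failures,
// and fall back to an empty result either way.
async function optionalTask<T>(enabled: boolean, task: () => Promise<T[]>): Promise<T[]> {
  return enabled ? task().catch(() => []) : [];
}
```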
30 changes: 14 additions & 16 deletions src/inspector/inspector.service.ts
@@ -2,7 +2,7 @@ import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService, OnModuleInit } from '@nestjs/common';

import { CriticalAlertsService } from 'common/alertmanager';
import { ConfigService } from 'common/config';
import { ConfigService, WorkingMode } from 'common/config';
import { BlockHeaderResponse, ConsensusProviderService } from 'common/consensus-provider';
import { BlockCacheService } from 'common/consensus-provider/block-cache';
import { Slot } from 'common/consensus-provider/types';
@@ -37,12 +37,14 @@ export class InspectorService implements OnModuleInit {
public async startLoop(): Promise<never> {
const version = await this.clClient.getVersion();
this.logger.log(`Beacon chain API info [${version}]`);

// eslint-disable-next-line no-constant-condition
while (true) {
try {
const toProcess = await this.getEpochDataToProcess();
if (toProcess) {
if (this.config.get('WORKING_MODE') === WorkingMode.Head) {
this.logger.warn(`Working in HEAD mode. This can cause calculation errors and inaccurate data!`);
}
const { epoch, slot, is_stored, is_calculated } = toProcess;
let possibleHighRewardValidators = [];
if (!is_stored) {
@@ -71,29 +73,25 @@

protected async getEpochDataToProcess(): Promise<EpochProcessingState & { slot: Slot }> {
const chosen = await this.chooseEpochToProcess();
const latestFinalizedBeaconBlock = Number(
(<BlockHeaderResponse>await this.clClient.getFinalizedBlockHeader(chosen)).header.message.slot,
);
let latestFinalizedEpoch = Math.trunc(latestFinalizedBeaconBlock / this.config.get('FETCH_INTERVAL_SLOTS'));
if (latestFinalizedEpoch * this.config.get('FETCH_INTERVAL_SLOTS') == latestFinalizedBeaconBlock) {
// if it's the first slot of epoch, it finalizes previous epoch
latestFinalizedEpoch -= 1;
const latestBeaconBlock = Number((<BlockHeaderResponse>await this.clClient.getLatestBlockHeader(chosen)).header.message.slot);
let latestEpoch = Math.trunc(latestBeaconBlock / this.config.get('FETCH_INTERVAL_SLOTS'));
if (latestEpoch * this.config.get('FETCH_INTERVAL_SLOTS') == latestBeaconBlock) {
// if it's the first slot of epoch, it makes checkpoint for previous epoch
latestEpoch -= 1;
}
if (chosen.slot > latestFinalizedBeaconBlock) {
// new finalized slot hasn't happened, from which parent we can get information about needed state
// just wait `CHAIN_SLOT_TIME_SECONDS` until finality happens
if (chosen.slot > latestBeaconBlock) {
// new latest slot hasn't happened, from which parent we can get information about needed state
// just wait `CHAIN_SLOT_TIME_SECONDS` until new slot happens
const sleepTime = this.config.get('CHAIN_SLOT_TIME_SECONDS');
this.logger.log(
`Latest finalized epoch [${latestFinalizedEpoch}]. Waiting [${sleepTime}] seconds for next finalized epoch [${chosen.epoch}]`,
);
this.logger.log(`Latest epoch [${latestEpoch}]. Waiting [${sleepTime}] seconds for the end of epoch [${chosen.epoch}]`);

return new Promise((resolve) => {
setTimeout(() => resolve(undefined), sleepTime * 1000);
});
}
// new finalized epoch has happened, from which parent we can get information about needed state
const existedHeader = (await this.clClient.getBeaconBlockHeaderOrPreviousIfMissed(chosen.slot)).header.message;
this.logger.log(`Latest finalized epoch [${latestFinalizedEpoch}]. Next epoch to process [${chosen.epoch}]`);
this.logger.log(`Latest epoch [${latestEpoch}]. Next epoch to process [${chosen.epoch}]`);
if (chosen.slot == Number(existedHeader.slot)) {
this.logger.log(
`Epoch [${chosen.epoch}] is chosen to process with state slot [${chosen.slot}] with root [${existedHeader.state_root}]`,
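The epoch derivation in `getEpochDataToProcess` maps the latest known slot to an epoch and steps back one epoch when the slot lands exactly on an epoch boundary, since the first slot of an epoch checkpoints the previous one. A sketch of that arithmetic (a `FETCH_INTERVAL_SLOTS` of 32, mainnet's value, is assumed here rather than read from config):

```typescript
// Latest fully completed epoch for a given latest-known slot.
// slotsPerEpoch mirrors FETCH_INTERVAL_SLOTS; 32 is the assumed mainnet value.
function latestCompletedEpoch(latestSlot: number, slotsPerEpoch = 32): number {
  let epoch = Math.trunc(latestSlot / slotsPerEpoch);
  if (epoch * slotsPerEpoch === latestSlot) {
    // The first slot of an epoch checkpoints the previous epoch
    epoch -= 1;
  }
  return epoch;
}
```

In `head` mode the same computation runs against the non-finalized tip, which is why the log messages were reworded from "finalized epoch" to plain "epoch".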
