Skip to content

Commit

Permalink
feat: node 21.1 + undici + state from ssz
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Feb 9, 2024
1 parent 0fdb8cf commit 6df86d3
Show file tree
Hide file tree
Showing 6 changed files with 1,772 additions and 1,933 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16-alpine as building
FROM node:21.2-alpine as building

WORKDIR /app

Expand All @@ -9,7 +9,7 @@ COPY ./tsconfig*.json ./
COPY ./src ./src
RUN yarn build

FROM node:16-alpine
FROM node:21.2-alpine

WORKDIR /app

Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"start": "nest start",
"start:dev": "nest start --watch",
"start:debug": "nest start --debug --watch",
"start:prod": "node dist/src/main --trace-warnings --sync --sub-fin-head --max-old-space-size=8192",
"start:prod": "export NODE_OPTIONS=--max_old_space_size=8192 && node dist/src/main --trace-warnings --sync --sub-fin-head",
"test": "jest --detectOpenHandles --forceExit",
"test:watch": "jest --watch",
"test:cov": "jest --coverage",
Expand All @@ -34,6 +34,7 @@
"@lido-nestjs/execution": "^1.11.1",
"@lido-nestjs/logger": "^1.3.2",
"@lido-nestjs/registry": "^7.4.0",
"@lodestar/types": "^1.15.1",
"@mikro-orm/core": "^5.3.1",
"@mikro-orm/knex": "^5.3.1",
"@mikro-orm/nestjs": "^5.1.0",
Expand Down Expand Up @@ -62,7 +63,8 @@
"retry-ts": "^0.1.3",
"stream-chain": "^2.2.5",
"stream-json": "^1.7.5",
"typechain": "^5.2.0"
"typechain": "^5.2.0",
"undici": "^6.6.2"
},
"devDependencies": {
"@golevelup/ts-jest": "^0.3.7",
Expand Down
78 changes: 42 additions & 36 deletions src/common/consensus-provider/consensus-provider.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { NonEmptyArray } from 'fp-ts/NonEmptyArray';
import { HTTPError, Response, got } from 'got-cjs';
import { request } from 'undici';
import BodyReadable from 'undici/types/readable';

import { ConfigService } from 'common/config';
import { range } from 'common/functions/range';
Expand Down Expand Up @@ -74,6 +75,7 @@ export class ConsensusProviderService {
proposerDutes: (epoch: Epoch): string => `eth/v1/validator/duties/proposer/${epoch}`,
attesterDuties: (epoch: Epoch): string => `eth/v1/validator/duties/attester/${epoch}`,
syncCommitteeDuties: (epoch: Epoch): string => `eth/v1/validator/duties/sync/${epoch}`,
state: (stateId: StateId): string => `eth/v2/debug/beacon/states/${stateId}`,
};

public constructor(
Expand Down Expand Up @@ -278,12 +280,21 @@ export class ConsensusProviderService {
return blockInfo;
}

public async getValidatorsState(stateId: StateId): Promise<Request> {
public async getValidatorsState(stateId: StateId): Promise<BodyReadable> {
return await this.retryRequest(async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.validatorsState(stateId)), {
dataOnly: false,
});
}

public async getStateSSZ(stateId: StateId): Promise<BodyReadable> {
return await this.retryRequest(
async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.state(stateId), { accept: 'application/octet-stream' }),
{
dataOnly: false,
},
);
}

public async getBlockInfo(blockId: BlockId): Promise<BlockInfoResponse | void> {
const cached: BlockCache = this.cache.get(String(blockId));
if (cached && (cached.missed || cached.info)) {
Expand Down Expand Up @@ -315,7 +326,7 @@ export class ConsensusProviderService {
return blockInfo;
}

public async getAttestationCommitteesInfo(stateId: StateId, epoch: Epoch): Promise<Request> {
public async getAttestationCommitteesInfo(stateId: StateId, epoch: Epoch): Promise<BodyReadable> {
return await this.retryRequest(
async (apiURL: string) => await this.apiGetStream(apiURL, this.endpoints.attestationCommittees(stateId, epoch)),
{
Expand Down Expand Up @@ -401,43 +412,38 @@ export class ConsensusProviderService {

@TrackCLRequest
protected async apiGet<T>(apiURL: string, subUrl: string): Promise<T> {
const res = await got
.get(urljoin(apiURL, subUrl), { timeout: { ...REQUEST_TIMEOUT_POLICY_MS, response: this.config.get('CL_API_GET_RESPONSE_TIMEOUT') } })
.catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (res.statusCode !== 200) {
throw new ResponseError(errRequest(res.body, subUrl, apiURL), res.statusCode);
}
try {
return JSON.parse(res.body);
} catch (e) {
throw new ResponseError(`Error converting response body to JSON. Body: ${res.body}`);
const { body, statusCode } = await request(urljoin(apiURL, subUrl), {
method: 'GET',
headersTimeout: this.config.get('CL_API_GET_RESPONSE_TIMEOUT'),
}).catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}
return (await body.json()) as T;
}

@TrackCLRequest
protected async apiGetStream(apiURL: string, subUrl: string): Promise<Request> {
const readStream = got.stream.get(urljoin(apiURL, subUrl), {
timeout: { ...REQUEST_TIMEOUT_POLICY_MS, response: this.config.get('CL_API_GET_RESPONSE_TIMEOUT') },
protected async apiGetStream(apiURL: string, subUrl: string, headers?: Record<string, string>): Promise<BodyReadable> {
const { body, statusCode } = await request(urljoin(apiURL, subUrl), {
method: 'GET',
headersTimeout: this.config.get('CL_API_GET_RESPONSE_TIMEOUT'),
headers: headers,
}).catch((e) => {
if (e.response) {
throw new ResponseError(errRequest(e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});

return new Promise((resolve, reject) => {
readStream.on('response', (r: Response) => {
if (r.statusCode != 200) reject(new HTTPError(r));
resolve(readStream);
});
readStream.on('error', (e) => reject(e));
})
.then((r: Request) => r)
.catch((e) => {
if (e instanceof HTTPError) {
throw new ResponseError(errRequest(<string>e.response.body, subUrl, apiURL), e.response.statusCode);
}
throw new ResponseError(errCommon(e.message, subUrl, apiURL));
});
if (statusCode !== 200) {
const errorText = await body.text();
throw new ResponseError(errRequest(errorText, subUrl, apiURL), statusCode);
}
return body;
}
}
112 changes: 66 additions & 46 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import { BigNumber } from '@ethersproject/bignumber';
import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { chain } from 'stream-chain';
import { parser } from 'stream-json';
import { pick } from 'stream-json/filters/Pick';
import { streamArray } from 'stream-json/streamers/StreamArray';
import { batch } from 'stream-json/utils/Batch';

import { ConfigService } from 'common/config';
import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/consensus-provider';
import { ConsensusProviderService, ValStatus } from 'common/consensus-provider';
import { Epoch, Slot } from 'common/consensus-provider/types';
import { bigNumberSqrt } from 'common/functions/bigNumberSqrt';
import { unblock } from 'common/functions/unblock';
Expand All @@ -17,6 +12,10 @@ import { SummaryService } from 'duty/summary';
import { ClickhouseService } from 'storage/clickhouse';
import { RegistryService } from 'validators-registry';

let types: typeof import('@lodestar/types');

const FAR_FUTURE_EPOCH = Infinity;

@Injectable()
export class StateService {
public constructor(
Expand All @@ -32,52 +31,44 @@ export class StateService {
@TrackTask('check-state-duties')
public async check(epoch: Epoch, stateSlot: Slot): Promise<void> {
const slotTime = await this.clClient.getSlotTime(epoch * this.config.get('FETCH_INTERVAL_SLOTS'));
await this.registry.updateKeysRegistry(Number(slotTime));
this.logger.log('Getting all validators state');
const [readStream, _] = await Promise.all([
this.clClient.getValidatorsState(stateSlot),
this.registry.updateKeysRegistry(Number(slotTime)),
]);
const stateBody = await this.clClient.getStateSSZ(stateSlot);
const stateSSZ = new Uint8Array(await stateBody.arrayBuffer());
this.logger.log('Processing all validators state');
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const stuckKeys = this.registry.getStuckKeys();
const pipeline = chain([
readStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 100 }),
async (batch) => {
types = await eval(`import('@lodestar/types')`);
const stateView = types.ssz.capella.BeaconState.deserializeToView(stateSSZ);
const balances = stateView.balances.getAll();
// unblock every 1000 validators
for (const [index, validator] of stateView.validators.getAllReadonlyValues().entries()) {
if (index % 1000 === 0) {
await unblock();
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_module_id: operator?.moduleIndex,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
val_stuck: stuckKeys.includes(state.validator.pubkey),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
}
}
},
]);
pipeline.on('data', (data) => data);
await new Promise((resolve, reject) => {
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
}).finally(() => pipeline.destroy());
}
const status = this.getValidatorStatus(validator, epoch);
const pubkey = '0x'.concat(Buffer.from(validator.pubkey).toString('hex'));
const operator = this.registry.getOperatorKey(pubkey);
const v = {
epoch,
val_id: index,
val_pubkey: pubkey,
val_nos_module_id: operator?.moduleIndex,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: validator.slashed,
val_status: status,
val_balance: BigInt(balances[index]),
val_effective_balance: BigInt(validator.effectiveBalance),
val_stuck: stuckKeys.includes(pubkey),
};
this.summary.epoch(epoch).set(v);
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(validator.effectiveBalance) / BigInt(10 ** 9);
}
}
const baseReward = Math.trunc(
BigNumber.from(64 * 10 ** 9)
.div(bigNumberSqrt(BigNumber.from(activeValidatorsEffectiveBalance).mul(10 ** 9)))
Expand All @@ -91,4 +82,33 @@ export class StateService {
},
});
}

//https://github.com/ChainSafe/lodestar/blob/stable/packages/beacon-node/src/api/impl/beacon/state/utils.ts
public getValidatorStatus(validator: any, currentEpoch: Epoch): ValStatus {
// pending
if (validator.activationEpoch > currentEpoch) {
if (validator.activationEligibilityEpoch === FAR_FUTURE_EPOCH) {
return ValStatus.PendingInitialized;
} else if (validator.activationEligibilityEpoch < FAR_FUTURE_EPOCH) {
return ValStatus.PendingQueued;
}
}
// active
if (validator.activationEpoch <= currentEpoch && currentEpoch < validator.exitEpoch) {
if (validator.exitEpoch === FAR_FUTURE_EPOCH) {
return ValStatus.ActiveOngoing;
} else if (validator.exitEpoch < FAR_FUTURE_EPOCH) {
return validator.slashed ? ValStatus.ActiveSlashed : ValStatus.ActiveExiting;
}
}
// exited
if (validator.exitEpoch <= currentEpoch && currentEpoch < validator.withdrawableEpoch) {
return validator.slashed ? ValStatus.ExitedSlashed : ValStatus.ExitedUnslashed;
}
// withdrawal
if (validator.withdrawableEpoch <= currentEpoch) {
return validator.effectiveBalance !== 0 ? ValStatus.WithdrawalPossible : ValStatus.WithdrawalDone;
}
throw new Error('ValidatorStatus unknown');
}
}
2 changes: 0 additions & 2 deletions src/duty/summary/summary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,13 @@ export class SummaryService {
setMeta: (val: EpochMeta) => {
const curr = epochStorageData.meta;
epochStorageData.meta = merge(curr, val);
this.storage.set(epoch, epochStorageData);
},
getMeta: (): EpochMeta => {
return epochStorageData.meta;
},
set: (val: ValidatorDutySummary) => {
const curr = epochStorageData.summary.get(val.val_id) ?? {};
epochStorageData.summary.set(val.val_id, merge(curr, val));
this.storage.set(epoch, epochStorageData);
},
get: (val_id: ValidatorId): ValidatorDutySummary => {
return epochStorageData.summary.get(val_id);
Expand Down
Loading

0 comments on commit 6df86d3

Please sign in to comment.