Skip to content

Commit

Permalink
chore: yet another tweak (#146)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Apr 4, 2023
1 parent 4496b7f commit ea6268b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 26 deletions.
8 changes: 4 additions & 4 deletions src/duty/attestation/attestation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,19 @@ export class AttestationService {
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 5 }),
(batch) => processBatch(this.processedEpoch - 1, batch),
async (batch) => processBatch(this.processedEpoch - 1, batch),
]);
const currPipeline = chain([
currStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 5 }),
(batch) => processBatch(this.processedEpoch, batch),
async (batch) => processBatch(this.processedEpoch, batch),
]);

const processBatch = (epoch: Epoch, batch) => {
const processBatch = async (epoch: Epoch, batch) => {
await unblock();
for (const data of batch) {
const committee: AttestationCommitteeInfo = data.value;
// validator doesn't attests by default
Expand All @@ -198,7 +199,6 @@ export class AttestationService {
committee.validators.map((v) => Number(v)),
);
}
return batch;
};

const pipelineFinish = async (pipeline) => {
Expand Down
53 changes: 31 additions & 22 deletions src/duty/state/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { ConfigService } from 'common/config';
import { ConsensusProviderService, StateValidatorResponse, ValStatus } from 'common/eth-providers';
import { Epoch, Slot } from 'common/eth-providers/consensus-provider/types';
import { bigNumberSqrt } from 'common/functions/bigNumberSqrt';
import { unblock } from 'common/functions/unblock';
import { PrometheusService, TrackTask } from 'common/prometheus';
import { RegistryService } from 'common/validators-registry';
import { ClickhouseService } from 'storage/clickhouse';
Expand Down Expand Up @@ -38,29 +39,37 @@ export class StateService {
this.logger.log('Processing all validators state');
let activeValidatorsCount = 0;
let activeValidatorsEffectiveBalance = 0n;
const pipeline = chain([readStream, parser(), pick({ filter: 'data' }), streamArray(), batch({ batchSize: 1000 })]);
pipeline.on('data', (batch) => {
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
const pipeline = chain([
readStream,
parser(),
pick({ filter: 'data' }),
streamArray(),
batch({ batchSize: 100 }),
async (batch) => {
await unblock();
for (const data of batch) {
const state: StateValidatorResponse = data.value;
const index = Number(state.index);
const operator = this.registry.getOperatorKey(state.validator.pubkey);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_pubkey: state.validator.pubkey,
val_nos_id: operator?.operatorIndex,
val_nos_name: operator?.operatorName,
val_slashed: state.validator.slashed,
val_status: state.status,
val_balance: BigInt(state.balance),
val_effective_balance: BigInt(state.validator.effective_balance),
});
if ([ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed].includes(state.status)) {
activeValidatorsCount++;
activeValidatorsEffectiveBalance += BigInt(state.validator.effective_balance) / BigInt(10 ** 9);
}
}
}
});
},
]);
pipeline.on('data', (data) => data);
await new Promise((resolve, reject) => {
pipeline.on('error', (error) => reject(error));
pipeline.on('end', () => resolve(true));
Expand Down

0 comments on commit ea6268b

Please sign in to comment.