diff --git a/README.md b/README.md index 34f96be0..db93cb5e 100644 --- a/README.md +++ b/README.md @@ -261,7 +261,6 @@ And if `ethereum_validators_monitoring_data_actuality < 1h` it allows you to rec | validator_count_invalid_attestation | nos_name, reason | Number of validators with invalid properties (head, target, source) \ high inc. delay in attestation for each user Node Operator | | validator_count_invalid_attestation_last_n_epoch | nos_name, reason, epoch_interval | Number of validators with invalid properties (head, target, source) \ high inc. delay in attestation last `BAD_ATTESTATION_EPOCHS` epoch for each user Node Operator | | validator_count_miss_attestation_last_n_epoch | nos_name, epoch_interval | Number of validators miss attestation last `BAD_ATTESTATION_EPOCHS` epoch for each user Node Operator | -| validator_count_high_avg_inc_delay_of_n_epoch | nos_name, epoch_interval | Number of validators with high avg inc. delay of N epochs for each user Node Operator | | validator_count_high_inc_delay_last_n_epoch | nos_name, epoch_interval | Number of validators with inc. delay > 2 last N epochs for each user Node Operator | | validator_count_invalid_attestation_property_last_n_epoch | nos_name, epoch_interval | Number of validators with two invalid attestation property (head or target or source) last N epochs for each user Node Operator | | high_reward_validator_count_miss_attestation_last_n_epoch | nos_name, epoch_interval | Number of validators miss attestation last `BAD_ATTESTATION_EPOCHS` epoch (with possible high reward in the future) for each user Node Operator | diff --git a/src/common/config/env.validation.ts b/src/common/config/env.validation.ts index 200ef7f0..b38f4aaa 100644 --- a/src/common/config/env.validation.ts +++ b/src/common/config/env.validation.ts @@ -120,6 +120,11 @@ export class EnvironmentVariables { @Transform(({ value }) => parseInt(value, 10), { toClassOnly: true }) public DB_INSERT_CHUNK_SIZE = 50000; + @IsNumber() + @Min(2) + @Transform(({ value }) => parseInt(value, 10), { toClassOnly: true }) + public DB_INSERT_MAX_WORKERS_COUNT = 20; + @IsNotEmpty() @IsInt() @Min(1) diff --git a/src/common/prometheus/prometheus.constants.ts b/src/common/prometheus/prometheus.constants.ts index f7b88f40..47ed4fab 100644 --- a/src/common/prometheus/prometheus.constants.ts +++ b/src/common/prometheus/prometheus.constants.ts @@ -38,7 +38,6 @@ export const METRIC_OTHER_VALIDATOR_COUNT_INVALID_ATTESTATION = `other_validator export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION = `validator_count_invalid_attestation`; export const METRIC_VALIDATOR_COUNT_MISS_ATTESTATION_LAST_N_EPOCH = `validator_count_miss_attestation_last_n_epoch`; export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_LAST_N_EPOCH = `validator_count_invalid_attestation_last_n_epoch`; -export const METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH = `validator_count_high_avg_inc_delay_of_n_epoch`; export const METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH = `validator_count_high_inc_delay_last_n_epoch`; export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_PROPERTY_LAST_N_EPOCH = `validator_count_invalid_attestation_property_last_n_epoch`; export const METRIC_HIGH_REWARD_VALIDATOR_COUNT_MISS_ATTESTATION_LAST_N_EPOCH = `high_reward_validator_count_miss_attestation_last_n_epoch`; diff --git a/src/common/prometheus/prometheus.service.ts b/src/common/prometheus/prometheus.service.ts index f00bba59..632777b1 100644 --- a/src/common/prometheus/prometheus.service.ts +++ b/src/common/prometheus/prometheus.service.ts @@ -59,7 +59,6 @@ import { METRIC_VALIDATORS, METRIC_VALIDATOR_BALANCES_DELTA, METRIC_VALIDATOR_COUNT_GOOD_PROPOSE, - METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH, METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH, METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION, METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_LAST_N_EPOCH, @@ -350,12 +349,6 @@ export class PrometheusService implements OnApplicationBootstrap { labelNames: ['nos_module_id', 'nos_id', 'nos_name', 'reason', 'epoch_interval'], }); - public validatorsCountHighAvgIncDelayAttestationOfNEpoch = this.getOrCreateMetric('Gauge', { - name: METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH, - help: 'number of validators with high avg inc. delay of N epochs', - labelNames: ['nos_module_id', 'nos_id', 'nos_name', 'epoch_interval'], - }); - public validatorsCountHighIncDelayAttestationLastNEpoch = this.getOrCreateMetric('Gauge', { name: METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH, help: 'number of validators with high inc. delay last N epochs', diff --git a/src/duty/attestation/attestation.metrics.ts b/src/duty/attestation/attestation.metrics.ts index 83caf6ae..2d2c66c9 100644 --- a/src/duty/attestation/attestation.metrics.ts +++ b/src/duty/attestation/attestation.metrics.ts @@ -48,7 +48,6 @@ export class AttestationMetrics { this.invalidHeadAttestationsLastNEpoch(), this.invalidTargetAttestationsLastNEpoch(), this.invalidSourceAttestationsLastNEpoch(), - this.highAvgIncDelayAttestationsOfNEpoch(), // metrics for alerts this.incDelayGtTwoAttestationsLastNEpoch(), this.invalidAttestationPropertyGtOneLastNEpoch(), @@ -139,13 +138,6 @@ export class AttestationMetrics { }); } - private async highAvgIncDelayAttestationsOfNEpoch() { - const data = await this.storage.getValidatorCountHighAvgIncDelayAttestationOfNEpochQuery(this.processedEpoch); - setUserOperatorsMetric(this.prometheus.validatorsCountHighAvgIncDelayAttestationOfNEpoch, data, this.operators, { - epoch_interval: this.epochInterval, - }); - } - private async incDelayGtTwoAttestationsLastNEpoch() { const data = await this.storage.getValidatorCountIncDelayGtTwoAttestationsLastNEpoch(this.processedEpoch); setUserOperatorsMetric(this.prometheus.validatorsCountHighIncDelayAttestationLastNEpoch, data, this.operators, { diff --git a/src/storage/clickhouse/clickhouse.constants.ts b/src/storage/clickhouse/clickhouse.constants.ts index 1ca7e20b..491aef8d 100644 --- a/src/storage/clickhouse/clickhouse.constants.ts +++ b/src/storage/clickhouse/clickhouse.constants.ts @@ -1,6 +1,10 @@ import { ValStatus } from 'common/consensus-provider'; import { Epoch } from 'common/consensus-provider/types'; +const perfStatuses = [ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed, ValStatus.PendingInitialized] + .map((s) => `'${s}'`) + .join(','); + export const avgValidatorBalanceDelta = (epoch: Epoch): string => ` SELECT current.val_nos_module_id as val_nos_module_id, @@ -10,7 +14,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => ` SELECT val_balance, val_id, val_nos_module_id, val_nos_id FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = ${epoch} @@ -20,7 +24,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => ` SELECT val_balance, val_id, val_nos_id FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = (${epoch} - 6) @@ -36,6 +40,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => ` FROM validators_summary WHERE val_nos_id IS NOT NULL AND + val_status in [${perfStatuses}] AND val_balance_withdrawn > 0 AND val_stuck = 0 AND epoch > (${epoch} - 6) AND epoch <= ${epoch} @@ -57,7 +62,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string => SELECT val_balance, val_id, val_nos_id, val_nos_module_id FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = ${epoch} @@ -67,7 +72,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string => SELECT val_balance, val_id, val_nos_id FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = (${epoch} - 6) @@ -83,6 +88,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string => FROM validators_summary WHERE val_nos_id IS NOT NULL AND + val_status in [${perfStatuses}] AND val_balance_withdrawn > 0 AND val_stuck = 0 AND epoch > (${epoch} - 6) AND epoch <= ${epoch} @@ -104,7 +110,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => ` SELECT val_balance, val_id, val_nos_module_id, val_nos_id, val_slashed FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = ${epoch} @@ -114,7 +120,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => ` SELECT val_balance, val_id, val_nos_id FROM validators_summary WHERE - val_status != '${ValStatus.PendingQueued}' AND + val_status in [${perfStatuses}] AND val_nos_id IS NOT NULL AND val_stuck = 0 AND epoch = (${epoch} - 6) @@ -130,6 +136,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => ` FROM validators_summary WHERE val_nos_id IS NOT NULL AND + val_status in [${perfStatuses}] AND val_balance_withdrawn > 0 AND val_stuck = 0 AND epoch > (${epoch} - 6) AND epoch <= ${epoch} @@ -233,13 +240,15 @@ export const validatorCountHighAvgIncDelayAttestationOfNEpochQuery = (epoch: Epo SELECT val_id, val_nos_module_id, val_nos_id, att_inc_delay FROM validators_summary WHERE + att_happened = 1 AND + val_status in [${perfStatuses}] AND val_stuck = 0 AND (epoch <= ${epoch} AND epoch > (${epoch} - ${epochInterval})) LIMIT 1 BY epoch, val_id ) GROUP BY val_id, val_nos_module_id, val_nos_id - HAVING avg_inclusion_delay > 2 ) + WHERE avg_inclusion_delay > 2 GROUP BY val_nos_id, val_nos_module_id `; }; @@ -356,6 +365,7 @@ export const totalBalance24hDifferenceQuery = (epoch: Epoch): string => ` FROM validators_summary WHERE val_nos_id IS NOT NULL AND + val_status != '${ValStatus.WithdrawalDone}' AND val_balance_withdrawn > 0 AND val_stuck = 0 AND epoch > (${epoch} - 225) AND epoch <= ${epoch} @@ -673,6 +683,7 @@ export const userNodeOperatorsRewardsAndPenaltiesStats = (epoch: Epoch): string FROM validators_summary WHERE val_nos_id IS NOT NULL AND + val_status != '${ValStatus.WithdrawalDone}' AND val_balance_withdrawn > 0 AND val_stuck = 0 AND epoch = ${epoch} diff --git a/src/storage/clickhouse/clickhouse.service.ts b/src/storage/clickhouse/clickhouse.service.ts index 539a0b8d..e353b814 100644 --- a/src/storage/clickhouse/clickhouse.service.ts +++ b/src/storage/clickhouse/clickhouse.service.ts @@ -71,6 +71,7 @@ export class ClickhouseService implements OnModuleInit { private readonly minBackoff: number; private readonly maxBackoff: number; private readonly chunkSize: number; + private readonly maxWorkers: number; private readonly retry: ReturnType; private async select(query: string): Promise { @@ -86,6 +87,7 @@ export class ClickhouseService implements OnModuleInit { this.minBackoff = this.config.get('DB_MIN_BACKOFF_SEC'); this.maxBackoff = this.config.get('DB_MAX_BACKOFF_SEC'); this.chunkSize = this.config.get('DB_INSERT_CHUNK_SIZE'); + this.maxWorkers = this.config.get('DB_INSERT_MAX_WORKERS_COUNT'); this.logger.log(`DB backoff set to (min=[${this.minBackoff}], max=[${this.maxBackoff}] seconds`); this.logger.log(`DB max retries set to [${this.maxRetries}]`); @@ -99,7 +101,7 @@ export class ClickhouseService implements OnModuleInit { database: this.config.get('DB_NAME'), compression: { response: true, - request: true, + request: false, }, }); } @@ -132,51 +134,85 @@ export class ClickhouseService implements OnModuleInit { @TrackTask('write-summary') public async writeSummary(summary: IterableIterator): Promise { - let indexesChunk = []; - let summaryChunk = []; - let chunkSize = 0; - const writeChunks = async (indexesChunk, summaryChunk) => { - return await allSettled([ - this.retry( - async () => - await this.db.insert({ - table: 'validators_index', - values: Stream.Readable.from(indexesChunk, { objectMode: true }), - format: 'JSONEachRow', + const runWriteTasks = (stream: Stream.Readable): Promise[] => { + const indexes = this.retry(async () => + this.db.insert({ + table: 'validators_index', + values: stream.pipe( + new Stream.Transform({ + transform(chunk, encoding, callback) { + callback(null, { val_id: chunk.val_id, val_pubkey: chunk.val_pubkey }); + }, + objectMode: true, }), - ), - this.retry( - async () => - await this.db.insert({ - table: 'validators_summary', - values: Stream.Readable.from(summaryChunk, { objectMode: true }), - format: 'JSONEachRow', + ), + format: 'JSONEachRow', + }), + ); + const summaries = this.retry(async () => + this.db.insert({ + table: 'validators_summary', + values: stream.pipe( + new Stream.Transform({ + transform(chunk, encoding, callback) { + callback(null, { + ...chunk, + val_balance: chunk.val_balance.toString(), + val_effective_balance: chunk.val_effective_balance.toString(), + val_balance_withdrawn: chunk.val_balance_withdrawn?.toString(), + propose_earned_reward: chunk.propose_earned_reward?.toString(), + propose_missed_reward: chunk.propose_missed_reward?.toString(), + propose_penalty: chunk.propose_penalty?.toString(), + sync_meta: undefined, + val_pubkey: undefined, + }); + }, + objectMode: true, }), - ), - ]); + ), + format: 'JSONEachRow', + }), + ); + + return [indexes, summaries]; }; - for (const v of summary) { - indexesChunk.push({ val_id: v.val_id, val_pubkey: v.val_pubkey }); - summaryChunk.push({ - ...v, - val_balance: v.val_balance.toString(), - val_effective_balance: v.val_effective_balance.toString(), - val_balance_withdrawn: v.val_balance_withdrawn?.toString(), - propose_earned_reward: v.propose_earned_reward?.toString(), - propose_missed_reward: v.propose_missed_reward?.toString(), - propose_penalty: v.propose_penalty?.toString(), - sync_meta: undefined, - val_pubkey: undefined, + + const initWrite = () => { + const stream = new Stream.Readable({ + objectMode: true, + read() { + // + }, }); - chunkSize++; - if (chunkSize == this.chunkSize) { - await writeChunks(indexesChunk, summaryChunk); - [indexesChunk, summaryChunk] = [[], []]; - chunkSize = 0; + writeTasks.push(...runWriteTasks(stream)); + streams.push(stream); + return stream; + }; + + const waitWrite = async () => { + streams.forEach((stream) => (stream.readable ? stream.push(null) : null)); + await allSettled(writeTasks); + streams.forEach((stream) => (stream.destroyed ? null : stream.destroy())); + }; + + const writeTasks = []; + const streams = []; + + let stream = initWrite(); + + let index = 0; + for (const data of summary) { + if (Number(index) % this.chunkSize == 0) { await unblock(); + if (writeTasks.length % this.maxWorkers == 0) { + await waitWrite(); + } + stream = initWrite(); } + stream.push(data); + index++; } - if (chunkSize) await writeChunks(indexesChunk, summaryChunk); + await waitWrite(); } @TrackTask('write-epoch-meta')