Skip to content

Commit

Permalink
Merge pull request #189 from lidofinance/feat/write-and-read-queries-…
Browse files Browse the repository at this point in the history
…speedup

Feat/write and read queries speedup
  • Loading branch information
vgorkavenko authored Jul 11, 2023
2 parents 3af6b60 + 51e113e commit eba5ce2
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 63 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ And if `ethereum_validators_monitoring_data_actuality < 1h` it allows you to rec
| validator_count_invalid_attestation | nos_name, reason | Number of validators with invalid properties (head, target, source) \ high inc. delay in attestation for each user Node Operator |
| validator_count_invalid_attestation_last_n_epoch | nos_name, reason, epoch_interval | Number of validators with invalid properties (head, target, source) \ high inc. delay in attestation last `BAD_ATTESTATION_EPOCHS` epoch for each user Node Operator |
| validator_count_miss_attestation_last_n_epoch | nos_name, epoch_interval | Number of validators miss attestation last `BAD_ATTESTATION_EPOCHS` epoch for each user Node Operator |
| validator_count_high_avg_inc_delay_of_n_epoch | nos_name, epoch_interval | Number of validators with high avg inc. delay of N epochs for each user Node Operator |
| validator_count_high_inc_delay_last_n_epoch | nos_name, epoch_interval | Number of validators with inc. delay > 2 last N epochs for each user Node Operator |
| validator_count_invalid_attestation_property_last_n_epoch | nos_name, epoch_interval | Number of validators with two invalid attestation property (head or target or source) last N epochs for each user Node Operator |
| high_reward_validator_count_miss_attestation_last_n_epoch | nos_name, epoch_interval | Number of validators miss attestation last `BAD_ATTESTATION_EPOCHS` epoch (with possible high reward in the future) for each user Node Operator |
Expand Down
5 changes: 5 additions & 0 deletions src/common/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ export class EnvironmentVariables {
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public DB_INSERT_CHUNK_SIZE = 50000;

@IsNumber()
@Min(2)
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public DB_INSERT_MAX_WORKERS_COUNT = 20;

@IsNotEmpty()
@IsInt()
@Min(1)
Expand Down
1 change: 0 additions & 1 deletion src/common/prometheus/prometheus.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export const METRIC_OTHER_VALIDATOR_COUNT_INVALID_ATTESTATION = `other_validator
export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION = `validator_count_invalid_attestation`;
export const METRIC_VALIDATOR_COUNT_MISS_ATTESTATION_LAST_N_EPOCH = `validator_count_miss_attestation_last_n_epoch`;
export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_LAST_N_EPOCH = `validator_count_invalid_attestation_last_n_epoch`;
export const METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH = `validator_count_high_avg_inc_delay_of_n_epoch`;
export const METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH = `validator_count_high_inc_delay_last_n_epoch`;
export const METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_PROPERTY_LAST_N_EPOCH = `validator_count_invalid_attestation_property_last_n_epoch`;
export const METRIC_HIGH_REWARD_VALIDATOR_COUNT_MISS_ATTESTATION_LAST_N_EPOCH = `high_reward_validator_count_miss_attestation_last_n_epoch`;
Expand Down
7 changes: 0 additions & 7 deletions src/common/prometheus/prometheus.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import {
METRIC_VALIDATORS,
METRIC_VALIDATOR_BALANCES_DELTA,
METRIC_VALIDATOR_COUNT_GOOD_PROPOSE,
METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH,
METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH,
METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION,
METRIC_VALIDATOR_COUNT_INVALID_ATTESTATION_LAST_N_EPOCH,
Expand Down Expand Up @@ -350,12 +349,6 @@ export class PrometheusService implements OnApplicationBootstrap {
labelNames: ['nos_module_id', 'nos_id', 'nos_name', 'reason', 'epoch_interval'],
});

public validatorsCountHighAvgIncDelayAttestationOfNEpoch = this.getOrCreateMetric('Gauge', {
name: METRIC_VALIDATOR_COUNT_HIGH_AVG_INC_DELAY_ATTESTATION_OF_N_EPOCH,
help: 'number of validators with high avg inc. delay of N epochs',
labelNames: ['nos_module_id', 'nos_id', 'nos_name', 'epoch_interval'],
});

public validatorsCountHighIncDelayAttestationLastNEpoch = this.getOrCreateMetric('Gauge', {
name: METRIC_VALIDATOR_COUNT_HIGH_INC_DELAY_ATTESTATION_LAST_N_EPOCH,
help: 'number of validators with high inc. delay last N epochs',
Expand Down
8 changes: 0 additions & 8 deletions src/duty/attestation/attestation.metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ export class AttestationMetrics {
this.invalidHeadAttestationsLastNEpoch(),
this.invalidTargetAttestationsLastNEpoch(),
this.invalidSourceAttestationsLastNEpoch(),
this.highAvgIncDelayAttestationsOfNEpoch(),
// metrics for alerts
this.incDelayGtTwoAttestationsLastNEpoch(),
this.invalidAttestationPropertyGtOneLastNEpoch(),
Expand Down Expand Up @@ -139,13 +138,6 @@ export class AttestationMetrics {
});
}

private async highAvgIncDelayAttestationsOfNEpoch() {
const data = await this.storage.getValidatorCountHighAvgIncDelayAttestationOfNEpochQuery(this.processedEpoch);
setUserOperatorsMetric(this.prometheus.validatorsCountHighAvgIncDelayAttestationOfNEpoch, data, this.operators, {
epoch_interval: this.epochInterval,
});
}

private async incDelayGtTwoAttestationsLastNEpoch() {
const data = await this.storage.getValidatorCountIncDelayGtTwoAttestationsLastNEpoch(this.processedEpoch);
setUserOperatorsMetric(this.prometheus.validatorsCountHighIncDelayAttestationLastNEpoch, data, this.operators, {
Expand Down
25 changes: 18 additions & 7 deletions src/storage/clickhouse/clickhouse.constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { ValStatus } from 'common/consensus-provider';
import { Epoch } from 'common/consensus-provider/types';

const perfStatuses = [ValStatus.ActiveOngoing, ValStatus.ActiveExiting, ValStatus.ActiveSlashed, ValStatus.PendingInitialized]
.map((s) => `'${s}'`)
.join(',');

export const avgValidatorBalanceDelta = (epoch: Epoch): string => `
SELECT
current.val_nos_module_id as val_nos_module_id,
Expand All @@ -10,7 +14,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => `
SELECT val_balance, val_id, val_nos_module_id, val_nos_id
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = ${epoch}
Expand All @@ -20,7 +24,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => `
SELECT val_balance, val_id, val_nos_id
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = (${epoch} - 6)
Expand All @@ -36,6 +40,7 @@ export const avgValidatorBalanceDelta = (epoch: Epoch): string => `
FROM validators_summary
WHERE
val_nos_id IS NOT NULL AND
val_status in [${perfStatuses}] AND
val_balance_withdrawn > 0 AND
val_stuck = 0 AND
epoch > (${epoch} - 6) AND epoch <= ${epoch}
Expand All @@ -57,7 +62,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string =>
SELECT val_balance, val_id, val_nos_id, val_nos_module_id
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = ${epoch}
Expand All @@ -67,7 +72,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string =>
SELECT val_balance, val_id, val_nos_id
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = (${epoch} - 6)
Expand All @@ -83,6 +88,7 @@ export const validatorQuantile0001BalanceDeltasQuery = (epoch: Epoch): string =>
FROM validators_summary
WHERE
val_nos_id IS NOT NULL AND
val_status in [${perfStatuses}] AND
val_balance_withdrawn > 0 AND
val_stuck = 0 AND
epoch > (${epoch} - 6) AND epoch <= ${epoch}
Expand All @@ -104,7 +110,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => `
SELECT val_balance, val_id, val_nos_module_id, val_nos_id, val_slashed
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = ${epoch}
Expand All @@ -114,7 +120,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => `
SELECT val_balance, val_id, val_nos_id
FROM validators_summary
WHERE
val_status != '${ValStatus.PendingQueued}' AND
val_status in [${perfStatuses}] AND
val_nos_id IS NOT NULL AND
val_stuck = 0 AND
epoch = (${epoch} - 6)
Expand All @@ -130,6 +136,7 @@ export const validatorsCountWithNegativeDeltaQuery = (epoch: Epoch): string => `
FROM validators_summary
WHERE
val_nos_id IS NOT NULL AND
val_status in [${perfStatuses}] AND
val_balance_withdrawn > 0 AND
val_stuck = 0 AND
epoch > (${epoch} - 6) AND epoch <= ${epoch}
Expand Down Expand Up @@ -233,13 +240,15 @@ export const validatorCountHighAvgIncDelayAttestationOfNEpochQuery = (epoch: Epo
SELECT val_id, val_nos_module_id, val_nos_id, att_inc_delay
FROM validators_summary
WHERE
att_happened = 1 AND
val_status in [${perfStatuses}] AND
val_stuck = 0 AND
(epoch <= ${epoch} AND epoch > (${epoch} - ${epochInterval}))
LIMIT 1 BY epoch, val_id
)
GROUP BY val_id, val_nos_module_id, val_nos_id
HAVING avg_inclusion_delay > 2
)
WHERE avg_inclusion_delay > 2
GROUP BY val_nos_id, val_nos_module_id
`;
};
Expand Down Expand Up @@ -356,6 +365,7 @@ export const totalBalance24hDifferenceQuery = (epoch: Epoch): string => `
FROM validators_summary
WHERE
val_nos_id IS NOT NULL AND
val_status != '${ValStatus.WithdrawalDone}' AND
val_balance_withdrawn > 0 AND
val_stuck = 0 AND
epoch > (${epoch} - 225) AND epoch <= ${epoch}
Expand Down Expand Up @@ -673,6 +683,7 @@ export const userNodeOperatorsRewardsAndPenaltiesStats = (epoch: Epoch): string
FROM validators_summary
WHERE
val_nos_id IS NOT NULL AND
val_status != '${ValStatus.WithdrawalDone}' AND
val_balance_withdrawn > 0 AND
val_stuck = 0 AND
epoch = ${epoch}
Expand Down
114 changes: 75 additions & 39 deletions src/storage/clickhouse/clickhouse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class ClickhouseService implements OnModuleInit {
private readonly minBackoff: number;
private readonly maxBackoff: number;
private readonly chunkSize: number;
private readonly maxWorkers: number;
private readonly retry: ReturnType<typeof retrier>;

private async select<T>(query: string): Promise<T> {
Expand All @@ -86,6 +87,7 @@ export class ClickhouseService implements OnModuleInit {
this.minBackoff = this.config.get('DB_MIN_BACKOFF_SEC');
this.maxBackoff = this.config.get('DB_MAX_BACKOFF_SEC');
this.chunkSize = this.config.get('DB_INSERT_CHUNK_SIZE');
this.maxWorkers = this.config.get('DB_INSERT_MAX_WORKERS_COUNT');

this.logger.log(`DB backoff set to (min=[${this.minBackoff}], max=[${this.maxBackoff}] seconds`);
this.logger.log(`DB max retries set to [${this.maxRetries}]`);
Expand All @@ -99,7 +101,7 @@ export class ClickhouseService implements OnModuleInit {
database: this.config.get('DB_NAME'),
compression: {
response: true,
request: true,
request: false,
},
});
}
Expand Down Expand Up @@ -132,51 +134,85 @@ export class ClickhouseService implements OnModuleInit {

@TrackTask('write-summary')
public async writeSummary(summary: IterableIterator<ValidatorDutySummary>): Promise<void> {
let indexesChunk = [];
let summaryChunk = [];
let chunkSize = 0;
const writeChunks = async (indexesChunk, summaryChunk) => {
return await allSettled([
this.retry(
async () =>
await this.db.insert({
table: 'validators_index',
values: Stream.Readable.from(indexesChunk, { objectMode: true }),
format: 'JSONEachRow',
const runWriteTasks = (stream: Stream.Readable): Promise<any>[] => {
const indexes = this.retry(async () =>
this.db.insert({
table: 'validators_index',
values: stream.pipe(
new Stream.Transform({
transform(chunk, encoding, callback) {
callback(null, { val_id: chunk.val_id, val_pubkey: chunk.val_pubkey });
},
objectMode: true,
}),
),
this.retry(
async () =>
await this.db.insert({
table: 'validators_summary',
values: Stream.Readable.from(summaryChunk, { objectMode: true }),
format: 'JSONEachRow',
),
format: 'JSONEachRow',
}),
);
const summaries = this.retry(async () =>
this.db.insert({
table: 'validators_summary',
values: stream.pipe(
new Stream.Transform({
transform(chunk, encoding, callback) {
callback(null, {
...chunk,
val_balance: chunk.val_balance.toString(),
val_effective_balance: chunk.val_effective_balance.toString(),
val_balance_withdrawn: chunk.val_balance_withdrawn?.toString(),
propose_earned_reward: chunk.propose_earned_reward?.toString(),
propose_missed_reward: chunk.propose_missed_reward?.toString(),
propose_penalty: chunk.propose_penalty?.toString(),
sync_meta: undefined,
val_pubkey: undefined,
});
},
objectMode: true,
}),
),
]);
),
format: 'JSONEachRow',
}),
);

return [indexes, summaries];
};
for (const v of summary) {
indexesChunk.push({ val_id: v.val_id, val_pubkey: v.val_pubkey });
summaryChunk.push({
...v,
val_balance: v.val_balance.toString(),
val_effective_balance: v.val_effective_balance.toString(),
val_balance_withdrawn: v.val_balance_withdrawn?.toString(),
propose_earned_reward: v.propose_earned_reward?.toString(),
propose_missed_reward: v.propose_missed_reward?.toString(),
propose_penalty: v.propose_penalty?.toString(),
sync_meta: undefined,
val_pubkey: undefined,

const initWrite = () => {
const stream = new Stream.Readable({
objectMode: true,
read() {
//
},
});
chunkSize++;
if (chunkSize == this.chunkSize) {
await writeChunks(indexesChunk, summaryChunk);
[indexesChunk, summaryChunk] = [[], []];
chunkSize = 0;
writeTasks.push(...runWriteTasks(stream));
streams.push(stream);
return stream;
};

const waitWrite = async () => {
streams.forEach((stream) => (stream.readable ? stream.push(null) : null));
await allSettled(writeTasks);
streams.forEach((stream) => (stream.destroyed ? null : stream.destroy()));
};

const writeTasks = [];
const streams = [];

let stream = initWrite();

let index = 0;
for (const data of summary) {
if (Number(index) % this.chunkSize == 0) {
await unblock();
if (writeTasks.length % this.maxWorkers == 0) {
await waitWrite();
}
stream = initWrite();
}
stream.push(data);
index++;
}
if (chunkSize) await writeChunks(indexesChunk, summaryChunk);
await waitWrite();
}

@TrackTask('write-epoch-meta')
Expand Down

0 comments on commit eba5ce2

Please sign in to comment.