From c547c2435b4552353aa244da8bee739283355f1b Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:11:09 +0200 Subject: [PATCH 1/3] (BIDS-2472) gather stats data in parallel --- db/statistics.go | 178 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 130 insertions(+), 48 deletions(-) diff --git a/db/statistics.go b/db/statistics.go index 31322730c7..a0508f9c29 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -52,13 +52,7 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr } exported := Exported{} - tx, err := WriterDb.Beginx() - if err != nil { - return fmt.Errorf("error starting transaction: %w", err) - } - defer tx.Rollback() - - err = tx.Get(&exported, ` + err := WriterDb.Get(&exported, ` SELECT status, failed_attestations_exported, @@ -95,35 +89,72 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr return nil } - if exported.FailedAttestations { - logger.Infof("Skipping failed attestations") - } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations, tx); err != nil { - return fmt.Errorf("error in WriteValidatorFailedAttestationsStatisticsForDay: %w", err) - } + g := &errgroup.Group{} - if exported.SyncDuties { - logger.Infof("Skipping sync duties") - } else if err := WriteValidatorSyncDutiesForDay(validators, day, tx); err != nil { - return fmt.Errorf("error in WriteValidatorSyncDutiesForDay: %w", err) - } + g.Go(func() error { + if exported.FailedAttestations { + logger.Infof("Skipping failed attestations") + } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations); err != nil { + return fmt.Errorf("error in WriteValidatorFailedAttestationsStatisticsForDay: %w", err) + } + return nil + }) - if exported.WithdrawalsDeposits { - logger.Infof("Skipping withdrawals / deposits") - } else if err := WriteValidatorDepositWithdrawals(day, tx); err != nil { - return fmt.Errorf("error in WriteValidatorDepositWithdrawals: %w", err) - } + g.Go(func() error { + if exported.SyncDuties { + logger.Infof("Skipping sync duties") + } else if err := WriteValidatorSyncDutiesForDay(validators, day); err != nil { + return fmt.Errorf("error in WriteValidatorSyncDutiesForDay: %w", err) + } + return nil + }) + + g.Go(func() error { + if exported.WithdrawalsDeposits { + logger.Infof("Skipping withdrawals / deposits") + } else if err := WriteValidatorDepositWithdrawals(day); err != nil { + return fmt.Errorf("error in WriteValidatorDepositWithdrawals: %w", err) + } + return nil + }) + + g.Go(func() error { + if exported.BlockStats { + logger.Infof("Skipping block stats") + } else if err := WriteValidatorBlockStats(day); err != nil { + return fmt.Errorf("error in WriteValidatorBlockStats: %w", err) + } + return nil + }) - if exported.BlockStats { - logger.Infof("Skipping block stats") - } else if err := WriteValidatorBlockStats(day, tx); err != nil { - return fmt.Errorf("error in WriteValidatorBlockStats: %w", err) + g.Go(func() error { + if exported.Balance { + logger.Infof("Skipping balances") + } else if err := WriteValidatorBalances(validators, day); err != nil { + return fmt.Errorf("error in WriteValidatorBalances: %w", err) + } + return nil + }) + + g.Go(func() error { + if exported.ElRewards { + logger.Infof("Skipping el rewards") + } else if err := WriteValidatorElIcome(day); err != nil { + return fmt.Errorf("error in WriteValidatorElIcome: %w", err) + } + return nil + }) + + err = g.Wait() + if err != nil { + return fmt.Errorf("error during gather/write of stats: %w", err) } - if exported.Balance { - logger.Infof("Skipping balances") - } else if err := WriteValidatorBalances(validators, day, tx); err != nil { - return fmt.Errorf("error in WriteValidatorBalances: %w", err) + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) } + defer tx.Rollback() if exported.ClRewards { logger.Infof("Skipping cl rewards") @@ -131,12 +162,6 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr return fmt.Errorf("error in WriteValidatorClIcome: %w", err) } - if exported.ElRewards { - logger.Infof("Skipping el rewards") - } else if err := WriteValidatorElIcome(day, tx); err != nil { - return fmt.Errorf("error in WriteValidatorElIcome: %w", err) - } - if exported.TotalAccumulation { logger.Infof("Skipping total accumulation") } else if err := WriteValidatorTotalAccumulation(day, concurrencyTotal, tx); err != nil { @@ -489,7 +514,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64, tx *sqlx.Tx) return nil } -func WriteValidatorBlockStats(day uint64, tx *sqlx.Tx) error { +func WriteValidatorBlockStats(day uint64) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_block_stats").Observe(time.Since(exportStart).Seconds()) @@ -499,12 +524,18 @@ func WriteValidatorBlockStats(day uint64, tx *sqlx.Tx) error { return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) start := time.Now() logger.Infof("exporting proposed_blocks, missed_blocks and orphaned_blocks statistics") - _, err := tx.Exec(` + _, err = tx.Exec(` insert into validator_stats (validatorindex, day, proposed_blocks, missed_blocks, orphaned_blocks) ( select proposer, $3, sum(case when status = '1' then 1 else 0 end), sum(case when status = '2' then 1 else 0 end), sum(case when status = '3' then 1 else 0 end) @@ -540,11 +571,15 @@ func WriteValidatorBlockStats(day uint64, tx *sqlx.Tx) error { return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("block statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { +func WriteValidatorElIcome(day uint64) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_el_income_stats").Observe(time.Since(exportStart).Seconds()) @@ -554,6 +589,12 @@ func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) start := time.Now() @@ -571,7 +612,7 @@ func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { blocks := make([]*Container, 0) blocksMap := make(map[uint64]*Container) - err := tx.Select(&blocks, "SELECT slot, exec_block_number, proposer FROM blocks WHERE epoch >= $1 AND epoch <= $2 AND exec_block_number > 0 AND status = '1'", firstEpoch, lastEpoch) + err = tx.Select(&blocks, "SELECT slot, exec_block_number, proposer FROM blocks WHERE epoch >= $1 AND epoch <= $2 AND exec_block_number > 0 AND status = '1'", firstEpoch, lastEpoch) if err != nil { return fmt.Errorf("error retrieving blocks data for firstEpoch [%v] and lastEpoch [%v]: %w", firstEpoch, lastEpoch, err) } @@ -649,6 +690,10 @@ func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("el rewards statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } @@ -713,8 +758,6 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, end = int(maxValidatorIndex) } - logrus.Info(start, end) - g.Go(func() error { select { case <-gCtx.Done(): @@ -762,12 +805,11 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, if err = markColumnExported(day, "cl_rewards_exported", tx); err != nil { return err } - logger.Infof("cl rewards statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorBalances(validators []uint64, day uint64, tx *sqlx.Tx) error { +func WriteValidatorBalances(validators []uint64, day uint64) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() @@ -780,6 +822,12 @@ func WriteValidatorBalances(validators []uint64, day uint64, tx *sqlx.Tx) error return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) start := time.Now() @@ -857,11 +905,15 @@ func WriteValidatorBalances(validators []uint64, day uint64, tx *sqlx.Tx) error return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("balance statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { +func WriteValidatorDepositWithdrawals(day uint64) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_deposit_withdrawal_stats").Observe(time.Since(exportStart).Seconds()) @@ -871,6 +923,12 @@ func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + // The end_balance of a day is the balance after the first slot of the last epoch of that day. // Therefore the last 31 slots of the day are not included in the end_balance of that day. // Since our income calculation is base on subtracting end_balances the deposits and withdrawals that happen during those slots must be added to the next day instead. @@ -897,7 +955,7 @@ func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { withdrawals_amount = NULL WHERE day = $1%s;`, firstDayExtraCondition) - _, err := tx.Exec(resetQry, day) + _, err = tx.Exec(resetQry, day) if err != nil { return fmt.Errorf("error resetting validator_stats for day [%v]: %w", day, err) } @@ -970,11 +1028,15 @@ func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("deposits and withdrawals statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64, tx *sqlx.Tx) error { +func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds()) @@ -984,6 +1046,12 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64, tx *sqlx.Tx return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + startEpoch, endEpoch := utils.GetFirstAndLastEpochForDay(day) if startEpoch < utils.Config.Chain.Config.AltairForkEpoch && endEpoch > utils.Config.Chain.Config.AltairForkEpoch { startEpoch = utils.Config.Chain.Config.AltairForkEpoch @@ -1045,11 +1113,15 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64, tx *sqlx.Tx return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("sync duties and statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64, tx *sqlx.Tx) error { +func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -1061,6 +1133,12 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u return err } + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) start := time.Now() @@ -1132,6 +1210,10 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u return err } + err = tx.Commit() + if err != nil { + return err + } logger.Infof("'failed attestation' statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } From 6a5f43b1cd0c0b17e7b9679e68f64d5c22e496b7 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 21 Sep 2023 12:49:57 +0200 Subject: [PATCH 2/3] (BIDS-2472) demote long bigtable query runtime log messages to warnings --- db/bigtable.go | 34 ++++++++++---------- db/bigtable_eth1.go | 78 ++++++++++++++++++++++----------------------- 2 files changed, 56 insertions(+), 56 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index 5489c84414..1230d526a9 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -212,7 +212,7 @@ func (bigtable Bigtable) GetMachineMetricsMachineNames(userID uint64) ([]string, <-tmr.C logger.WithFields(logrus.Fields{ "userId": userID, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() names, err := bigtable.getMachineMetricNamesMap(userID, 300) @@ -235,7 +235,7 @@ func (bigtable Bigtable) GetMachineMetricsMachineCount(userID uint64) (uint64, e <-tmr.C logger.WithFields(logrus.Fields{ "userId": userID, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) @@ -259,7 +259,7 @@ func (bigtable Bigtable) GetMachineMetricsNode(userID uint64, limit, offset int) "userId": userID, "limit": limit, "offset": offset, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() return getMachineMetrics(bigtable, "beaconnode", userID, limit, offset, @@ -284,7 +284,7 @@ func (bigtable Bigtable) GetMachineMetricsValidator(userID uint64, limit, offset "userId": userID, "limit": limit, "offset": offset, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() return getMachineMetrics(bigtable, "validator", userID, limit, offset, @@ -309,7 +309,7 @@ func (bigtable Bigtable) GetMachineMetricsSystem(userID uint64, limit, offset in "userId": userID, "limit": limit, "offset": offset, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() return getMachineMetrics(bigtable, "system", userID, limit, offset, @@ -384,7 +384,7 @@ func (bigtable Bigtable) GetMachineMetricsForNotifications(rowKeys gcp_bigtable. <-tmr.C logger.WithFields(logrus.Fields{ "rowKeys": rowKeys, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*200)) @@ -844,7 +844,7 @@ func (bigtable *Bigtable) GetMaxValidatorindexForEpoch(epoch uint64) (uint64, er <-tmr.C logger.WithFields(logrus.Fields{ "epoch": epoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*5)) @@ -873,7 +873,7 @@ func (bigtable *Bigtable) GetValidatorBalanceHistory(validators []uint64, startE "validators_count": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { @@ -977,7 +977,7 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st "validatorsCount": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { @@ -1147,7 +1147,7 @@ func (bigtable *Bigtable) GetLastAttestationSlots(validators []uint64) (map[uint <-tmr.C logger.WithFields(logrus.Fields{ "validatorsCount": len(validators), - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() valLen := len(validators) @@ -1213,7 +1213,7 @@ func (bigtable *Bigtable) GetSyncParticipationBySlotRange(startSlot, endSlot uin logger.WithFields(logrus.Fields{ "startSlot": startSlot, "endSlot": endSlot, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*5)) @@ -1256,7 +1256,7 @@ func (bigtable *Bigtable) GetValidatorMissedAttestationHistory(validators []uint "validatorsCount": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { @@ -1369,7 +1369,7 @@ func (bigtable *Bigtable) GetValidatorSyncDutiesHistory(validators []uint64, sta "validatorsCount": len(validators), "startSlot": startSlot, "endSlot": endSlot, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { @@ -1470,7 +1470,7 @@ func (bigtable *Bigtable) GetValidatorMissedAttestationsCount(validators []uint6 "validatorsCount": len(validators), "startEpoch": firstEpoch, "endEpoch": lastEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if firstEpoch > lastEpoch { @@ -1608,7 +1608,7 @@ func (bigtable *Bigtable) GetValidatorBalanceStatistics(validators []uint64, sta "validatorsCount": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() type ResultContainer struct { @@ -1694,7 +1694,7 @@ func (bigtable *Bigtable) GetValidatorProposalHistory(validators []uint64, start "validatorsCount": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { @@ -1882,7 +1882,7 @@ func (bigtable *Bigtable) GetValidatorIncomeDetailsHistory(validators []uint64, "validatorsCount": len(validators), "startEpoch": startEpoch, "endEpoch": endEpoch, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(validators) == 0 { diff --git a/db/bigtable_eth1.go b/db/bigtable_eth1.go index 2dc2ae6e8e..05ab1812b5 100644 --- a/db/bigtable_eth1.go +++ b/db/bigtable_eth1.go @@ -140,7 +140,7 @@ func (bigtable *Bigtable) GetBlockFromBlocksTable(number uint64) (*types.Eth1Blo <-tmr.C logger.WithFields(logrus.Fields{ "validators": number, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) @@ -212,7 +212,7 @@ func (bigtable *Bigtable) GetLastBlockInBlocksTable() (int, error) { defer tmr.Stop() go func() { <-tmr.C - logger.Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + logger.Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) @@ -294,7 +294,7 @@ func (bigtable *Bigtable) GetLastBlockInDataTable() (int, error) { defer tmr.Stop() go func() { <-tmr.C - logger.Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + logger.Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) @@ -391,7 +391,7 @@ func (bigtable *Bigtable) GetMostRecentBlockFromDataTable() (*types.Eth1BlockInd defer tmr.Stop() go func() { <-tmr.C - logger.Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + logger.Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -458,7 +458,7 @@ func (bigtable *Bigtable) GetFullBlocksDescending(stream chan<- *types.Eth1Block logger.WithFields(logrus.Fields{ "high": high, "low": low, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*180)) @@ -520,7 +520,7 @@ func (bigtable *Bigtable) GetBlocksIndexedMultiple(blockNumbers []uint64, limit logger.WithFields(logrus.Fields{ "blockNumbers": blockNumbers, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() rowList := gcp_bigtable.RowList{} @@ -556,7 +556,7 @@ func (bigtable *Bigtable) GetBlocksDescending(start, limit uint64) ([]*types.Eth logger.WithFields(logrus.Fields{ "start": start, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if start < 1 || limit < 1 || limit > start { @@ -1880,7 +1880,7 @@ func (bigtable *Bigtable) GetEth1TxForAddress(prefix string, limit int64) ([]*ty logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -1939,7 +1939,7 @@ func (bigtable *Bigtable) GetAddressesNamesArMetadata(names *map[string]string, logger.WithFields(logrus.Fields{ "names": names, "inputMetadata": inputMetadata, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() outputMetadata := make(map[string]*types.ERC20Metadata) @@ -1989,7 +1989,7 @@ func (bigtable *Bigtable) GetIndexedEth1Transaction(txHash []byte) (*types.Eth1T <-tmr.C logger.WithFields(logrus.Fields{ "txHash": txHash, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2022,7 +2022,7 @@ func (bigtable *Bigtable) GetAddressTransactionsTableData(address []byte, search "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2083,7 +2083,7 @@ func (bigtable *Bigtable) GetEth1BlocksForAddress(prefix string, limit int64) ([ logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2143,7 +2143,7 @@ func (bigtable *Bigtable) GetAddressBlocksMinedTableData(address string, search "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2183,7 +2183,7 @@ func (bigtable *Bigtable) GetEth1UnclesForAddress(prefix string, limit int64) ([ logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2243,7 +2243,7 @@ func (bigtable *Bigtable) GetAddressUnclesMinedTableData(address string, search "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2281,7 +2281,7 @@ func (bigtable *Bigtable) GetEth1ItxForAddress(prefix string, limit int64) ([]*t logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2345,7 +2345,7 @@ func (bigtable *Bigtable) GetAddressInternalTableData(address []byte, search str "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() // defaults to most recent @@ -2404,7 +2404,7 @@ func (bigtable *Bigtable) GetInternalTransfersForTransaction(transaction []byte, logger.WithFields(logrus.Fields{ "transaction": transaction, "from": from, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2489,7 +2489,7 @@ func (bigtable *Bigtable) GetArbitraryTokenTransfersForTransaction(transaction [ <-tmr.C logger.WithFields(logrus.Fields{ "transaction": transaction, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2608,7 +2608,7 @@ func (bigtable *Bigtable) GetEth1ERC20ForAddress(prefix string, limit int64) ([] logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2666,7 +2666,7 @@ func (bigtable *Bigtable) GetAddressErc20TableData(address []byte, search string "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2734,7 +2734,7 @@ func (bigtable *Bigtable) GetEth1ERC721ForAddress(prefix string, limit int64) ([ logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2796,7 +2796,7 @@ func (bigtable *Bigtable) GetAddressErc721TableData(address string, search strin "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2852,7 +2852,7 @@ func (bigtable *Bigtable) GetEth1ERC1155ForAddress(prefix string, limit int64) ( logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -2911,7 +2911,7 @@ func (bigtable *Bigtable) GetAddressErc1155TableData(address string, search stri "address": address, "search": search, "pageToken": pageToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if pageToken == "" { @@ -2968,7 +2968,7 @@ func (bigtable *Bigtable) GetMetadataUpdates(prefix string, startToken string, l logger.WithFields(logrus.Fields{ "prefix": prefix, "startToken": startToken, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*120)) @@ -3005,7 +3005,7 @@ func (bigtable *Bigtable) GetMetadata(startToken string, limit int) ([]string, [ logger.WithFields(logrus.Fields{ "startToken": startToken, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*120)) @@ -3043,7 +3043,7 @@ func (bigtable *Bigtable) GetMetadataForAddress(address []byte) (*types.Eth1Addr <-tmr.C logger.WithFields(logrus.Fields{ "address": address, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3154,7 +3154,7 @@ func (bigtable *Bigtable) GetBalanceForAddress(address []byte, token []byte) (*t logger.WithFields(logrus.Fields{ "address": address, "token": token, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3200,7 +3200,7 @@ func (bigtable *Bigtable) GetERC20MetadataForAddress(address []byte) (*types.ERC <-tmr.C logger.WithFields(logrus.Fields{ "address": address, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(address) == 1 { @@ -3336,7 +3336,7 @@ func (bigtable *Bigtable) GetAddressName(address []byte) (string, error) { <-tmr.C logger.WithFields(logrus.Fields{ "address": address, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3378,7 +3378,7 @@ func (bigtable *Bigtable) GetAddressNames(addresses map[string]string) error { <-tmr.C logger.WithFields(logrus.Fields{ "addresses": addresses, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() if len(addresses) == 0 { @@ -3430,7 +3430,7 @@ func (bigtable *Bigtable) GetContractMetadata(address []byte) (*types.ContractMe <-tmr.C logger.WithFields(logrus.Fields{ "address": address, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3619,7 +3619,7 @@ func (bigtable *Bigtable) GetBlockKeys(blockNumber uint64, blockHash []byte) ([] logger.WithFields(logrus.Fields{ "blockNumber": blockNumber, "blockHash": blockHash, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3690,7 +3690,7 @@ func (bigtable *Bigtable) GetEth1TxForToken(prefix string, limit int64) ([]*type logger.WithFields(logrus.Fields{ "prefix": prefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3810,7 +3810,7 @@ func (bigtable *Bigtable) SearchForAddress(addressPrefix []byte, limit int) ([]* logger.WithFields(logrus.Fields{ "addressPrefix": addressPrefix, "limit": limit, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3863,7 +3863,7 @@ func (bigtable *Bigtable) GetSignatureImportStatus(st types.SignatureType) (*typ <-tmr.C logger.WithFields(logrus.Fields{ "st": st, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -3954,7 +3954,7 @@ func (bigtable *Bigtable) GetSignature(hex string, st types.SignatureType) (*str logger.WithFields(logrus.Fields{ "hex": hex, "st": st, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) @@ -4085,7 +4085,7 @@ func (bigtable *Bigtable) GetGasNowHistory(ts, pastTs time.Time) ([]types.GasNow logger.WithFields(logrus.Fields{ "ts": ts, "pastTs": pastTs, - }).Errorf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) + }).Warnf("%s call took longer than %v", utils.GetCurrentFuncName(), REPORT_TIMEOUT) }() ctx, done := context.WithTimeout(context.Background(), time.Second*30) From a13e59a27d56fd8cd62725ac4c3ec50fc739ce1a Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 21 Sep 2023 13:07:57 +0200 Subject: [PATCH 3/3] (BIDS-2472) rollback stats changes --- db/statistics.go | 241 +++++++++++++++++++++++------------------------ 1 file changed, 118 insertions(+), 123 deletions(-) diff --git a/db/statistics.go b/db/statistics.go index a0508f9c29..4da7d2f119 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -52,7 +52,13 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr } exported := Exported{} - err := WriterDb.Get(&exported, ` + tx, err := WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error starting transaction: %w", err) + } + defer tx.Rollback() + + err = tx.Get(&exported, ` SELECT status, failed_attestations_exported, @@ -89,72 +95,35 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr return nil } - g := &errgroup.Group{} - - g.Go(func() error { - if exported.FailedAttestations { - logger.Infof("Skipping failed attestations") - } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations); err != nil { - return fmt.Errorf("error in WriteValidatorFailedAttestationsStatisticsForDay: %w", err) - } - return nil - }) - - g.Go(func() error { - if exported.SyncDuties { - logger.Infof("Skipping sync duties") - } else if err := WriteValidatorSyncDutiesForDay(validators, day); err != nil { - return fmt.Errorf("error in WriteValidatorSyncDutiesForDay: %w", err) - } - return nil - }) - - g.Go(func() error { - if exported.WithdrawalsDeposits { - logger.Infof("Skipping withdrawals / deposits") - } else if err := WriteValidatorDepositWithdrawals(day); err != nil { - return fmt.Errorf("error in WriteValidatorDepositWithdrawals: %w", err) - } - return nil - }) - - g.Go(func() error { - if exported.BlockStats { - logger.Infof("Skipping block stats") - } else if err := WriteValidatorBlockStats(day); err != nil { - return fmt.Errorf("error in WriteValidatorBlockStats: %w", err) - } - return nil - }) + if exported.FailedAttestations { + logger.Infof("Skipping failed attestations") + } else if err := WriteValidatorFailedAttestationsStatisticsForDay(validators, day, concurrencyFailedAttestations, tx); err != nil { + return fmt.Errorf("error in WriteValidatorFailedAttestationsStatisticsForDay: %w", err) + } - g.Go(func() error { - if exported.Balance { - logger.Infof("Skipping balances") - } else if err := WriteValidatorBalances(validators, day); err != nil { - return fmt.Errorf("error in WriteValidatorBalances: %w", err) - } - return nil - }) + if exported.SyncDuties { + logger.Infof("Skipping sync duties") + } else if err := WriteValidatorSyncDutiesForDay(validators, day, tx); err != nil { + return fmt.Errorf("error in WriteValidatorSyncDutiesForDay: %w", err) + } - g.Go(func() error { - if exported.ElRewards { - logger.Infof("Skipping el rewards") - } else if err := WriteValidatorElIcome(day); err != nil { - return fmt.Errorf("error in WriteValidatorElIcome: %w", err) - } - return nil - }) + if exported.WithdrawalsDeposits { + logger.Infof("Skipping withdrawals / deposits") + } else if err := WriteValidatorDepositWithdrawals(day, tx); err != nil { + return fmt.Errorf("error in WriteValidatorDepositWithdrawals: %w", err) + } - err = g.Wait() - if err != nil { - return fmt.Errorf("error during gather/write of stats: %w", err) + if exported.BlockStats { + logger.Infof("Skipping block stats") + } else if err := WriteValidatorBlockStats(day, tx); err != nil { + return fmt.Errorf("error in WriteValidatorBlockStats: %w", err) } - tx, err := WriterDb.Beginx() - if err != nil { - return fmt.Errorf("error starting transaction: %w", err) + if exported.Balance { + logger.Infof("Skipping balances") + } else if err := WriteValidatorBalances(validators, day, tx); err != nil { + return fmt.Errorf("error in WriteValidatorBalances: %w", err) } - defer tx.Rollback() if exported.ClRewards { logger.Infof("Skipping cl rewards") @@ -162,6 +131,12 @@ func WriteValidatorStatisticsForDay(day uint64, concurrencyTotal uint64, concurr return fmt.Errorf("error in WriteValidatorClIcome: %w", err) } + if exported.ElRewards { + logger.Infof("Skipping el rewards") + } else if err := WriteValidatorElIcome(day, tx); err != nil { + return fmt.Errorf("error in WriteValidatorElIcome: %w", err) + } + if exported.TotalAccumulation { logger.Infof("Skipping total accumulation") } else if err := WriteValidatorTotalAccumulation(day, concurrencyTotal, tx); err != nil { @@ -514,7 +489,7 @@ func WriteValidatorTotalPerformance(day uint64, concurrency uint64, tx *sqlx.Tx) return nil } -func WriteValidatorBlockStats(day uint64) error { +func WriteValidatorBlockStats(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_block_stats").Observe(time.Since(exportStart).Seconds()) @@ -524,15 +499,22 @@ func WriteValidatorBlockStats(day uint64) error { return err } - tx, err := WriterDb.Beginx() + start := time.Now() + resetQry := ` + UPDATE validator_stats SET + proposed_blocks = NULL, + missed_blocks = NULL, + orphaned_blocks = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) if err != nil { - return fmt.Errorf("error starting transaction: %w", err) + return fmt.Errorf("error resetting proposer duty validator_stats for day [%v]: %w", day, err) } - defer tx.Rollback() + logger.Infof("proposer duty reset completed, took %v", time.Since(start)) firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - start := time.Now() + start = time.Now() logger.Infof("exporting proposed_blocks, missed_blocks and orphaned_blocks statistics") _, err = tx.Exec(` @@ -571,15 +553,11 @@ func WriteValidatorBlockStats(day uint64) error { return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("block statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorElIcome(day uint64) error { +func WriteValidatorElIcome(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_el_income_stats").Observe(time.Since(exportStart).Seconds()) @@ -589,15 +567,21 @@ func WriteValidatorElIcome(day uint64) error { return err } - tx, err := WriterDb.Beginx() + start := time.Now() + resetQry := ` + UPDATE validator_stats SET + el_rewards_wei = NULL, + mev_rewards_wei = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) if err != nil { - return fmt.Errorf("error starting transaction: %w", err) + return fmt.Errorf("error resetting el income validator_stats for day [%v]: %w", day, err) } - defer tx.Rollback() + logger.Infof("el income reset completed, took %v", time.Since(start)) firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - start := time.Now() + start = time.Now() logger.Infof("exporting mev & el rewards") @@ -690,10 +674,6 @@ func WriteValidatorElIcome(day uint64) error { return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("el rewards statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } @@ -711,6 +691,17 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, } start := time.Now() + resetQry := ` + UPDATE validator_stats SET + cl_rewards_gwei = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) + if err != nil { + return fmt.Errorf("error resetting cl income validator_stats for day [%v]: %w", day, err) + } + logger.Infof("cl income reset completed, took %v", time.Since(start)) + + start = time.Now() logger.Infof("validating if required data has been exported for cl rewards") type Exported struct { LastBalanceExported bool `db:"last_balance_exported"` @@ -718,7 +709,7 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, CurrentWithdrawalsDepositsExported bool `db:"cur_withdrawals_deposits_exported"` } exported := Exported{} - err := tx.Get(&exported, ` + err = tx.Get(&exported, ` SELECT last.balance_exported as last_balance_exported, cur.balance_exported as cur_balance_exported, cur.withdrawals_deposits_exported as cur_withdrawals_deposits_exported FROM validator_stats_status cur INNER JOIN validator_stats_status last @@ -805,11 +796,12 @@ func WriteValidatorClIcome(validators []uint64, day uint64, concurrency uint64, if err = markColumnExported(day, "cl_rewards_exported", tx); err != nil { return err } + logger.Infof("cl rewards statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorBalances(validators []uint64, day uint64) error { +func WriteValidatorBalances(validators []uint64, day uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() @@ -822,15 +814,27 @@ func WriteValidatorBalances(validators []uint64, day uint64) error { return err } - tx, err := WriterDb.Beginx() - if err != nil { - return fmt.Errorf("error starting transaction: %w", err) - } - defer tx.Rollback() + start := time.Now() + resetQry := ` + UPDATE validator_stats SET + min_balance = NULL, + max_balance = NULL, + min_effective_balance = NULL, + max_effective_balance = NULL, + start_balance = NULL, + start_effective_balance = NULL, + end_balance = NULL, + end_effective_balance = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) + if err != nil { + return fmt.Errorf("error resetting balances validator_stats for day [%v]: %w", day, err) + } + logger.Infof("balances reset completed, took %v", time.Since(start)) firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - start := time.Now() + start = time.Now() logger.Infof("exporting min_balance, max_balance, min_effective_balance, max_effective_balance, start_balance, start_effective_balance, end_balance and end_effective_balance statistics") balanceStatistics, err := BigtableClient.GetValidatorBalanceStatistics(validators, firstEpoch, lastEpoch) @@ -905,15 +909,11 @@ func WriteValidatorBalances(validators []uint64, day uint64) error { return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("balance statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorDepositWithdrawals(day uint64) error { +func WriteValidatorDepositWithdrawals(day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_deposit_withdrawal_stats").Observe(time.Since(exportStart).Seconds()) @@ -923,12 +923,6 @@ func WriteValidatorDepositWithdrawals(day uint64) error { return err } - tx, err := WriterDb.Beginx() - if err != nil { - return fmt.Errorf("error starting transaction: %w", err) - } - defer tx.Rollback() - // The end_balance of a day is the balance after the first slot of the last epoch of that day. // Therefore the last 31 slots of the day are not included in the end_balance of that day. // Since our income calculation is base on subtracting end_balances the deposits and withdrawals that happen during those slots must be added to the next day instead. @@ -955,11 +949,11 @@ func WriteValidatorDepositWithdrawals(day uint64) error { withdrawals_amount = NULL WHERE day = $1%s;`, firstDayExtraCondition) - _, err = tx.Exec(resetQry, day) + _, err := tx.Exec(resetQry, day) if err != nil { - return fmt.Errorf("error resetting validator_stats for day [%v]: %w", day, err) + return fmt.Errorf("error resetting deposit & withdrawal validator_stats for day [%v]: %w", day, err) } - logger.Infof("reset completed, took %v", time.Since(start)) + logger.Infof("deposit & withdrawal reset completed, took %v", time.Since(start)) start = time.Now() logrus.Infof("Update Withdrawals + Deposits for day [%v] slot %v -> %v", day, firstSlot, lastSlot) @@ -1028,15 +1022,11 @@ func WriteValidatorDepositWithdrawals(day uint64) error { return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("deposits and withdrawals statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { +func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64, tx *sqlx.Tx) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds()) @@ -1046,11 +1036,18 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { return err } - tx, err := WriterDb.Beginx() + start := time.Now() + resetQry := ` + UPDATE validator_stats SET + participated_sync = NULL, + missed_sync = NULL, + orphaned_sync = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) if err != nil { - return fmt.Errorf("error starting transaction: %w", err) + return fmt.Errorf("error resetting sync duty validator_stats for day [%v]: %w", day, err) } - defer tx.Rollback() + logger.Infof("sync duty reset completed, took %v", time.Since(start)) startEpoch, endEpoch := utils.GetFirstAndLastEpochForDay(day) if startEpoch < utils.Config.Chain.Config.AltairForkEpoch && endEpoch > utils.Config.Chain.Config.AltairForkEpoch { @@ -1060,7 +1057,7 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { return nil } - start := time.Now() + start = time.Now() logrus.Infof("Update Sync duties for day [%v] epoch %v -> %v", day, startEpoch, endEpoch) syncStats, err := BigtableClient.GetValidatorSyncDutiesStatistics(validators, startEpoch, endEpoch) @@ -1113,15 +1110,11 @@ func WriteValidatorSyncDutiesForDay(validators []uint64, day uint64) error { return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("sync duties and statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil } -func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64) error { +func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day uint64, concurrency uint64, tx *sqlx.Tx) error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*10)) defer cancel() exportStart := time.Now() @@ -1133,15 +1126,21 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u return err } - tx, err := WriterDb.Beginx() + start := time.Now() + resetQry := ` + UPDATE validator_stats SET + missed_attestations = NULL, + orphaned_attestations = NULL + WHERE day = $1;` + _, err := tx.Exec(resetQry, day) if err != nil { - return fmt.Errorf("error starting transaction: %w", err) + return fmt.Errorf("error resetting attestation duty validator_stats for day [%v]: %w", day, err) } - defer tx.Rollback() + logger.Infof("attestation duty reset completed, took %v", time.Since(start)) firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) - start := time.Now() + start = time.Now() logrus.Infof("exporting 'failed attestations' statistics firstEpoch: %v lastEpoch: %v", firstEpoch, lastEpoch) @@ -1210,10 +1209,6 @@ func WriteValidatorFailedAttestationsStatisticsForDay(validators []uint64, day u return err } - err = tx.Commit() - if err != nil { - return err - } logger.Infof("'failed attestation' statistics export of day %v completed, took %v", day, time.Since(exportStart)) return nil }