From 77b6eb8e7f1cd62197f7afb542fe0722dfc2ad02 Mon Sep 17 00:00:00 2001
From: Patrick Pfeiffer
Date: Wed, 9 Oct 2024 16:09:47 +0200
Subject: [PATCH] fix(api): improve perf for /api/v1/app/dashboard

---
 handlers/api.go      | 141 ++++++++++++++++++++-----------------
 handlers/api_eth1.go | 119 ++++++++++++++++++++++--------------
 utils/utils.go       |  65 ++++++++++++++++++++
 3 files changed, 205 insertions(+), 120 deletions(-)

diff --git a/handlers/api.go b/handlers/api.go
index 156ff57759..185c5560bf 100644
--- a/handlers/api.go
+++ b/handlers/api.go
@@ -849,6 +849,9 @@ Combined validator get, performance, attestation efficency, sync committee stati
 Not public documented
 */
 func ApiDashboard(w http.ResponseWriter, r *http.Request) {
+	obs := utils.NewTimingsObserver("ApiDashboard")
+	defer obs.End(time.Second * 10)
+
 	w.Header().Set("Content-Type", "application/json")
 
 	j := json.NewEncoder(w)
@@ -895,128 +898,83 @@ func ApiDashboard(w http.ResponseWriter, r *http.Request) {
 
 		if len(queryIndices) > 0 {
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getGeneralValidatorInfoForAppDashboard")()
 				var err error
 				validatorsData, err = getGeneralValidatorInfoForAppDashboard(queryIndices)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getGeneralValidatorInfoForAppDashboard(%v) took longer than 10 sec", queryIndices)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getValidatorEffectiveness")()
 				var err error
 				validatorEffectivenessData, err = getValidatorEffectiveness(epoch-1, queryIndices)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getValidatorEffectiveness(%v, %v) took longer than 10 sec", epoch-1, queryIndices)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getRocketpoolValidators")()
 				var err error
 				rocketpoolData, err = getRocketpoolValidators(queryIndices)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getRocketpoolValidators(%v) took longer than 10 sec", queryIndices)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getValidatorExecutionPerformance")()
 				var err error
 				executionPerformance, err = getValidatorExecutionPerformance(queryIndices)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getValidatorExecutionPerformance(%v) took longer than 10 sec", queryIndices)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getSyncCommitteeInfoForValidators")()
 				var err error
 				period := utils.SyncPeriodOfEpoch(epoch)
 				currentSyncCommittee, err = getSyncCommitteeInfoForValidators(queryIndices, period)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getSyncCommitteeInfoForValidators(%v, %v) took longer than 10 sec", queryIndices, period)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getSyncCommitteeInfoForValidators+1")()
 				var err error
 				period := utils.SyncPeriodOfEpoch(epoch) + 1
 				nextSyncCommittee, err = getSyncCommitteeInfoForValidators(queryIndices, period)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("SyncPeriodOfEpoch(%v) + 1 took longer than 10 sec", epoch)
-					logger.Warnf("getSyncCommitteeInfoForValidators(%v, %v) took longer than 10 sec", queryIndices, period)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getSyncCommitteeStatistics")()
 				var err error
 				syncCommitteeStats, err = getSyncCommitteeStatistics(queryIndices, epoch)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getSyncCommitteeStatistics(%v, %v) took longer than 10 sec", queryIndices, epoch)
-				}
 				return err
 			})
 
 			g.Go(func() error {
-				start := time.Now()
+				defer obs.Timer("getProposalLuckStats")()
 				var err error
 				proposalLuckStats, err = getProposalLuckStats(queryIndices)
-				elapsed := time.Since(start)
-				if elapsed > 10*time.Second {
-					logger.Warnf("getProposalLuck(%v, %v) took longer than 10 sec", queryIndices, epoch)
-				}
 				return err
 			})
 		}
 	}
 
 	g.Go(func() error {
-		start := time.Now()
+		defer obs.Timer("getEpoch-1")()
 		var err error
 		currentEpochData, err = getEpoch(int64(epoch) - 1)
-		elapsed := time.Since(start)
-		if elapsed > 10*time.Second {
-			logger.Warnf("getEpoch(%v) took longer than 10 sec", int64(epoch)-1)
-		}
 		return err
 	})
 
 	g.Go(func() error {
-		start := time.Now()
+		defer obs.Timer("getEpoch-10")()
 		var err error
 		olderEpochData, err = getEpoch(int64(epoch) - 10)
-		elapsed := time.Since(start)
-		if elapsed > 10*time.Second {
-			logger.Warnf("getEpoch(%v) took longer than 10 sec", int64(epoch)-10)
-		}
 		return err
 	})
 
 	g.Go(func() error {
-		start := time.Now()
+		defer obs.Timer("getRocketpoolStats")()
 		var err error
 		rocketpoolStats, err = getRocketpoolStats()
-		elapsed := time.Since(start)
-		if elapsed > 10*time.Second {
-			logger.Warnf("getRocketpoolStats() took longer than 10 sec")
-		}
 		return err
 	})
 
@@ -1074,6 +1032,9 @@ func getSyncCommitteeInfoForValidators(validators []uint64, period uint64) ([]in
 }
 
 func getSyncCommitteeStatistics(validators []uint64, epoch uint64) (*SyncCommitteesInfo, error) {
+	obs := utils.NewTimingsObserver("getSyncCommitteeStatistics")
+	defer obs.End(time.Second * 10)
+
 	if epoch < utils.Config.Chain.ClConfig.AltairForkEpoch {
 		// no sync committee duties before altair fork
 		return &SyncCommitteesInfo{}, nil
@@ -1084,12 +1045,31 @@ func getSyncCommitteeStatistics(validators []uint64, epoch uint64) (*SyncCommitt
 		return &SyncCommitteesInfo{}, nil
 	}
 
-	expectedSlots, err := getExpectedSyncCommitteeSlots(validators, epoch)
-	if err != nil {
-		return nil, err
-	}
+	g := errgroup.Group{}
+
+	var expectedSlots uint64
+	g.Go(func() error {
+		defer obs.Timer("getExpectedSyncCommitteeSlots")()
+		var err error
+		expectedSlots, err = getExpectedSyncCommitteeSlots(validators, epoch)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
 
-	stats, err := getSyncCommitteeSlotsStatistics(validators, epoch)
+	var stats types.SyncCommitteesStats
+	g.Go(func() error {
+		defer obs.Timer("getSyncCommitteeSlotsStatistics")()
+		var err error
+		stats, err = getSyncCommitteeSlotsStatistics(validators, epoch)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+
+	err := g.Wait()
 	if err != nil {
 		return nil, err
 	}
@@ -1397,8 +1377,16 @@ func getRocketpoolValidators(queryIndices []uint64) ([]interface{}, error) {
 }
 
 func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{}, error) {
-	// we use MAX(validatorindex)+1 instead of COUNT(*) for querying the rank_count for performance-reasons
-	rows, err := db.ReaderDb.Query(`
+	obs := utils.NewTimingsObserver("getGeneralValidatorInfoForAppDashboard")
+	defer obs.End(time.Second * 10)
+
+	g := new(errgroup.Group)
+
+	var data []interface{}
+	g.Go(func() error {
+		defer obs.Timer("getValidators")()
+		// we use MAX(validatorindex)+1 instead of COUNT(*) for querying the rank_count for performance-reasons
+		rows, err := db.ReaderDb.Query(`
 WITH maxValidatorIndex AS (
 	SELECT MAX(validatorindex)+1 as total_count
 	FROM validator_performance
@@ -1427,20 +1415,21 @@ func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{
 	LEFT JOIN validator_names ON validator_names.publickey = validators.pubkey
 	WHERE validators.validatorindex = ANY($1)
 	ORDER BY validators.validatorindex`, pq.Array(queryIndices))
-	if err != nil {
-		return nil, fmt.Errorf("error querying validators: %w", err)
-	}
-	defer rows.Close()
-
-	data, err := utils.SqlRowsToJSON(rows)
-	if err != nil {
-		return nil, fmt.Errorf("error converting validators to json: %w", err)
-	}
+		if err != nil {
+			return fmt.Errorf("error querying validators: %w", err)
+		}
+		defer rows.Close()
 
-	g := new(errgroup.Group)
+		data, err = utils.SqlRowsToJSON(rows)
+		if err != nil {
+			return fmt.Errorf("error converting validators to json: %w", err)
+		}
+		return nil
+	})
 
 	var balances map[uint64][]*types.ValidatorBalance
 	g.Go(func() error {
+		defer obs.Timer("GetValidatorBalanceHistory")()
 		var err error
 		balances, err = db.BigtableClient.GetValidatorBalanceHistory(queryIndices, services.LatestEpoch(), services.LatestEpoch())
 		if err != nil {
@@ -1451,6 +1440,7 @@ func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{
 
 	var currentDayIncome map[uint64]int64
 	g.Go(func() error {
+		defer obs.Timer("GetCurrentDayClIncome")()
 		var err error
 		currentDayIncome, err = db.GetCurrentDayClIncome(queryIndices)
 		if err != nil {
@@ -1461,6 +1451,7 @@ func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{
 
 	var lastAttestationSlots map[uint64]uint64
 	g.Go(func() error {
+		defer obs.Timer("GetLastAttestationSlots")()
 		var err error
 		lastAttestationSlots, err = db.BigtableClient.GetLastAttestationSlots(queryIndices)
 		if err != nil {
@@ -1469,11 +1460,12 @@ func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{
 		return nil
 	})
 
-	err = g.Wait()
+	err := g.Wait()
 	if err != nil {
 		return nil, fmt.Errorf("error in validator errgroup: %w", err)
 	}
 
+	obsAggregate := obs.Timer("aggregate")
 	for _, entry := range data {
 		eMap, ok := entry.(map[string]interface{})
 		if !ok {
@@ -1505,6 +1497,7 @@ func getGeneralValidatorInfoForAppDashboard(queryIndices []uint64) ([]interface{
 			}
 		}
 	}
+	obsAggregate()
 
 	return data, nil
 }
diff --git a/handlers/api_eth1.go b/handlers/api_eth1.go
index 7036022423..10c11e9973 100644
--- a/handlers/api_eth1.go
+++ b/handlers/api_eth1.go
@@ -22,6 +22,7 @@ import (
 	"github.com/lib/pq"
 	"github.com/shopspring/decimal"
 	"golang.org/x/exp/maps"
+	"golang.org/x/sync/errgroup"
 )
 
 // ApiEth1Deposit godoc
@@ -483,6 +484,11 @@ func formatBlocksForApiResponse(blocks []*types.Eth1BlockIndexed, relaysData map
 }
 
 func getValidatorExecutionPerformance(queryIndices []uint64) ([]types.ExecutionPerformanceResponse, error) {
+	obs := utils.NewTimingsObserver("getValidatorExecutionPerformance")
+	defer obs.End(time.Second * 10)
+
+	g := errgroup.Group{}
+
 	latestEpoch := services.LatestEpoch()
 	last31dTimestamp := time.Now().Add(-31 * utils.Day)
 	last7dTimestamp := time.Now().Add(-7 * utils.Day)
@@ -494,36 +500,44 @@ func getValidatorExecutionPerformance(queryIndices []uint64) ([]types.ExecutionP
 	}
 
 	validatorsPQArray := pq.Array(queryIndices)
-	var execBlocks []types.ExecBlockProposer
-	err := db.ReaderDb.Select(&execBlocks,
-		`SELECT
-		exec_block_number,
-		proposer
-		FROM blocks
-		WHERE proposer = ANY($1)
-		AND exec_block_number IS NOT NULL
-		AND exec_block_number > 0
-		AND epoch > $2`,
-		validatorsPQArray,
-		monthRange, // 32d range
-	)
-	if err != nil {
-		return nil, fmt.Errorf("error cannot get proposed blocks from db with indicies: %+v and epoch: %v, err: %w", queryIndices, latestEpoch, err)
-	}
+	var blocks []*types.Eth1BlockIndexed
+	var blockList []uint64
+	var blockToProposerMap map[uint64]types.ExecBlockProposer
+	var relaysData map[common.Hash]types.RelaysData
+	g.Go(func() error {
+		defer obs.Timer("getBlocks")()
+		var execBlocks []types.ExecBlockProposer
+		err := db.ReaderDb.Select(&execBlocks,
+			`SELECT
+			exec_block_number,
+			proposer
+			FROM blocks
+			WHERE proposer = ANY($1)
+			AND exec_block_number IS NOT NULL
+			AND exec_block_number > 0
+			AND epoch > $2`,
+			validatorsPQArray,
+			monthRange, // 32d range
+		)
+		if err != nil {
+			return fmt.Errorf("error cannot get proposed blocks from db with indices: %+v and epoch: %v, err: %w", queryIndices, latestEpoch, err)
+		}
 
-	blockList, blockToProposerMap := getBlockNumbersAndMapProposer(execBlocks)
+		blockList, blockToProposerMap = getBlockNumbersAndMapProposer(execBlocks)
 
-	blocks, err := db.BigtableClient.GetBlocksIndexedMultiple(blockList, 10000)
-	if err != nil {
-		return nil, fmt.Errorf("error cannot get blocks from bigtable using GetBlocksIndexedMultiple: %w", err)
-	}
+		blocks, err = db.BigtableClient.GetBlocksIndexedMultiple(blockList, 10000)
+		if err != nil {
+			return fmt.Errorf("error cannot get blocks from bigtable using GetBlocksIndexedMultiple: %w", err)
+		}
 
-	resultPerProposer := make(map[uint64]types.ExecutionPerformanceResponse)
+		relaysData, err = db.GetRelayDataForIndexedBlocks(blocks)
+		if err != nil {
+			return fmt.Errorf("error can not get relays data: %w", err)
+		}
+		return nil
+	})
 
-	relaysData, err := db.GetRelayDataForIndexedBlocks(blocks)
-	if err != nil {
-		return nil, fmt.Errorf("error can not get relays data: %w", err)
-	}
+	resultPerProposer := make(map[uint64]types.ExecutionPerformanceResponse)
 
 	type LongPerformanceResponse struct {
 		Performance365d  string `db:"el_performance_365d" json:"performance365d"`
@@ -533,34 +547,47 @@ func getValidatorExecutionPerformance(queryIndices []uint64) ([]types.ExecutionP
 
 	performanceList := []LongPerformanceResponse{}
 
-	err = db.ReaderDb.Select(&performanceList, `
+	g.Go(func() error {
+		defer obs.Timer("getPerformance")()
+		err := db.ReaderDb.Select(&performanceList, `
 		SELECT validatorindex,
 			CAST(COALESCE(mev_performance_365d, 0) AS text) AS el_performance_365d,
 			CAST(COALESCE(mev_performance_total, 0) AS text) AS el_performance_total
 		FROM validator_performance
 		WHERE validatorindex = ANY($1)`, validatorsPQArray)
-	if err != nil {
-		return nil, fmt.Errorf("error can cl performance from db: %w", err)
-	}
-	for _, val := range performanceList {
-		performance365d, _ := new(big.Int).SetString(val.Performance365d, 10)
-		performanceTotal, _ := new(big.Int).SetString(val.PerformanceTotal, 10)
-		resultPerProposer[val.ValidatorIndex] = types.ExecutionPerformanceResponse{
-			Performance1d:    big.NewInt(0),
-			Performance7d:    big.NewInt(0),
-			Performance31d:   big.NewInt(0),
-			Performance365d:  performance365d,
-			PerformanceTotal: performanceTotal,
-			ValidatorIndex:   val.ValidatorIndex,
+		if err != nil {
+			return fmt.Errorf("error getting el performance from db: %w", err)
 		}
-	}
+		for _, val := range performanceList {
+			performance365d, _ := new(big.Int).SetString(val.Performance365d, 10)
+			performanceTotal, _ := new(big.Int).SetString(val.PerformanceTotal, 10)
+			resultPerProposer[val.ValidatorIndex] = types.ExecutionPerformanceResponse{
+				Performance1d:    big.NewInt(0),
+				Performance7d:    big.NewInt(0),
+				Performance31d:   big.NewInt(0),
+				Performance365d:  performance365d,
+				PerformanceTotal: performanceTotal,
+				ValidatorIndex:   val.ValidatorIndex,
+			}
+		}
+		return nil
+	})
 
 	firstEpochTime := utils.EpochToTime(0)
-	lastStatsDay, err := services.LatestExportedStatisticDay()
-	if err != nil && err != db.ErrNoStats {
-		return nil, fmt.Errorf("error retrieving latest exported statistics day: %v", err)
-	} else if err == nil {
-		firstEpochTime = utils.EpochToTime((lastStatsDay + 1) * utils.EpochsPerDay())
+	g.Go(func() error {
+		defer obs.Timer("getFirstEpochTime")()
+		lastStatsDay, err := services.LatestExportedStatisticDay()
+		if err != nil && err != db.ErrNoStats {
+			return fmt.Errorf("error retrieving latest exported statistics day: %v", err)
+		} else if err == nil {
+			firstEpochTime = utils.EpochToTime((lastStatsDay + 1) * utils.EpochsPerDay())
+		}
+		return nil
+	})
+
+	err := g.Wait()
+	if err != nil {
+		return nil, err
 	}
 
 	for _, block := range blocks {
diff --git a/utils/utils.go b/utils/utils.go
index 35313ff243..85a47f9402 100644
--- a/utils/utils.go
+++ b/utils/utils.go
@@ -28,6 +28,8 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 	"unicode/utf8"
@@ -1823,3 +1825,66 @@ func GetMaxAllowedDayRangeValidatorStats(validatorAmount int) int {
 		return math.MaxInt
 	}
 }
+
+type TimingsObserverTiming struct {
+	Name     string
+	Duration time.Duration
+}
+type TimingsObserver struct {
+	Name       string
+	startTime  time.Time
+	timings    []TimingsObserverTiming
+	timingsMu  sync.Mutex
+	maxNameLen uint64
+}
+
+func NewTimingsObserver(name string) *TimingsObserver {
+	if name == "" {
+		name = "unnamed"
+	}
+	return &TimingsObserver{
+		Name:      name,
+		startTime: time.Now(),
+		timings:   []TimingsObserverTiming{},
+	}
+}
+
+func (t *TimingsObserver) Timer(name string) func() {
+	start := time.Now()
+	return func() {
+		t.Observe(name, time.Since(start))
+	}
+}
+
+func (t *TimingsObserver) Observe(name string, dur time.Duration) {
+	if name == "" {
+		name = "unnamed"
+	}
+	if uint64(len(name)) > atomic.LoadUint64(&t.maxNameLen) {
+		atomic.StoreUint64(&t.maxNameLen, uint64(len(name)))
+	}
+	t.timingsMu.Lock()
+	defer t.timingsMu.Unlock()
+	t.timings = append(t.timings, TimingsObserverTiming{
+		Name:     name,
+		Duration: dur,
+	})
+}
+
+func (t *TimingsObserver) End(warnThreshold time.Duration) {
+	dur := time.Since(t.startTime)
+	if dur < warnThreshold {
+		return
+	}
+	t.timingsMu.Lock()
+	defer t.timingsMu.Unlock()
+	sort.Slice(t.timings, func(i, j int) bool {
+		return t.timings[i].Duration > t.timings[j].Duration
+	})
+	str := ""
+	tpl := fmt.Sprintf("%%%ds: %%v\n", atomic.LoadUint64(&t.maxNameLen))
+	for _, tt := range t.timings {
+		str += fmt.Sprintf(tpl, tt.Name, tt.Duration)
+	}
+	logrus.Warnf("timingsObserver: %s: total duration %s\n%s", t.Name, dur, str)
+}
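
Usage sketch for the TimingsObserver introduced in utils.go (illustrative only): the pattern is the same as in ApiDashboard above — one observer per request, one Timer per sub-step, and a single sorted warning log if the whole request exceeds the threshold passed to End. The handler name and import path below are assumptions for the example, not taken from this patch.

	package handlers

	import (
		"net/http"
		"time"

		// module path assumed; replace with this repository's actual module path
		"github.com/gobitfly/eth2-beaconchain-explorer/utils"
	)

	// exampleHandler is a hypothetical handler showing how the observer is wired in.
	func exampleHandler(w http.ResponseWriter, r *http.Request) {
		obs := utils.NewTimingsObserver("exampleHandler")
		defer obs.End(time.Second * 10) // logs the sorted per-step breakdown only if the request took 10s or more

		func() {
			defer obs.Timer("stepOne")() // records how long this block ran
			time.Sleep(50 * time.Millisecond)
		}()

		obs.Observe("stepTwo", 25*time.Millisecond) // durations can also be reported directly
	}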