Skip to content

Commit

Permalink
Add etherchain charts & correlation page
Browse files Browse the repository at this point in the history
feature/etherchain charts
  • Loading branch information
qu0b authored Dec 6, 2022
2 parents bd07dcc + 098a896 commit fe5e829
Show file tree
Hide file tree
Showing 16 changed files with 1,614 additions and 107 deletions.
2 changes: 2 additions & 0 deletions cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,8 @@ func main() {
router.HandleFunc("/burn/data", handlers.BurnPageData).Methods("GET")
router.HandleFunc("/gasnow", handlers.GasNow).Methods("GET")
router.HandleFunc("/gasnow/data", handlers.GasNowData).Methods("GET")
router.HandleFunc("/correlations", handlers.Correlations).Methods("GET")
router.HandleFunc("/correlations/data", handlers.CorrelationsData).Methods("POST")

router.HandleFunc("/vis", handlers.Vis).Methods("GET")
router.HandleFunc("/charts", handlers.Charts).Methods("GET")
Expand Down
16 changes: 10 additions & 6 deletions cmd/statistics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"eth2-exporter/db"
"eth2-exporter/price"
"eth2-exporter/services"
"eth2-exporter/types"
"eth2-exporter/utils"
Expand All @@ -25,7 +26,7 @@ type options struct {
statisticsChartToggle bool
}

var opt options = options{}
var opt *options

func main() {
configPath := flag.String("config", "", "Path to the config file")
Expand All @@ -37,11 +38,12 @@ func main() {

flag.Parse()

opt = options{
opt = &options{
configPath: *configPath,
statisticsDayToExport: *statisticsDayToExport,
statisticsDaysToExport: *statisticsDaysToExport,
statisticsValidatorToggle: *statisticsChartToggle,
statisticsChartToggle: *statisticsChartToggle,
statisticsValidatorToggle: *statisticsValidatorToggle,
poolsDisabledFlag: *poolsDisabledFlag,
}

Expand Down Expand Up @@ -88,6 +90,8 @@ func main() {

db.InitBigtable(cfg.Bigtable.Project, cfg.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.Config.DepositChainID))

price.Init(utils.Config.Chain.Config.DepositChainID)

if *statisticsDaysToExport != "" {
s := strings.Split(*statisticsDaysToExport, "-")
if len(s) < 2 {
Expand Down Expand Up @@ -125,7 +129,7 @@ func main() {
logrus.Fatalf("error resetting status for chart series status for day %v: %v", d, err)
}

err = db.WriteChartSeriesForDay(uint64(d))
err = db.WriteChartSeriesForDay(int64(d))
if err != nil {
logrus.Errorf("error exporting chart series from day %v: %v", d, err)
}
Expand Down Expand Up @@ -153,7 +157,7 @@ func main() {
logrus.Fatalf("error resetting status for chart series status for day %v: %v", *statisticsDayToExport, err)
}

err = db.WriteChartSeriesForDay(uint64(*statisticsDayToExport))
err = db.WriteChartSeriesForDay(int64(*statisticsDayToExport))
if err != nil {
logrus.Errorf("error exporting chart series from day %v: %v", *statisticsDayToExport, err)
}
Expand Down Expand Up @@ -229,7 +233,7 @@ func statisticsLoop() {
logrus.Infof("Chart statistics: latest epoch is %v, previous day is %v, last exported day is %v", latestEpoch, previousDay, lastExportedDayChart)
if lastExportedDayChart <= previousDay || lastExportedDayChart == 0 {
for day := lastExportedDayChart; day <= previousDay; day++ {
err = db.WriteChartSeriesForDay(day)
err = db.WriteChartSeriesForDay(int64(day))
if err != nil {
logrus.Errorf("error exporting chart series from day %v: %v", day, err)
}
Expand Down
84 changes: 72 additions & 12 deletions db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,9 @@ func (bigtable *Bigtable) GetValidatorBalanceHistory(validators []uint64, startE
rangeEnd := fmt.Sprintf("%s:e:b:%s", bigtable.chainId, reversedPaddedEpoch(startEpoch-uint64(limit)))
res := make(map[uint64][]*types.ValidatorBalance, len(validators))

if len(validators) == 0 {
return res, nil
}
// if len(validators) == 0 {
// return res, nil
// }

columnFilters := make([]gcp_bigtable.Filter, 0, len(validators))
for _, validator := range validators {
Expand Down Expand Up @@ -1241,7 +1241,7 @@ func (bigtable *Bigtable) SaveValidatorIncomeDetails(epoch uint64, rewards map[u
return nil
}

func (bigtable *Bigtable) GetEpochIncomeHistory(startEpoch uint64, limit int64) (*itypes.ValidatorEpochIncome, error) {
func (bigtable *Bigtable) GetEpochIncomeHistoryDescending(startEpoch uint64, limit int64) (*itypes.ValidatorEpochIncome, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
defer cancel()

Expand Down Expand Up @@ -1273,19 +1273,81 @@ func (bigtable *Bigtable) GetEpochIncomeHistory(startEpoch uint64, limit int64)
return &res, nil
}

func (bigtable *Bigtable) GetValidatorIncomeDetailsHistory(validators []uint64, startEpoch uint64, limit int64) (map[uint64]map[uint64]*itypes.ValidatorEpochIncome, error) {
// GetEpochIncomeHistory returns the aggregated validator income for a single epoch.
// It first tries to read the precomputed per-epoch sum column; if that row does not
// exist it falls back to summing the per-validator income details for the epoch.
func (bigtable *Bigtable) GetEpochIncomeHistory(epoch uint64) (*itypes.ValidatorEpochIncome, error) {

	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
	defer cancel()

	key := fmt.Sprintf("%s:e:b:%s", bigtable.chainId, reversedPaddedEpoch(epoch))

	family := gcp_bigtable.FamilyFilter(STATS_COLUMN_FAMILY)
	columnFilter := gcp_bigtable.ColumnFilter(SUM_COLUMN)
	filter := gcp_bigtable.RowFilter(gcp_bigtable.ChainFilters(family, columnFilter))

	row, err := bigtable.tableBeaconchain.ReadRow(ctx, key, filter)
	if err != nil {
		return nil, fmt.Errorf("error reading income statistics from bigtable for epoch: %v err: %w", epoch, err)
	}

	if row != nil {
		res := itypes.ValidatorEpochIncome{}
		err := proto.Unmarshal(row[STATS_COLUMN_FAMILY][0].Value, &res)
		if err != nil {
			return nil, fmt.Errorf("error decoding income data for row %v: %w", row.Key(), err)
		}
		return &res, nil
	}

	// No precomputed sum available: aggregate over all validators for this epoch.
	// An empty validator slice retrieves the income details of every validator.
	income, err := bigtable.GetValidatorIncomeDetailsHistory([]uint64{}, epoch, 1)
	if err != nil {
		// propagate the error instead of logging it and silently returning zeroed totals
		return nil, fmt.Errorf("error getting validator income history for epoch %v: %w", epoch, err)
	}

	total := &itypes.ValidatorEpochIncome{}

	for _, epochs := range income {
		for _, details := range epochs {
			total.AttestationHeadReward += details.AttestationHeadReward
			total.AttestationSourceReward += details.AttestationSourceReward
			total.AttestationSourcePenalty += details.AttestationSourcePenalty
			total.AttestationTargetReward += details.AttestationTargetReward
			total.AttestationTargetPenalty += details.AttestationTargetPenalty
			total.FinalityDelayPenalty += details.FinalityDelayPenalty
			total.ProposerSlashingInclusionReward += details.ProposerSlashingInclusionReward
			total.ProposerAttestationInclusionReward += details.ProposerAttestationInclusionReward
			total.ProposerSyncInclusionReward += details.ProposerSyncInclusionReward
			total.SyncCommitteeReward += details.SyncCommitteeReward
			total.SyncCommitteePenalty += details.SyncCommitteePenalty
			total.SlashingReward += details.SlashingReward
			total.SlashingPenalty += details.SlashingPenalty
			// TxFeeRewardWei is a big-int byte representation; delegate the addition
			total.TxFeeRewardWei = utils.AddBigInts(total.TxFeeRewardWei, details.TxFeeRewardWei)
		}
	}

	return total, nil
}

// GetValidatorIncomeDetailsHistory returns the validator income details, which have a garbage collection policy of one day.
func (bigtable *Bigtable) GetValidatorIncomeDetailsHistory(validators []uint64, startEpoch uint64, limit int64) (map[uint64]map[uint64]*itypes.ValidatorEpochIncome, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*180))
defer cancel()

endEpoch := startEpoch - uint64(limit)

endTime := utils.EpochToTime(endEpoch)

// if the end time + 25 hours is not after the current time the end epoch is older than 25 hours.
if !endTime.Add(time.Hour * 25).After(time.Now()) {
return nil, fmt.Errorf("error epoch range is outside of the garbage collection policy (1 day)")
}

rangeStart := fmt.Sprintf("%s:e:b:%s", bigtable.chainId, reversedPaddedEpoch(startEpoch))
rangeEnd := fmt.Sprintf("%s:e:b:%s", bigtable.chainId, reversedPaddedEpoch(startEpoch-uint64(limit)))
rangeEnd := fmt.Sprintf("%s:e:b:%s", bigtable.chainId, reversedPaddedEpoch(endEpoch))
// logger.Infof("range: %v to %v", rangeStart, rangeEnd)
res := make(map[uint64]map[uint64]*itypes.ValidatorEpochIncome, len(validators))

valLen := len(validators)
// if valLen == 0 {
// return res, nil
// }

// read entire row if you require more than 1000 validators
var columnFilters []gcp_bigtable.Filter
Expand All @@ -1310,15 +1372,13 @@ func (bigtable *Bigtable) GetValidatorIncomeDetailsHistory(validators []uint64,
if len(columnFilters) == 0 { // special case to retrieve data for all validators
filter = gcp_bigtable.FamilyFilter(INCOME_DETAILS_COLUMN_FAMILY)
}

err := bigtable.tableBeaconchain.ReadRows(ctx, gcp_bigtable.NewRange(rangeStart, rangeEnd), func(r gcp_bigtable.Row) bool {
for _, ri := range r[INCOME_DETAILS_COLUMN_FAMILY] {
validator, err := strconv.ParseUint(strings.TrimPrefix(ri.Column, INCOME_DETAILS_COLUMN_FAMILY+":"), 10, 64)
if err != nil {
logger.Errorf("error parsing validator from column key %v: %v", ri.Column, err)
return false
}

keySplit := strings.Split(r.Key(), ":")

epoch, err := strconv.ParseUint(keySplit[3], 10, 64)
Expand Down
111 changes: 74 additions & 37 deletions db/bigtable_eth1.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,35 +318,6 @@ func (bigtable *Bigtable) GetLastBlockInDataTable() (int, error) {
return lastBlock, nil
}

// GetFullBlockFromDataTable reads the full block with the given number from the
// data table. Returns ErrBlockNotFound if no row exists for that block number.
func (bigtable *Bigtable) GetFullBlockFromDataTable(number uint64) (*types.Eth1Block, error) {

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	paddedNumber := reversedPaddedBlockNumber(number)

	row, err := bigtable.tableData.ReadRow(ctx, fmt.Sprintf("%s:%s", bigtable.chainId, paddedNumber))
	if err != nil {
		return nil, err
	}

	if len(row[DEFAULT_FAMILY]) == 0 { // block not found
		logger.Errorf("block %v not found in data table", number)
		return nil, ErrBlockNotFound
	}

	blocks := make([]*types.Eth1Block, 0, 1)
	rowHandler := getFullBlockHandler(&blocks)

	// the handler reports decode failures via its boolean result; the previous
	// version discarded it (and re-checked a stale err), risking a panic on blocks[0]
	if !rowHandler(row) || len(blocks) == 0 {
		return nil, fmt.Errorf("error decoding block %v from data table", number)
	}

	return blocks[0], nil
}

func (bigtable *Bigtable) GetMostRecentBlockFromDataTable() (*types.Eth1BlockIndexed, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
defer cancel()
Expand Down Expand Up @@ -446,28 +417,94 @@ func getFullBlockHandler(blocks *[]*types.Eth1Block) func(gcp_bigtable.Row) bool

// GetFullBlockDescending gets blocks starting at block start
func (bigtable *Bigtable) GetFullBlockDescending(start, limit uint64) ([]*types.Eth1Block, error) {
startPadded := reversedPaddedBlockNumber(start)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30))
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*60))
defer cancel()

prefix := fmt.Sprintf("%s:%s", bigtable.chainId, startPadded)
if start < 1 || limit < 1 || limit > start {
return nil, fmt.Errorf("invalid block range provided (start: %v, limit: %v)", start, limit)
}

rowRange := gcp_bigtable.InfiniteRange(prefix) //gcp_bigtable.PrefixRange("1:1000000000")
startPadded := reversedPaddedBlockNumber(start)
endPadded := reversedPaddedBlockNumber(start - limit)

startKey := fmt.Sprintf("%s:%s", bigtable.chainId, startPadded)
endKey := fmt.Sprintf("%s:%s", bigtable.chainId, endPadded)

rowRange := gcp_bigtable.NewRange(startKey, endKey) //gcp_bigtable.PrefixRange("1:1000000000")

blocks := make([]*types.Eth1Block, 0, 100)
// if limit >= start { // handle retrieval of the first blocks
// rowRange = gcp_bigtable.InfiniteRange(startKey)
// }

rowFilter := gcp_bigtable.RowFilter(gcp_bigtable.ColumnFilter("data"))

rowHandler := getFullBlockHandler(&blocks)
blocks := make([]*types.Eth1Block, 0, limit)

rowHandler := func(row gcp_bigtable.Row) bool {
block := types.Eth1Block{}
err := proto.Unmarshal(row[DEFAULT_FAMILY_BLOCKS][0].Value, &block)
if err != nil {
logger.Errorf("error could not unmarschal proto object, err: %v", err)
return false
}
blocks = append(blocks, &block)
return true
}

startTime := time.Now()
err := bigtable.tableData.ReadRows(ctx, rowRange, rowHandler, gcp_bigtable.LimitRows(int64(limit)))
err := bigtable.tableBlocks.ReadRows(ctx, rowRange, rowHandler, rowFilter, gcp_bigtable.LimitRows(int64(limit)))
if err != nil {
return nil, err
}

logger.Infof("finished getting blocks from table data: %v", time.Since(startTime))
logger.Infof("finished getting blocks from table blocks: %v", time.Since(startTime))
return blocks, nil
}

// GetFullBlocksDescending streams full blocks in descending order, from block
// high down to block low, into the provided channel. The caller owns the
// channel; this function only sends on it and never closes it.
func (bigtable *Bigtable) GetFullBlocksDescending(stream chan<- *types.Eth1Block, high, low uint64) error {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*180))
	defer cancel()

	if high < 1 || low < 1 || high < low {
		// previous message labeled the arguments start/limit, which did not match the parameters
		return fmt.Errorf("invalid block range provided (high: %v, low: %v)", high, low)
	}

	highKey := fmt.Sprintf("%s:%s", bigtable.chainId, reversedPaddedBlockNumber(high))
	lowKey := fmt.Sprintf("%s:%s", bigtable.chainId, reversedPaddedBlockNumber(low))

	// block numbers are reverse-padded, so the low block has the lexicographically
	// higher key and the range [highKey, lowKey) iterates from high towards low
	rowRange := gcp_bigtable.NewRange(highKey, lowKey)

	rowFilter := gcp_bigtable.RowFilter(gcp_bigtable.ColumnFilter("data"))

	rowHandler := func(row gcp_bigtable.Row) bool {
		block := types.Eth1Block{}
		err := proto.Unmarshal(row[DEFAULT_FAMILY_BLOCKS][0].Value, &block)
		if err != nil {
			logger.Errorf("error could not unmarshal proto object, err: %v", err)
			return false
		}
		stream <- &block
		return true
	}

	err := bigtable.tableBlocks.ReadRows(ctx, rowRange, rowHandler, rowFilter)
	if err != nil {
		return err
	}

	return nil
}

func (bigtable *Bigtable) GetBlocksIndexedMultiple(blockNumbers []uint64, limit uint64) ([]*types.Eth1BlockIndexed, error) {
rowList := gcp_bigtable.RowList{}
for _, block := range blockNumbers {
Expand Down
13 changes: 13 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2316,3 +2316,16 @@ func updateValidatorPerformance(tx *sqlx.Tx) error {

return tx.Commit()
}

// GetBlockNumber looks up the execution-layer block number recorded for the
// given beacon-chain slot.
func GetBlockNumber(slot uint64) (block uint64, err error) {
	err = ReaderDb.Get(&block, `SELECT exec_block_number FROM blocks where slot = $1`, slot)
	return block, err
}

// SaveChartSeriesPoint upserts a single (time, indicator) data point into the
// chart_series table, overwriting any existing value for that key.
func SaveChartSeriesPoint(date time.Time, indicator string, value any) error {
	_, err := WriterDb.Exec(`INSERT INTO chart_series (time, indicator, value) VALUES($1, $2, $3) ON CONFLICT (time, indicator) DO UPDATE SET value = EXCLUDED.value`, date, indicator, value)
	if err != nil {
		// previous message referenced NON_FAILED_TX_GAS_USAGE — a copy-paste leftover
		// from a specific caller; this helper is generic over all indicators
		return fmt.Errorf("error saving chart_series point for indicator %q: %w", indicator, err)
	}
	return nil
}
Loading

0 comments on commit fe5e829

Please sign in to comment.