Skip to content

Commit

Permalink
(BIDS-2481) Merge branch 'master' into BIDS-2481/All_datatables_shoul…
Browse files Browse the repository at this point in the history
…d_be_debounced
  • Loading branch information
Eisei24 committed Oct 2, 2023
2 parents e4b66ca + 3a9e34e commit a53bb3c
Show file tree
Hide file tree
Showing 69 changed files with 1,810 additions and 1,968 deletions.
2 changes: 1 addition & 1 deletion cmd/eth1indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/coocood/freecache"
"github.com/ethereum/go-ethereum/common"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/shopspring/decimal"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
Expand Down
2 changes: 1 addition & 1 deletion cmd/ethstore-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"strconv"
"strings"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"
)

Expand Down
8 changes: 3 additions & 5 deletions cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (

"github.com/gorilla/csrf"
"github.com/gorilla/mux"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/phyber/negroni-gzip/gzip"
"github.com/stripe/stripe-go/v72"
"github.com/urfave/negroni"
Expand Down Expand Up @@ -458,7 +458,7 @@ func main() {
router.HandleFunc("/validator/{pubkey}/deposits", handlers.ValidatorDeposits).Methods("GET")
router.HandleFunc("/validator/{index}/slashings", handlers.ValidatorSlashings).Methods("GET")
router.HandleFunc("/validator/{index}/effectiveness", handlers.ValidatorAttestationInclusionEffectiveness).Methods("GET")
router.HandleFunc("/validator/{pubkey}/save", handlers.ValidatorSave).Methods("POST")
router.HandleFunc("/validator/{pubkey}/name", handlers.SaveValidatorName).Methods("POST")
router.HandleFunc("/watchlist/add", handlers.UsersModalAddValidator).Methods("POST")
router.HandleFunc("/validator/{pubkey}/remove", handlers.UserValidatorWatchlistRemove).Methods("POST")
router.HandleFunc("/validator/{index}/stats", handlers.ValidatorStatsTable).Methods("GET")
Expand Down Expand Up @@ -497,7 +497,6 @@ func main() {
router.HandleFunc("/search/{type}/{search}", handlers.SearchAhead).Methods("GET")
router.HandleFunc("/imprint", handlers.Imprint).Methods("GET")
router.HandleFunc("/mobile", handlers.MobilePage).Methods("GET")
router.HandleFunc("/mobile", handlers.MobilePagePost).Methods("POST")
router.HandleFunc("/tools/unitConverter", handlers.UnitConverter).Methods("GET")
router.HandleFunc("/tools/broadcast", handlers.Broadcast).Methods("GET")
router.HandleFunc("/tools/broadcast", handlers.BroadcastPost).Methods("POST")
Expand Down Expand Up @@ -555,7 +554,6 @@ func main() {

oauthRouter := router.PathPrefix("/user").Subrouter()
oauthRouter.HandleFunc("/authorize", handlers.UserAuthorizeConfirm).Methods("GET")
oauthRouter.HandleFunc("/cancel", handlers.UserAuthorizationCancel).Methods("GET")
oauthRouter.Use(csrfHandler)

authRouter := router.PathPrefix("/user").Subrouter()
Expand Down Expand Up @@ -666,7 +664,7 @@ func main() {

if utils.Config.Metrics.Enabled {
go func(addr string) {
logrus.Infof("Serving metrics on %v", addr)
logrus.Infof("serving metrics on %v", addr)
if err := metrics.Serve(addr); err != nil {
logrus.WithError(err).Fatal("Error serving metrics")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/frontend-data-updater/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"fmt"
"math/big"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"

"net/http"
Expand Down
9 changes: 8 additions & 1 deletion cmd/migrations/bigtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func main() {

// export epoch data to bigtable
g := new(errgroup.Group)
g.SetLimit(5)
g.SetLimit(6)
g.Go(func() error {
err = db.BigtableClient.SaveValidatorBalances(epoch, data.Validators)
if err != nil {
Expand Down Expand Up @@ -155,6 +155,13 @@ func main() {
}
return nil
})
g.Go(func() error {
err = db.BigtableClient.MigrateIncomeDataV1V2Schema(epoch)
if err != nil {
return fmt.Errorf("error exporting sync committee duties to bigtable: %v", err)
}
return nil
})

err = g.Wait()
if err != nil {
Expand Down
57 changes: 35 additions & 22 deletions cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"time"

"github.com/coocood/freecache"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"golang.org/x/sync/errgroup"

Expand All @@ -40,6 +40,7 @@ var opts = struct {
BatchSize uint64
DataConcurrency uint64
Transformers string
Table string
Family string
Key string
DryRun bool
Expand All @@ -55,6 +56,7 @@ func main() {
flag.Uint64Var(&opts.EndDay, "day-end", 0, "end day to debug")
flag.Uint64Var(&opts.Validator, "validator", 0, "validator to check for")
flag.Int64Var(&opts.TargetVersion, "target-version", -2, "Db migration target version, use -2 to apply up to the latest version, -1 to apply only the next version or the specific versions")
flag.StringVar(&opts.Table, "table", "", "big table table")
flag.StringVar(&opts.Family, "family", "", "big table family")
flag.StringVar(&opts.Key, "key", "", "big table key")
flag.Uint64Var(&opts.StartBlock, "blocks.start", 0, "Block to start indexing")
Expand Down Expand Up @@ -209,7 +211,7 @@ func main() {
case "debug-rewards":
CompareRewards(opts.StartDay, opts.EndDay, opts.Validator, bt)
case "clear-bigtable":
ClearBigtable(opts.Family, opts.Key, opts.DryRun, bt)
ClearBigtable(opts.Table, opts.Family, opts.Key, opts.DryRun, bt)
case "index-old-eth1-blocks":
IndexOldEth1Blocks(opts.StartBlock, opts.EndBlock, opts.BatchSize, opts.DataConcurrency, opts.Transformers, bt, erigonClient)
case "update-aggregation-bits":
Expand All @@ -221,6 +223,7 @@ func main() {
case "migrate-last-attestation-slot-bigtable":
migrateLastAttestationSlotToBigtable()
case "export-genesis-validators":
logrus.Infof("retrieving genesis validator state")
validators, err := rpcClient.GetValidatorState(0)
if err != nil {
logrus.Fatalf("error retrieving genesis validator state")
Expand All @@ -240,7 +243,7 @@ func main() {
ActivationEpoch: uint64(validator.Validator.ActivationEpoch),
ExitEpoch: uint64(validator.Validator.ExitEpoch),
WithdrawableEpoch: uint64(validator.Validator.WithdrawableEpoch),
Status: validator.Status,
Status: "active_online",
})
}

Expand Down Expand Up @@ -282,20 +285,19 @@ func main() {
}
}

_, err = db.WriterDb.Exec(`
INSERT INTO blocks_deposits (block_slot, block_index, publickey, withdrawalcredentials, amount, signature, valid_signature)
SELECT
0 as block_slot,
v.validatorindex as block_index,
v.pubkey as publickey,
v.withdrawalcredentials,
32*1e9 as amount,
'\x'::bytea as signature,
true
FROM validators v ON CONFLICT DO NOTHING`)
if err != nil {
logrus.Fatal(err)
for _, validator := range validators.Data {
logrus.Infof("exporting deposit data for genesis validator %v", validator.Index)
_, err = db.WriterDb.Exec(`INSERT INTO blocks_deposits (block_slot, block_root, block_index, publickey, withdrawalcredentials, amount, signature)
VALUES (0, '\x01', $1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`,
validator.Index, utils.MustParseHex(validator.Validator.Pubkey), utils.MustParseHex(validator.Validator.WithdrawalCredentials), validator.Balance, []byte{0x0},
)
if err != nil {
logrus.Errorf("error exporting genesis-deposits: %v", err)
time.Sleep(time.Second * 60)
continue
}
}

_, err = db.WriterDb.Exec(`
INSERT INTO blocks (epoch, slot, blockroot, parentroot, stateroot, signature, syncaggregate_participation, proposerslashingscount, attesterslashingscount, attestationscount, depositscount, withdrawalcount, voluntaryexitscount, proposer, status, exec_transactions_count, eth1data_depositcount)
VALUES (0, 0, '\x'::bytea, '\x'::bytea, '\x'::bytea, '\x'::bytea, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
Expand Down Expand Up @@ -573,7 +575,7 @@ func CompareRewards(dayStart uint64, dayEnd uint64, validator uint64, bt *db.Big

}

func ClearBigtable(family string, key string, dryRun bool, bt *db.Bigtable) {
func ClearBigtable(table string, family string, key string, dryRun bool, bt *db.Bigtable) {

if !dryRun {
confirmation := utils.CmdPrompt(fmt.Sprintf("Are you sure you want to delete all big table entries starting with [%v] for family [%v]?", key, family))
Expand All @@ -582,15 +584,26 @@ func ClearBigtable(family string, key string, dryRun bool, bt *db.Bigtable) {
return
}
}
deletedKeys, err := bt.ClearByPrefix(family, key, dryRun)

if !strings.Contains(key, ":") {
logrus.Fatalf("provided invalid prefix: %s", key)
}

// admin, err := gcp_bigtable.NewAdminClient(context.Background(), utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance)
// if err != nil {
// logrus.Fatal(err)
// }

// err = admin.DropRowRange(context.Background(), table, key)
// if err != nil {
// logrus.Fatal(err)
// }
err := bt.ClearByPrefix(table, family, key, dryRun)

if err != nil {
logrus.Fatalf("error deleting from bigtable: %v", err)
} else if dryRun {
logrus.Infof("the following keys would be deleted: %v", deletedKeys)
} else {
logrus.Infof("%v keys have been deleted", len(deletedKeys))
}
logrus.Info("delete completed")
}

// Let's find blocks that are missing in bt and index them.
Expand Down
2 changes: 1 addition & 1 deletion cmd/node-jobs-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"

"github.com/sirupsen/logrus"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/notification-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
_ "eth2-exporter/docs"
_ "net/http/pprof"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/notification-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
_ "eth2-exporter/docs"
_ "net/http/pprof"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/rewards-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

eth_rewards "github.com/gobitfly/eth-rewards"
"github.com/gobitfly/eth-rewards/beacon"
_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/signatures/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"net/http"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"

_ "net/http/pprof"
Expand Down
65 changes: 25 additions & 40 deletions cmd/statistics/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,18 @@ import (
"strings"
"time"

_ "github.com/jackc/pgx/v4/stdlib"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/sirupsen/logrus"
)

type options struct {
configPath string
statisticsDayToExport int64
statisticsDaysToExport string
statisticsValidatorToggle bool
statisticsResetColumns string
statisticsChartToggle bool
statisticsGraffitiToggle bool
concurrencyTotal uint64
concurrencyCl uint64
concurrencyFailedAttestations uint64
configPath string
statisticsDayToExport int64
statisticsDaysToExport string
statisticsValidatorToggle bool
statisticsChartToggle bool
statisticsGraffitiToggle bool
resetStatus bool
}

var opt = &options{}
Expand All @@ -38,12 +35,9 @@ func main() {
flag.Int64Var(&opt.statisticsDayToExport, "statistics.day", -1, "Day to export statistics (will export the day independent if it has been already exported or not")
flag.StringVar(&opt.statisticsDaysToExport, "statistics.days", "", "Days to export statistics (will export the day independent if it has been already exported or not")
flag.BoolVar(&opt.statisticsValidatorToggle, "validators.enabled", false, "Toggle exporting validator statistics")
flag.StringVar(&opt.statisticsResetColumns, "validators.reset", "", "validator_stats_status columns to reset. Comma separated. Use 'all' for complete resync.")
flag.BoolVar(&opt.statisticsChartToggle, "charts.enabled", false, "Toggle exporting chart series")
flag.BoolVar(&opt.statisticsGraffitiToggle, "graffiti.enabled", false, "Toggle exporting graffiti statistics")
flag.Uint64Var(&opt.concurrencyTotal, "concurrency.total", 10, "Concurrency to use when writing total rewards/performance postgres queries")
flag.Uint64Var(&opt.concurrencyCl, "concurrency.cl", 50, "Concurrency to use when writing cl postgres queries")
flag.Uint64Var(&opt.concurrencyFailedAttestations, "concurrency.fa", 10, "Concurrency to use when fetching failed attestaations from bt")
flag.BoolVar(&opt.resetStatus, "validators.reset", false, "Export stats independet if they have already been exported previously")

versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()
Expand Down Expand Up @@ -142,9 +136,11 @@ func main() {
logrus.Infof("exporting validator statistics for days %v-%v", firstDay, lastDay)
for d := firstDay; d <= lastDay; d++ {

clearStatsStatusTable(d, opt.statisticsResetColumns)
if opt.resetStatus {
clearStatsStatusTable(d)
}

err = db.WriteValidatorStatisticsForDay(uint64(d), opt.concurrencyTotal, opt.concurrencyCl, opt.concurrencyFailedAttestations)
err = db.WriteValidatorStatisticsForDay(uint64(d))
if err != nil {
utils.LogError(err, fmt.Errorf("error exporting stats for day %v", d), 0)
break
Expand Down Expand Up @@ -182,9 +178,11 @@ func main() {
} else if opt.statisticsDayToExport >= 0 {

if opt.statisticsValidatorToggle {
clearStatsStatusTable(uint64(opt.statisticsDayToExport), opt.statisticsResetColumns)
if opt.resetStatus {
clearStatsStatusTable(uint64(opt.statisticsDayToExport))
}

err = db.WriteValidatorStatisticsForDay(uint64(opt.statisticsDayToExport), opt.concurrencyTotal, opt.concurrencyCl, opt.concurrencyFailedAttestations)
err = db.WriteValidatorStatisticsForDay(uint64(opt.statisticsDayToExport))
if err != nil {
utils.LogError(err, fmt.Errorf("error exporting stats for day %v", opt.statisticsDayToExport), 0)
}
Expand All @@ -211,14 +209,14 @@ func main() {
return
}

go statisticsLoop(opt.concurrencyTotal, opt.concurrencyCl, opt.concurrencyFailedAttestations)
go statisticsLoop()

utils.WaitForCtrlC()

logrus.Println("exiting...")
}

func statisticsLoop(concurrencyTotal uint64, concurrencyCl uint64, concurrencyFailedAttestations uint64) {
func statisticsLoop() {
for {

latestEpoch := services.LatestFinalizedEpoch()
Expand Down Expand Up @@ -254,7 +252,7 @@ func statisticsLoop(concurrencyTotal uint64, concurrencyCl uint64, concurrencyFa
}
if lastExportedDayValidator <= previousDay || lastExportedDayValidator == 0 {
for day := lastExportedDayValidator; day <= previousDay; day++ {
err := db.WriteValidatorStatisticsForDay(day, concurrencyTotal, concurrencyCl, concurrencyFailedAttestations)
err := db.WriteValidatorStatisticsForDay(day)
if err != nil {
utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0)
break
Expand Down Expand Up @@ -305,23 +303,10 @@ func statisticsLoop(concurrencyTotal uint64, concurrencyCl uint64, concurrencyFa
}
}

func clearStatsStatusTable(day uint64, columns string) {
if columns == "all" {
logrus.Infof("Delete validator_stats_status for day %v", day)
_, err := db.WriterDb.Exec("DELETE FROM validator_stats_status WHERE day = $1", day)
if err != nil {
logrus.Fatalf("error resetting status for day %v: %v", day, err)
}
} else if len(columns) > 0 {
logrus.Infof("Resetting columns %v of validator_stats_status for day %v ", columns, day)
cols := strings.Join(strings.Split(columns, ","), " = false,")
_, err := db.WriterDb.Exec(fmt.Sprintf(`
UPDATE validator_stats_status
SET %v = false
WHERE day = $1
`, cols), day)
if err != nil {
logrus.Fatalf("error resetting status for day %v: %v", day, err)
}
func clearStatsStatusTable(day uint64) {
logrus.Infof("deleting validator_stats_status for day %v", day)
_, err := db.WriterDb.Exec("DELETE FROM validator_stats_status WHERE day = $1", day)
if err != nil {
logrus.Fatalf("error resetting status for day %v: %v", day, err)
}
}
Loading

0 comments on commit a53bb3c

Please sign in to comment.