From 0ca17834cc8b6bc3e8b562493d3b656dfec19126 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 12 Feb 2024 10:35:41 +0100 Subject: [PATCH 1/4] (NOBIDS) add rewards export to slot exporter --- exporter/exporter.go | 2 +- exporter/slot_exporter.go | 109 ++++++++++++++++++++++++++++++-------- 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/exporter/exporter.go b/exporter/exporter.go index 29cc1787f1..a5336e055f 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -52,7 +52,7 @@ func Start(client rpc.Client) { minWaitTimeBetweenRuns := time.Second * time.Duration(utils.Config.Chain.ClConfig.SecondsPerSlot) for { start := time.Now() - err := RunSlotExporter(client, firstRun) + err := RunSlotExporter(firstRun) if err != nil { logrus.Errorf("error during slot export run: %v", err) } else if err == nil && firstRun { diff --git a/exporter/slot_exporter.go b/exporter/slot_exporter.go index 3dd22e63c6..bcdc5c3052 100644 --- a/exporter/slot_exporter.go +++ b/exporter/slot_exporter.go @@ -5,9 +5,11 @@ import ( "database/sql" "eth2-exporter/db" "eth2-exporter/rpc" + "eth2-exporter/services" "eth2-exporter/types" "eth2-exporter/utils" "fmt" + "math/big" "strconv" "strings" "time" @@ -15,11 +17,32 @@ import ( "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + + eth_rewards "github.com/gobitfly/eth-rewards" + "github.com/gobitfly/eth-rewards/beacon" ) -func RunSlotExporter(client rpc.Client, firstRun bool) error { +func RunSlotExporter(firstRun bool) error { + + var err error + var clClient rpc.Client + var clbClient *beacon.Client + + chainID := new(big.Int).SetUint64(utils.Config.Chain.ClConfig.DepositChainID) + if utils.Config.Indexer.Node.Type == "lighthouse" { + clClientUrl := fmt.Sprintf("http://%s:%s", utils.Config.Indexer.Node.Host, utils.Config.Indexer.Node.Port) + clClient, err = rpc.NewLighthouseClient(clClientUrl, chainID) + if err != nil { + utils.LogFatal(err, "new explorer lighthouse client error", 0) + } + + clbClient = beacon.NewClient(clClientUrl, time.Minute*5) + } else { + logrus.Fatalf("invalid note type %v specified. supported node types are prysm and lighthouse", utils.Config.Indexer.Node.Type) + } + // get the current chain head - head, err := client.GetChainHead() + head, err := clClient.GetChainHead() if err != nil { return fmt.Errorf("error retrieving chain head: %w", err) @@ -41,7 +64,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { if len(dbSlots) > 0 { if dbSlots[0] != 0 { logger.Infof("exporting genesis slot as it is missing in the database") - err := ExportSlot(client, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) + err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", 0, err) } @@ -61,7 +84,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { if previousSlot != currentSlot-1 { logger.Infof("slots between %v and %v are missing, exporting them", previousSlot, currentSlot) for slot := previousSlot + 1; slot <= currentSlot-1; slot++ { - err := ExportSlot(client, slot, false, tx) + err := ExportSlot(clClient, slot, false, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", slot, err) @@ -79,7 +102,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { if err != nil { if err == sql.ErrNoRows { logger.Infof("db is empty, export genesis slot") - err := ExportSlot(client, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) + err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", 0, err) } @@ -93,7 +116,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { if lastDbSlot != head.HeadSlot { slotsExported := 0 for slot := lastDbSlot + 1; slot <= head.HeadSlot; slot++ { // export any new slots - err := ExportSlot(client, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx) + err := ExportSlot(clClient, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", slot, err) } @@ -120,7 +143,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { return fmt.Errorf("error retrieving all non finalized slots from the db: %w", err) } for _, dbSlot := range dbNonFinalSlots { - header, err := client.GetBlockHeader(dbSlot.Slot) + header, err := clClient.GetBlockHeader(dbSlot.Slot) if err != nil { return fmt.Errorf("error retrieving block root for slot %v: %w", dbSlot.Slot, err) @@ -158,7 +181,7 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { if err != nil { return fmt.Errorf("error setting block %v as finalized (orphaned): %w", dbSlot.Slot, err) } - err = ExportSlot(client, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) + err = ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err) } @@ -167,34 +190,78 @@ func RunSlotExporter(client rpc.Client, firstRun bool) error { // epoch transition slot has finalized, update epoch status if dbSlot.Slot%utils.Config.Chain.ClConfig.SlotsPerEpoch == 0 && dbSlot.Slot > utils.Config.Chain.ClConfig.SlotsPerEpoch-1 { epoch := utils.EpochOfSlot(dbSlot.Slot) - epochParticipationStats, err := client.GetValidatorParticipation(epoch - 1) - if err != nil { - return fmt.Errorf("error retrieving epoch participation statistics for epoch %v: %w", epoch, err) - } else { - logger.Printf("updating epoch %v with participation rate %v", epoch, epochParticipationStats.GlobalParticipationRate) - err := db.UpdateEpochStatus(epochParticipationStats, tx) + // a new epoch has been finalized, run all related tasks + // update epoch status + // export epoch rewards + wg := &errgroup.Group{} + + wg.Go(func() error { + epochParticipationStats, err := clClient.GetValidatorParticipation(epoch - 1) + if err != nil { + return fmt.Errorf("error retrieving epoch participation statistics for epoch %v: %w", epoch, err) + } else { + logger.Printf("updating epoch %v with participation rate %v", epoch, epochParticipationStats.GlobalParticipationRate) + err := db.UpdateEpochStatus(epochParticipationStats, tx) + + if err != nil { + return err + } + + logger.Infof("exporting validation queue") + queue, err := clClient.GetValidatorQueue() + if err != nil { + return fmt.Errorf("error retrieving validator queue data: %w", err) + } + + err = db.SaveValidatorQueue(queue, tx) + if err != nil { + return fmt.Errorf("error saving validator queue data: %w", err) + } + } + return nil + }) + + wg.Go(func() error { + start := time.Now() + + logrus.Infof("retrieving rewards details for epoch %d", epoch) + rewards, err := eth_rewards.GetRewardsForEpoch(epoch, clbClient, utils.Config.Eth1ErigonEndpoint) if err != nil { - return err + return fmt.Errorf("error retrieving reward details for epoch %v: %v", epoch, err) + } else { + logrus.Infof("retrieved %v reward details for epoch %v in %v", len(rewards), epoch, time.Since(start)) } - logger.Infof("exporting validation queue") - queue, err := client.GetValidatorQueue() + logrus.Infof("saving reward details for epoch %d", epoch) + err = db.BigtableClient.SaveValidatorIncomeDetails(uint64(epoch), rewards) if err != nil { - return fmt.Errorf("error retrieving validator queue data: %w", err) + return fmt.Errorf("error saving reward details to bigtable: %v", err) } - err = db.SaveValidatorQueue(queue, tx) + _, err = db.WriterDb.Exec("UPDATE epochs SET rewards_exported = true WHERE epoch = $1", epoch) + if err != nil { - return fmt.Errorf("error saving validator queue data: %w", err) + return fmt.Errorf("error marking rewards_exported as true for epoch %v: %v", epoch, err) } + + logrus.Infof("completed exporting reward details for epoch %d", epoch) + + services.ReportStatus("rewardsExporter", "Running", nil) + + return nil + }) + + err := wg.Wait() + if err != nil { + return err } } } else { // check if a late slot has been proposed in the meantime if len(dbSlot.BlockRoot) < 32 && header != nil { // we have no slot in the db, but the node has a slot, export it logger.Infof("exporting new slot %v", dbSlot.Slot) - err := ExportSlot(client, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) + err := ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) if err != nil { return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err) } From d75c5c35c48eb138f497f6ab19a2e497bcacf489 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 12 Feb 2024 10:50:31 +0100 Subject: [PATCH 2/4] (NOBIDS) adjust rewards epoch number --- exporter/slot_exporter.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/exporter/slot_exporter.go b/exporter/slot_exporter.go index bcdc5c3052..5a3352e26d 100644 --- a/exporter/slot_exporter.go +++ b/exporter/slot_exporter.go @@ -223,29 +223,34 @@ func RunSlotExporter(firstRun bool) error { }) wg.Go(func() error { + if epoch == 0 { + return nil + } + + rewardsEpoch := epoch - 1 start := time.Now() - logrus.Infof("retrieving rewards details for epoch %d", epoch) + logrus.Infof("retrieving rewards details for epoch %d", rewardsEpoch) rewards, err := eth_rewards.GetRewardsForEpoch(epoch, clbClient, utils.Config.Eth1ErigonEndpoint) if err != nil { - return fmt.Errorf("error retrieving reward details for epoch %v: %v", epoch, err) + return fmt.Errorf("error retrieving reward details for epoch %v: %v", rewardsEpoch, err) } else { - logrus.Infof("retrieved %v reward details for epoch %v in %v", len(rewards), epoch, time.Since(start)) + logrus.Infof("retrieved %v reward details for epoch %v in %v", len(rewards), rewardsEpoch, time.Since(start)) } - logrus.Infof("saving reward details for epoch %d", epoch) - err = db.BigtableClient.SaveValidatorIncomeDetails(uint64(epoch), rewards) + logrus.Infof("saving reward details for epoch %d", rewardsEpoch) + err = db.BigtableClient.SaveValidatorIncomeDetails(uint64(rewardsEpoch), rewards) if err != nil { return fmt.Errorf("error saving reward details to bigtable: %v", err) } - _, err = db.WriterDb.Exec("UPDATE epochs SET rewards_exported = true WHERE epoch = $1", epoch) + _, err = tx.Exec("UPDATE epochs SET rewards_exported = true WHERE epoch = $1", rewardsEpoch) if err != nil { - return fmt.Errorf("error marking rewards_exported as true for epoch %v: %v", epoch, err) + return fmt.Errorf("error marking rewards_exported as true for epoch %v: %v", rewardsEpoch, err) } - logrus.Infof("completed exporting reward details for epoch %d", epoch) + logrus.Infof("completed exporting reward details for epoch %d", rewardsEpoch) services.ReportStatus("rewardsExporter", "Running", nil) From c59f10412ece15f868e1b7de25724ec4b9fb27cb Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 12 Feb 2024 11:48:54 +0100 Subject: [PATCH 3/4] (NOBIDS) refractor methods --- exporter/slot_exporter.go | 120 ++++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 57 deletions(-) diff --git a/exporter/slot_exporter.go b/exporter/slot_exporter.go index 5a3352e26d..783220d15f 100644 --- a/exporter/slot_exporter.go +++ b/exporter/slot_exporter.go @@ -192,69 +192,14 @@ func RunSlotExporter(firstRun bool) error { epoch := utils.EpochOfSlot(dbSlot.Slot) // a new epoch has been finalized, run all related tasks - // update epoch status - // export epoch rewards wg := &errgroup.Group{} wg.Go(func() error { - epochParticipationStats, err := clClient.GetValidatorParticipation(epoch - 1) - if err != nil { - return fmt.Errorf("error retrieving epoch participation statistics for epoch %v: %w", epoch, err) - } else { - logger.Printf("updating epoch %v with participation rate %v", epoch, epochParticipationStats.GlobalParticipationRate) - err := db.UpdateEpochStatus(epochParticipationStats, tx) - - if err != nil { - return err - } - - logger.Infof("exporting validation queue") - queue, err := clClient.GetValidatorQueue() - if err != nil { - return fmt.Errorf("error retrieving validator queue data: %w", err) - } - - err = db.SaveValidatorQueue(queue, tx) - if err != nil { - return fmt.Errorf("error saving validator queue data: %w", err) - } - } - return nil + return updateEpochStatusAndValidatorQueue(clClient, epoch, tx) }) wg.Go(func() error { - if epoch == 0 { - return nil - } - - rewardsEpoch := epoch - 1 - start := time.Now() - - logrus.Infof("retrieving rewards details for epoch %d", rewardsEpoch) - rewards, err := eth_rewards.GetRewardsForEpoch(epoch, clbClient, utils.Config.Eth1ErigonEndpoint) - if err != nil { - return fmt.Errorf("error retrieving reward details for epoch %v: %v", rewardsEpoch, err) - } else { - logrus.Infof("retrieved %v reward details for epoch %v in %v", len(rewards), rewardsEpoch, time.Since(start)) - } - - logrus.Infof("saving reward details for epoch %d", rewardsEpoch) - err = db.BigtableClient.SaveValidatorIncomeDetails(uint64(rewardsEpoch), rewards) - if err != nil { - return fmt.Errorf("error saving reward details to bigtable: %v", err) - } - - _, err = tx.Exec("UPDATE epochs SET rewards_exported = true WHERE epoch = $1", rewardsEpoch) - - if err != nil { - return fmt.Errorf("error marking rewards_exported as true for epoch %v: %v", rewardsEpoch, err) - } - - logrus.Infof("completed exporting reward details for epoch %d", rewardsEpoch) - - services.ReportStatus("rewardsExporter", "Running", nil) - - return nil + return saveEpochRewards(epoch, clbClient, tx) }) err := wg.Wait() @@ -283,6 +228,67 @@ func RunSlotExporter(firstRun bool) error { } +func saveEpochRewards(epoch uint64, clbClient *beacon.Client, tx *sqlx.Tx) error { + if epoch == 0 { + return nil + } + + rewardsEpoch := epoch - 1 + start := time.Now() + + logrus.Infof("retrieving rewards details for epoch %d", rewardsEpoch) + rewards, err := eth_rewards.GetRewardsForEpoch(epoch, clbClient, utils.Config.Eth1ErigonEndpoint) + if err != nil { + return fmt.Errorf("error retrieving reward details for epoch %v: %v", rewardsEpoch, err) + } else { + logrus.Infof("retrieved %v reward details for epoch %v in %v", len(rewards), rewardsEpoch, time.Since(start)) + } + + logrus.Infof("saving reward details for epoch %d", rewardsEpoch) + err = db.BigtableClient.SaveValidatorIncomeDetails(uint64(rewardsEpoch), rewards) + if err != nil { + return fmt.Errorf("error saving reward details to bigtable: %v", err) + } + + _, err = tx.Exec("UPDATE epochs SET rewards_exported = true WHERE epoch = $1", rewardsEpoch) + + if err != nil { + return fmt.Errorf("error marking rewards_exported as true for epoch %v: %v", rewardsEpoch, err) + } + + logrus.Infof("completed exporting reward details for epoch %d", rewardsEpoch) + + services.ReportStatus("rewardsExporter", "Running", nil) + + return nil +} + +func updateEpochStatusAndValidatorQueue(clClient rpc.Client, epoch uint64, tx *sqlx.Tx) error { + epochParticipationStats, err := clClient.GetValidatorParticipation(epoch - 1) + if err != nil { + return fmt.Errorf("error retrieving epoch participation statistics for epoch %v: %w", epoch, err) + } else { + logger.Printf("updating epoch %v with participation rate %v", epoch, epochParticipationStats.GlobalParticipationRate) + err := db.UpdateEpochStatus(epochParticipationStats, tx) + + if err != nil { + return err + } + + logger.Infof("exporting validation queue") + queue, err := clClient.GetValidatorQueue() + if err != nil { + return fmt.Errorf("error retrieving validator queue data: %w", err) + } + + err = db.SaveValidatorQueue(queue, tx) + if err != nil { + return fmt.Errorf("error saving validator queue data: %w", err) + } + } + return nil +} + func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) error { isFirstSlotOfEpoch := slot%utils.Config.Chain.ClConfig.SlotsPerEpoch == 0 From f86d960e3efaa9f0fc1743f239a869de8782c17f Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 12 Feb 2024 16:56:20 +0100 Subject: [PATCH 4/4] (NOBIDS) slotviz struct --- exporter/slot_exporter.go | 68 ++++++++++++++++++++++++++++++++++----- types/exporter.go | 54 +++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 8 deletions(-) diff --git a/exporter/slot_exporter.go b/exporter/slot_exporter.go index 783220d15f..06ebcfd4d7 100644 --- a/exporter/slot_exporter.go +++ b/exporter/slot_exporter.go @@ -2,7 +2,9 @@ package exporter import ( "bytes" + "context" "database/sql" + "encoding/gob" "eth2-exporter/db" "eth2-exporter/rpc" "eth2-exporter/services" @@ -18,11 +20,12 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" + "github.com/go-redis/redis/v8" eth_rewards "github.com/gobitfly/eth-rewards" "github.com/gobitfly/eth-rewards/beacon" ) -func RunSlotExporter(firstRun bool) error { +func RunSlotExporter(firstRun bool, redisClient *redis.Client) error { var err error var clClient rpc.Client @@ -64,7 +67,7 @@ func RunSlotExporter(firstRun bool) error { if len(dbSlots) > 0 { if dbSlots[0] != 0 { logger.Infof("exporting genesis slot as it is missing in the database") - err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) + err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", 0, err) } @@ -84,7 +87,7 @@ func RunSlotExporter(firstRun bool) error { if previousSlot != currentSlot-1 { logger.Infof("slots between %v and %v are missing, exporting them", previousSlot, currentSlot) for slot := previousSlot + 1; slot <= currentSlot-1; slot++ { - err := ExportSlot(clClient, slot, false, tx) + err := ExportSlot(clClient, slot, false, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", slot, err) @@ -102,7 +105,7 @@ func RunSlotExporter(firstRun bool) error { if err != nil { if err == sql.ErrNoRows { logger.Infof("db is empty, export genesis slot") - err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx) + err := ExportSlot(clClient, 0, utils.EpochOfSlot(0) == head.HeadEpoch, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", 0, err) } @@ -116,7 +119,7 @@ func RunSlotExporter(firstRun bool) error { if lastDbSlot != head.HeadSlot { slotsExported := 0 for slot := lastDbSlot + 1; slot <= head.HeadSlot; slot++ { // export any new slots - err := ExportSlot(clClient, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx) + err := ExportSlot(clClient, slot, utils.EpochOfSlot(slot) == head.HeadEpoch, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", slot, err) } @@ -181,7 +184,7 @@ func RunSlotExporter(firstRun bool) error { if err != nil { return fmt.Errorf("error setting block %v as finalized (orphaned): %w", dbSlot.Slot, err) } - err = ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) + err = ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err) } @@ -211,7 +214,7 @@ func RunSlotExporter(firstRun bool) error { } else { // check if a late slot has been proposed in the meantime if len(dbSlot.BlockRoot) < 32 && header != nil { // we have no slot in the db, but the node has a slot, export it logger.Infof("exporting new slot %v", dbSlot.Slot) - err := ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx) + err := ExportSlot(clClient, dbSlot.Slot, utils.EpochOfSlot(dbSlot.Slot) == head.HeadEpoch, tx, redisClient) if err != nil { return fmt.Errorf("error exporting slot %v: %w", dbSlot.Slot, err) } @@ -289,7 +292,7 @@ func updateEpochStatusAndValidatorQueue(clClient rpc.Client, epoch uint64, tx *s return nil } -func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) error { +func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx, redisClient *redis.Client) error { isFirstSlotOfEpoch := slot%utils.Config.Chain.ClConfig.SlotsPerEpoch == 0 epoch := slot / utils.Config.Chain.ClConfig.SlotsPerEpoch @@ -461,6 +464,55 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e return fmt.Errorf("error exporting proposal to bigtable for slot %v: %w", block.Slot, err) } + // save the block to redis if it was produced during the last 60 minutes + if time.Since(utils.SlotToTime(block.Slot)) < time.Hour { + var serializedBlockData bytes.Buffer + enc := gob.NewEncoder(&serializedBlockData) + + // TODO: replace with: RedisCachedBlockSlotViz + redisCachedBlock := &types.RedisCachedBlock{ + Proposer: block.Proposer, + BlockRoot: block.BlockRoot, + Slot: block.Slot, + ParentRoot: block.ParentRoot, + StateRoot: block.StateRoot, + Signature: block.Signature, + RandaoReveal: block.RandaoReveal, + Graffiti: block.Graffiti, + Eth1Data: block.Eth1Data, + BodyRoot: block.BodyRoot, + ProposerSlashings: block.ProposerSlashings, + AttesterSlashings: block.AttesterSlashings, + Attestations: block.Attestations, + Deposits: block.Deposits, + VoluntaryExits: block.VoluntaryExits, + SyncAggregate: block.SyncAggregate, + SignedBLSToExecutionChange: block.SignedBLSToExecutionChange, + AttestationDuties: block.AttestationDuties, + SyncDuties: block.SyncDuties, + Finalized: block.Finalized, + EpochAssignments: block.EpochAssignments, + } + err := enc.Encode(redisCachedBlock) + if err != nil { + return fmt.Errorf("error serializing block to gob for slot %v: %w", block.Slot, err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + key := fmt.Sprintf("%d:%s:%d", utils.Config.Chain.ClConfig.DepositChainID, "block", block.Slot) + + expirationTime := utils.EpochToTime(epoch + 7) // keep it for at least 7 epochs in the cache + expirationDuration := time.Until(expirationTime) + logger.Infof("writing block to redis with a TTL of %v", expirationDuration) + err = redisClient.Set(ctx, key, serializedBlockData.Bytes(), expirationDuration).Err() + if err != nil { + return fmt.Errorf("error writing block to redis for slot %v: %w", block.Slot, err) + } + logger.Infof("writing block to redis completed") + } + // save the block data to the db err = db.SaveBlock(block, false, tx) if err != nil { diff --git a/types/exporter.go b/types/exporter.go index 0d7fbb7bd1..a3e375ec26 100644 --- a/types/exporter.go +++ b/types/exporter.go @@ -136,6 +136,60 @@ type Block struct { EpochAssignments *EpochAssignments Validators []*Validator } +type RedisCachedBlock struct { + Proposer uint64 + BlockRoot []byte + Slot uint64 + ParentRoot []byte + StateRoot []byte + Signature []byte + RandaoReveal []byte + Graffiti []byte + Eth1Data *Eth1Data + BodyRoot []byte + ProposerSlashings []*ProposerSlashing + AttesterSlashings []*AttesterSlashing + Attestations []*Attestation + Deposits []*Deposit + VoluntaryExits []*VoluntaryExit + SyncAggregate *SyncAggregate // warning: sync aggregate may be nil, for phase0 blocks + SignedBLSToExecutionChange []*SignedBLSToExecutionChange + AttestationDuties map[ValidatorIndex][]Slot + SyncDuties map[ValidatorIndex]bool + Finalized bool + EpochAssignments *EpochAssignments +} + +type ValidatorIndex32 uint32 + +type RedisCachedBlockSlotViz struct { + Proposer ValidatorIndex32 + ProposerCLReward int64 // negative in case missed + ProposerELReward uint64 + Slot uint64 // Slot/32 = epoch; to check for finality, slotviz needs epoch data as well (with sync committee participants usw) to work + AttestationSlashings map[ValidatorIndex32][]RedisCachedBlockSlashing // slasher index -> RedisCachedBlockSlashing + ProposerSlashings map[ValidatorIndex32][]RedisCachedBlockSlashing + AttestationDuties map[ValidatorIndex32][]RedisCachedBlockAttestations + SyncDuties map[ValidatorIndex32]RedisCachedBlockSync +} + +type RedisCachedBlockSlashing struct { + Slashed ValidatorIndex32 + Slot Slot + Earnings uint64 // slasher perspective + Penalty uint64 // slashed perspective +} + +type RedisCachedBlockAttestations struct { + ForSlot Slot + SourceEarnings int64 // negative in case missed + TargetEarnings int64 + HeadEarnings int64 +} + +type RedisCachedBlockSync struct { + Earnings int64 +} type SignedBLSToExecutionChange struct { Message BLSToExecutionChange