diff --git a/README.md b/README.md index ac8ccdd..eca5b64 100644 --- a/README.md +++ b/README.md @@ -114,11 +114,16 @@ These tables, along with their indices, take over 90% of the space required for Pruning data removes data older than a certain age from the two database tables mentioned above. Pruning is set for each table individually and so it is possible to prune either or both of the tables, and to have different retentions for each. For example, the following configuration: +It is also possible to retain data for specific validator pubkeys. ```yaml summarizer: validators: balance-retention: "P6M" epoch-retention: "P1Y" + retain: + - 0xab0bdda0f85f842f431beaccf1250bf1fd7ba51b4100fd64364b6401fda85bb0069b3e715b58819684e7fc0b10a72a34 + - 0x876dd4705157eb66dc71bc2e07fb151ea53e1a62a0bb980a7ce72d15f58944a8a3752d754f52f4a60dbfc7b18169f268 + - 0x9314c6de0386635e2799af798884c2ea09c63b9f079e572acc00b06a7faccce501ea4dfc0b1a23b8603680a5e3481327 ``` This will store 6 month's worth of balances, and 1 year's worth of epoch summaries. Retention periods are [ISO 8601 durations](https://en.wikipedia.org/wiki/ISO_8601#Durations). Note that if it is not desired to retain any balance or epoch summary data then the retention can be set to "PT0s". @@ -204,6 +209,23 @@ eth1deposits: # keep track of this itself, however if you wish to start from a different block this # can be set. # start-block: 500 +summarizer: + enable: true + epochs: + enable: true + blocks: + enable: true + validators: + enable: false + # retention period for validator balances (ISO_8601 duration format) + # balance-retention: "P6M" + # retention period for validator epoch summaries (ISO_8601 duration format) + # epoch-retention: "P1Y" + # validator pubkeys for which to ignore above retention values (it won't prune validator balances and summaries) + # retain: + # - 0xab0bdda0f85f842f431beaccf1250bf1fd7ba51b4100fd64364b6401fda85bb0069b3e715b58819684e7fc0b10a72a34 + # - 0x876dd4705157eb66dc71bc2e07fb151ea53e1a62a0bb980a7ce72d15f58944a8a3752d754f52f4a60dbfc7b18169f268 + # - 0x9314c6de0386635e2799af798884c2ea09c63b9f079e572acc00b06a7faccce501ea4dfc0b1a23b8603680a5e3481327 ``` ## Support diff --git a/main.go b/main.go index fe24aaf..742275d 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ package main import ( "context" + "encoding/hex" "fmt" "net/http" @@ -31,6 +32,7 @@ import ( eth2client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" + "github.com/attestantio/go-eth2-client/spec/phase0" homedir "github.com/mitchellh/go-homedir" "github.com/pkg/errors" zerologger "github.com/rs/zerolog/log" @@ -566,6 +568,24 @@ func startSummarizer( return nil, nil } + validatorRetainPubkeys := viper.GetStringSlice("summarizer.validators.retain") + + validatorRetain := make([]phase0.BLSPubKey, 0) + + for _, pubkey := range validatorRetainPubkeys { + pubkey = strings.TrimPrefix(pubkey, "0x") + + bytes, err := hex.DecodeString(pubkey) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("failed to parse pubkey \"%s\"", pubkey)) + } + + var pubKey phase0.BLSPubKey + copy(pubKey[:], bytes) + + validatorRetain = append(validatorRetain, pubKey) + } + standardSummarizer, err := standardsummarizer.New(ctx, standardsummarizer.WithLogLevel(util.LogLevel("summarizer")), standardsummarizer.WithMonitor(monitor), @@ -578,6 +598,7 @@ func startSummarizer( standardsummarizer.WithMaxDaysPerRun(viper.GetUint64("summarizer.max-days-per-run")), standardsummarizer.WithValidatorEpochRetention(viper.GetString("summarizer.validators.epoch-retention")), standardsummarizer.WithValidatorBalanceRetention(viper.GetString("summarizer.validators.balance-retention")), + standardsummarizer.WithValidatorRetain(validatorRetain), ) if err != nil { return nil, errors.Wrap(err, "failed to create summarizer service") diff --git a/services/chaindb/postgresql/setblobsidecars.go b/services/chaindb/postgresql/setblobsidecars.go index b5f4808..9f0c92d 100644 --- a/services/chaindb/postgresql/setblobsidecars.go +++ b/services/chaindb/postgresql/setblobsidecars.go @@ -73,7 +73,6 @@ func (s *Service) SetBlobSidecars(ctx context.Context, blobSidecars []*chaindb.B kzgCommitmentInclusionProof, }, nil })) - if err != nil { if err := nestedTx.Rollback(ctx); err != nil { return errors.Wrap(err, "failed to roll back nested transaction") diff --git a/services/chaindb/postgresql/validatorepochsummaries.go b/services/chaindb/postgresql/validatorepochsummaries.go index e14d32f..02c19ba 100644 --- a/services/chaindb/postgresql/validatorepochsummaries.go +++ b/services/chaindb/postgresql/validatorepochsummaries.go @@ -518,7 +518,7 @@ WHERE f_validator_index = $1 } // PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. -func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error { +func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorEpochSummaries") defer span.End() @@ -530,18 +530,27 @@ func (s *Service) PruneValidatorEpochSummaries(ctx context.Context, to phase0.Ep // Build the query. queryBuilder := strings.Builder{} queryVals := make([]any, 0) + queryVals = append(queryVals, to) - queryBuilder.WriteString(` + if len(retain) > 0 { + queryBuilder.WriteString(` DELETE FROM t_validator_epoch_summaries +USING t_validators WHERE f_epoch <= $1 -`) - queryVals = append(queryVals, to) +AND t_validator_epoch_summaries.f_validator_index = t_validators.f_index +AND NOT (t_validators.f_public_key = ANY($2))`) - if len(retain) > 0 { + pubkeysBytes := make([][]byte, 0, len(retain)) + + for i := range retain { + pubkeysBytes = append(pubkeysBytes, retain[i][:]) + } + + queryVals = append(queryVals, pubkeysBytes) + } else { queryBuilder.WriteString(` -AND f_validator_index NOT IN($2) -`) - queryVals = append(queryVals, retain) +DELETE FROM t_validator_epoch_summaries +WHERE f_epoch <= $1`) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/postgresql/validators.go b/services/chaindb/postgresql/validators.go index 573be6b..e612edd 100644 --- a/services/chaindb/postgresql/validators.go +++ b/services/chaindb/postgresql/validators.go @@ -716,7 +716,7 @@ func validatorBalanceFromRow(rows pgx.Rows) (*chaindb.ValidatorBalance, error) { } // PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. -func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error { +func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error { ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "PruneValidatorBalances") defer span.End() @@ -729,17 +729,27 @@ func (s *Service) PruneValidatorBalances(ctx context.Context, to phase0.Epoch, r queryBuilder := strings.Builder{} queryVals := make([]any, 0) - queryBuilder.WriteString(` -DELETE FROM t_validator_balances -WHERE f_epoch <= $1 -`) queryVals = append(queryVals, to) if len(retain) > 0 { queryBuilder.WriteString(` -AND f_validator_index NOT IN($2) -`) - queryVals = append(queryVals, retain) +DELETE FROM t_validator_balances +USING t_validators +WHERE f_epoch <= $1 +AND t_validator_balances.f_validator_index = t_validators.f_index +AND NOT (t_validators.f_public_key = ANY($2))`) + + pubkeysBytes := make([][]byte, 0, len(retain)) + + for i := range retain { + pubkeysBytes = append(pubkeysBytes, retain[i][:]) + } + + queryVals = append(queryVals, pubkeysBytes) + } else { + queryBuilder.WriteString(` +DELETE FROM t_validator_balances +WHERE f_epoch <= $1`) } if e := log.Trace(); e.Enabled() { diff --git a/services/chaindb/service.go b/services/chaindb/service.go index c780e9a..4142cdc 100644 --- a/services/chaindb/service.go +++ b/services/chaindb/service.go @@ -343,7 +343,7 @@ type AggregateValidatorBalancesProvider interface { // ValidatorBalancesPruner defines functions to prune validator balances. type ValidatorBalancesPruner interface { // PruneValidatorBalances prunes validator balances up to (but not including) the given epoch. - PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error + PruneValidatorBalances(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } // ValidatorsSetter defines functions to create and update validator information. @@ -410,7 +410,7 @@ type ValidatorEpochSummariesProvider interface { // ValidatorEpochSummariesPruner defines functions to prune validator epoch summaries. type ValidatorEpochSummariesPruner interface { // PruneValidatorEpochSummaries prunes validator epoch summaries up to (but not including) the given point. - PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.ValidatorIndex) error + PruneValidatorEpochSummaries(ctx context.Context, to phase0.Epoch, retain []phase0.BLSPubKey) error } // ValidatorEpochSummariesSetter defines functions to create and update validator epoch summaries. diff --git a/services/spec/standard/service.go b/services/spec/standard/service.go index a6df61d..772ddd5 100644 --- a/services/spec/standard/service.go +++ b/services/spec/standard/service.go @@ -79,7 +79,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { s.updateSpec(ctx) // Set up a periodic refresh of the spec information. - runtimeFunc := func(ctx context.Context, data any) (time.Time, error) { + runtimeFunc := func(_ context.Context, _ any) (time.Time, error) { // Run daily. return time.Now().AddDate(0, 0, 1), nil } diff --git a/services/summarizer/standard/handler.go b/services/summarizer/standard/handler.go index dd93c8c..0173766 100644 --- a/services/summarizer/standard/handler.go +++ b/services/summarizer/standard/handler.go @@ -222,7 +222,7 @@ func (s *Service) summarizeValidators(ctx context.Context, targetEpoch phase0.Ep log.Trace().Uint64("first_epoch", uint64(firstEpoch)).Uint64("target_epoch", uint64(targetEpoch)).Msg("Validators catchup bounds") for epoch := firstEpoch; epoch <= targetEpoch; epoch++ { - log.Trace().Uint64("epoch", uint64(epoch)).Msg("Summarizing epoch") + log.Trace().Uint64("epoch", uint64(epoch)).Msg("Summarizing validators in epoch") if err := s.summarizeValidatorsInEpoch(ctx, md, epoch); err != nil { return errors.Wrap(err, fmt.Sprintf("failed to update validator summaries in epoch %d", epoch)) } diff --git a/services/summarizer/standard/parameters.go b/services/summarizer/standard/parameters.go index 7e33acf..aba5777 100644 --- a/services/summarizer/standard/parameters.go +++ b/services/summarizer/standard/parameters.go @@ -17,6 +17,7 @@ import ( "errors" eth2client "github.com/attestantio/go-eth2-client" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/rs/zerolog" "github.com/wealdtech/chaind/services/chaindb" "github.com/wealdtech/chaind/services/chaintime" @@ -32,6 +33,7 @@ type parameters struct { epochSummaries bool blockSummaries bool validatorSummaries bool + validatorRetain []phase0.BLSPubKey validatorEpochRetention string maxDaysPerRun uint64 validatorBalanceRetention string @@ -104,6 +106,13 @@ func WithValidatorSummaries(enabled bool) Parameter { }) } +// WithValidatorRetain states if the module should retain balance and epoch summaries for a subset of validator. +func WithValidatorRetain(validatorRetain []phase0.BLSPubKey) Parameter { + return parameterFunc(func(p *parameters) { + p.validatorRetain = validatorRetain + }) +} + // WithMaxDaysPerRun provides the maximum number of days to process in a single run of the summarizer. func WithMaxDaysPerRun(maxDaysPerRun uint64) Parameter { return parameterFunc(func(p *parameters) { diff --git a/services/summarizer/standard/prune.go b/services/summarizer/standard/prune.go index 529ea76..12ba611 100644 --- a/services/summarizer/standard/prune.go +++ b/services/summarizer/standard/prune.go @@ -70,7 +70,7 @@ func (s *Service) pruneBalances(ctx context.Context, summaryEpoch phase0.Epoch) return errors.Wrap(err, "failed to begin transaction to prune validator balances") } - if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, nil); err != nil { + if err := s.chainDB.(chaindb.ValidatorBalancesPruner).PruneValidatorBalances(ctx, pruneEpoch, s.validatorRetain); err != nil { cancel() return errors.Wrap(err, "failed to prune validator balances") } @@ -114,7 +114,7 @@ func (s *Service) pruneEpochs(ctx context.Context, summaryEpoch phase0.Epoch) er return errors.Wrap(err, "failed to begin transaction to prune validator epoch summaries") } - if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, nil); err != nil { + if err := s.chainDB.(chaindb.ValidatorEpochSummariesPruner).PruneValidatorEpochSummaries(ctx, pruneEpoch, s.validatorRetain); err != nil { cancel() return errors.Wrap(err, "failed to prune validator epoch summaries") } diff --git a/services/summarizer/standard/service.go b/services/summarizer/standard/service.go index 8a38b2d..46071d4 100644 --- a/services/summarizer/standard/service.go +++ b/services/summarizer/standard/service.go @@ -50,6 +50,7 @@ type Service struct { blockSummaries bool validatorSummaries bool maxDaysPerRun uint64 + validatorRetain []phase0.BLSPubKey validatorEpochRetention *util.CalendarDuration validatorBalanceRetention *util.CalendarDuration activitySem *semaphore.Weighted @@ -172,6 +173,7 @@ func New(ctx context.Context, params ...Parameter) (*Service, error) { blockSummaries: parameters.blockSummaries, validatorSummaries: parameters.validatorSummaries, maxDaysPerRun: parameters.maxDaysPerRun, + validatorRetain: parameters.validatorRetain, validatorEpochRetention: validatorEpochRetention, validatorBalanceRetention: validatorBalanceRetention, activitySem: semaphore.NewWeighted(1),