From 27b7f4ec5543f2d32f22e33b5ebcf2cff3130cbb Mon Sep 17 00:00:00 2001 From: ptrus Date: Wed, 18 Dec 2024 17:34:15 +0100 Subject: [PATCH] analyzer: configurable MaxBackoffTimeout --- .changelog/837.feature.md | 1 + analyzer/item/item.go | 15 +++++++++++++-- analyzer/item/item_test.go | 3 ++- .../validatorstakinghistory.go | 7 +++++++ config/config.go | 5 +++++ tests/e2e_regression/damask/e2e_config_2.yml | 2 +- tests/e2e_regression/eden/e2e_config_2.yml | 2 +- tests/e2e_regression/edenfast/e2e_config_2.yml | 2 +- 8 files changed, 31 insertions(+), 6 deletions(-) create mode 100644 .changelog/837.feature.md diff --git a/.changelog/837.feature.md b/.changelog/837.feature.md new file mode 100644 index 000000000..e1d40c9d6 --- /dev/null +++ b/.changelog/837.feature.md @@ -0,0 +1 @@ +analyzers: add configurable `MaxBackoffTimeout` diff --git a/analyzer/item/item.go b/analyzer/item/item.go index bf97a0f9c..c1740e720 100644 --- a/analyzer/item/item.go +++ b/analyzer/item/item.go @@ -36,6 +36,7 @@ type itemBasedAnalyzer[Item any] struct { stopIfQueueEmptyFor time.Duration fixedInterval time.Duration interItemDelay time.Duration + maxBackoffTime time.Duration analyzerName string processor ItemProcessor[Item] @@ -68,6 +69,10 @@ type ItemProcessor[Item any] interface { // If fixedInterval is provided, the analyzer will process one batch every fixedInterval. // By default, the analyzer will use a backoff mechanism that will attempt to run as // fast as possible until encountering an error. +// +// If maxBackoffTime is provided, the backoff mechanism will cap the maximum backoff time +// to the provided value. By default (if not provided), the maximum backoff time is 6 seconds, +// which roughly corresponds to the expected consensus block time. func NewAnalyzer[Item any]( name string, cfg config.ItemBasedAnalyzerConfig, @@ -83,6 +88,7 @@ func NewAnalyzer[Item any]( stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor, fixedInterval: cfg.Interval, interItemDelay: cfg.InterItemDelay, + maxBackoffTime: cfg.MaxBackoffTime, analyzerName: name, processor: processor, target: target, @@ -188,10 +194,15 @@ func processErrors(errs []error) (int, error) { // Start starts the item based analyzer. func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) { + // Cap the timeout at the expected consensus block time, if not provided. + maxBackoff := 6 * time.Second + if a.maxBackoffTime != 0 { + maxBackoff = a.maxBackoffTime + } + backoff, err := util.NewBackoff( 100*time.Millisecond, - // Cap the timeout at the expected consensus block time - 6*time.Second, + maxBackoff, ) if err != nil { a.logger.Error("error configuring backoff policy", diff --git a/analyzer/item/item_test.go b/analyzer/item/item_test.go index 85de0de32..b1ec5d64f 100644 --- a/analyzer/item/item_test.go +++ b/analyzer/item/item_test.go @@ -40,7 +40,7 @@ const ( GRANT SELECT ON ALL TABLES IN SCHEMA analysis TO PUBLIC` testItemInsert = ` - INSERT INTO analysis.item_analyzer_test(id) + INSERT INTO analysis.item_analyzer_test(id) VALUES ($1)` testItemCounts = ` @@ -54,6 +54,7 @@ var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{ StopIfQueueEmptyFor: time.Second, Interval: 0, // use backoff InterItemDelay: 0, + MaxBackoffTime: 0, } type mockItem struct { diff --git a/analyzer/validatorstakinghistory/validatorstakinghistory.go b/analyzer/validatorstakinghistory/validatorstakinghistory.go index b8d8ccd04..72662713b 100644 --- a/analyzer/validatorstakinghistory/validatorstakinghistory.go +++ b/analyzer/validatorstakinghistory/validatorstakinghistory.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "time" "github.com/jackc/pgx/v5" "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" @@ -54,6 +55,12 @@ func NewAnalyzer( ) (analyzer.Analyzer, error) { logger = logger.With("analyzer", validatorHistoryAnalyzerName) + if cfg.MaxBackoffTime == 0 { + // Once caught up we will receive work only after epochs are finalized, + // so increase the maximum backoff timeout a bit. + cfg.MaxBackoffTime = 5 * time.Minute + } + // Find the epoch corresponding to startHeight. if startHeight > math.MaxInt64 { return nil, fmt.Errorf("startHeight %d is too large", startHeight) diff --git a/config/config.go b/config/config.go index 4425a25f0..71b2ab4f3 100644 --- a/config/config.go +++ b/config/config.go @@ -487,6 +487,11 @@ type ItemBasedAnalyzerConfig struct { // new goroutines to process items within a batch. This is useful for cases where // processItem() makes out of band requests to rate limited resources. InterItemDelay time.Duration `koanf:"inter_item_delay"` + + // MaxBackoffTime determines the maximum backoff time the analyzer will use when + // processing items. This is useful for cases where the expected analyzer cadence + // is different from the consensus block time (e.g. 6 seconds). + MaxBackoffTime time.Duration `koanf:"max_backoff_time"` } type EvmTokensAnalyzerConfig struct { diff --git a/tests/e2e_regression/damask/e2e_config_2.yml b/tests/e2e_regression/damask/e2e_config_2.yml index 09511b148..0ea5e1f76 100644 --- a/tests/e2e_regression/damask/e2e_config_2.yml +++ b/tests/e2e_regression/damask/e2e_config_2.yml @@ -20,7 +20,7 @@ analysis: evm_contract_code_emerald: { stop_if_queue_empty_for: 1s } evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache. evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 } - validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s } + validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s } # Some non-block analyzers are not tested in e2e regressions. # They are largely not worth the trouble as they do not interact with rest of the system much. # metadata_registry: {} # Awkward to inject mock registry responses. diff --git a/tests/e2e_regression/eden/e2e_config_2.yml b/tests/e2e_regression/eden/e2e_config_2.yml index 87702f5b4..7b80fce5a 100644 --- a/tests/e2e_regression/eden/e2e_config_2.yml +++ b/tests/e2e_regression/eden/e2e_config_2.yml @@ -20,7 +20,7 @@ analysis: evm_contract_code_emerald: { stop_if_queue_empty_for: 1s } evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache. evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 } - validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s } + validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s } # Some non-block analyzers are not tested in e2e regressions. # They are largely not worth the trouble as they do not interact with rest of the system much. # metadata_registry: {} # Awkward to inject mock registry responses. diff --git a/tests/e2e_regression/edenfast/e2e_config_2.yml b/tests/e2e_regression/edenfast/e2e_config_2.yml index 2a074530e..c0caab6aa 100644 --- a/tests/e2e_regression/edenfast/e2e_config_2.yml +++ b/tests/e2e_regression/edenfast/e2e_config_2.yml @@ -20,7 +20,7 @@ analysis: evm_contract_code_emerald: { stop_if_queue_empty_for: 1s } evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache. evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 } - validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s } + validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s } # Some non-block analyzers are not tested in e2e regressions. # They are largely not worth the trouble as they do not interact with rest of the system much. # metadata_registry: {} # Awkward to inject mock registry responses.