Skip to content

Commit

Permalink
analyzer: configurable MaxBackoffTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Dec 18, 2024
1 parent a38130e commit 27b7f4e
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 6 deletions.
1 change: 1 addition & 0 deletions .changelog/837.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
analyzers: add configurable `MaxBackoffTimeout`
15 changes: 13 additions & 2 deletions analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type itemBasedAnalyzer[Item any] struct {
stopIfQueueEmptyFor time.Duration
fixedInterval time.Duration
interItemDelay time.Duration
maxBackoffTime time.Duration
analyzerName string

processor ItemProcessor[Item]
Expand Down Expand Up @@ -68,6 +69,10 @@ type ItemProcessor[Item any] interface {
// If fixedInterval is provided, the analyzer will process one batch every fixedInterval.
// By default, the analyzer will use a backoff mechanism that will attempt to run as
// fast as possible until encountering an error.
//
// If maxBackoffTime is provided, the backoff mechanism will cap the maximum backoff time
// to the provided value. By default (if not provided), the maximum backoff time is 6 seconds,
// which roughly corresponds to the expected consensus block time.
func NewAnalyzer[Item any](
name string,
cfg config.ItemBasedAnalyzerConfig,
Expand All @@ -83,6 +88,7 @@ func NewAnalyzer[Item any](
stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor,
fixedInterval: cfg.Interval,
interItemDelay: cfg.InterItemDelay,
maxBackoffTime: cfg.MaxBackoffTime,
analyzerName: name,
processor: processor,
target: target,
Expand Down Expand Up @@ -188,10 +194,15 @@ func processErrors(errs []error) (int, error) {

// Start starts the item based analyzer.
func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
// Cap the timeout at the expected consensus block time, if not provided.
maxBackoff := 6 * time.Second
if a.maxBackoffTime != 0 {
maxBackoff = a.maxBackoffTime
}

backoff, err := util.NewBackoff(
100*time.Millisecond,
// Cap the timeout at the expected consensus block time
6*time.Second,
maxBackoff,
)
if err != nil {
a.logger.Error("error configuring backoff policy",
Expand Down
3 changes: 2 additions & 1 deletion analyzer/item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
GRANT SELECT ON ALL TABLES IN SCHEMA analysis TO PUBLIC`

testItemInsert = `
INSERT INTO analysis.item_analyzer_test(id)
INSERT INTO analysis.item_analyzer_test(id)
VALUES ($1)`

testItemCounts = `
Expand All @@ -54,6 +54,7 @@ var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{
StopIfQueueEmptyFor: time.Second,
Interval: 0, // use backoff
InterItemDelay: 0,
MaxBackoffTime: 0,
}

type mockItem struct {
Expand Down
7 changes: 7 additions & 0 deletions analyzer/validatorstakinghistory/validatorstakinghistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"time"

"github.com/jackc/pgx/v5"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
Expand Down Expand Up @@ -54,6 +55,12 @@ func NewAnalyzer(
) (analyzer.Analyzer, error) {
logger = logger.With("analyzer", validatorHistoryAnalyzerName)

if cfg.MaxBackoffTime == 0 {
// Once caught up we will receive work only after epochs are finalized,
// so increase the maximum backoff timeout a bit.
cfg.MaxBackoffTime = 5 * time.Minute
}

// Find the epoch corresponding to startHeight.
if startHeight > math.MaxInt64 {
return nil, fmt.Errorf("startHeight %d is too large", startHeight)
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,11 @@ type ItemBasedAnalyzerConfig struct {
// new goroutines to process items within a batch. This is useful for cases where
// processItem() makes out of band requests to rate limited resources.
InterItemDelay time.Duration `koanf:"inter_item_delay"`

// MaxBackoffTime determines the maximum backoff time the analyzer will use when
// processing items. This is useful for cases where the expected analyzer cadence
// is different from the consensus block time (e.g. 6 seconds).
MaxBackoffTime time.Duration `koanf:"max_backoff_time"`
}

type EvmTokensAnalyzerConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/damask/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 8_048_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/eden/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/edenfast/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ analysis:
evm_contract_code_emerald: { stop_if_queue_empty_for: 1s }
evm_abi_emerald: { stop_if_queue_empty_for: 10s } # Give evm_contract_verifier time to fetch ABIs first. The 10s has been enough in practice, but might need to be tuned in the future, espcially if the caching proxy has an empty cache.
evm_contract_verifier_emerald: { stop_if_queue_empty_for: 1s, sourcify_server_url: http://localhost:9191 }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s }
validator_staking_history: { from: 16_817_956, stop_if_queue_empty_for: 1s, max_backoff_time: 6s }
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
Expand Down

0 comments on commit 27b7f4e

Please sign in to comment.