Skip to content

Commit

Permalink
[Recovery] Make backoff parameters globally configurable (#1818)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Sep 3, 2024
1 parent 7e52888 commit 0e3280d
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 126 deletions.
70 changes: 68 additions & 2 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package conduit

import (
"os"
"time"

"github.com/conduitio/conduit-commons/database"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/connector/builtin"
"github.com/rs/zerolog"
"golang.org/x/exp/constraints"
)

const (
Expand Down Expand Up @@ -81,8 +83,15 @@ type Config struct {
}

Pipelines struct {
Path string
ExitOnError bool
Path string
ExitOnError bool
ErrorRecovery struct {
MinDelay time.Duration
MaxDelay time.Duration
BackoffFactor int
MaxRetries int
HealthyAfter time.Duration
}
}

ConnectorPlugins map[string]sdk.Connector
Expand All @@ -104,19 +113,31 @@ type Config struct {

func DefaultConfig() Config {
var cfg Config

cfg.DB.Type = DBTypeBadger
cfg.DB.Badger.Path = "conduit.db"
cfg.DB.Postgres.Table = "conduit_kv_store"
cfg.DB.SQLite.Path = "conduit.db"
cfg.DB.SQLite.Table = "conduit_kv_store"

cfg.API.Enabled = true
cfg.API.HTTP.Address = ":8080"
cfg.API.GRPC.Address = ":8084"

cfg.Log.Level = "info"
cfg.Log.Format = "cli"

cfg.Connectors.Path = "./connectors"

cfg.Processors.Path = "./processors"

cfg.Pipelines.Path = "./pipelines"
cfg.Pipelines.ErrorRecovery.MinDelay = time.Second
cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute
cfg.Pipelines.ErrorRecovery.BackoffFactor = 2
cfg.Pipelines.ErrorRecovery.MaxRetries = 0
cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute

cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin

cfg.ConnectorPlugins = builtin.DefaultBuiltinConnectors
Expand Down Expand Up @@ -167,6 +188,32 @@ func (c Config) validateSchemaRegistryConfig() error {
return nil
}

func (c Config) validateErrorRecovery() error {
errRecoveryCfg := c.Pipelines.ErrorRecovery
var errs []error

if err := requirePositiveValue("min-delay", errRecoveryCfg.MinDelay); err != nil {
errs = append(errs, err)
}
if err := requirePositiveValue("max-delay", errRecoveryCfg.MaxDelay); err != nil {
errs = append(errs, err)
}
if errRecoveryCfg.MaxDelay > 0 && errRecoveryCfg.MinDelay > errRecoveryCfg.MaxDelay {
errs = append(errs, cerrors.New(`"min-delay" should be smaller than "max-delay"`))
}
if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil {
errs = append(errs, err)
}
if err := requireNonNegativeValue("max-retries", errRecoveryCfg.MaxRetries); err != nil {
errs = append(errs, err)
}
if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
errs = append(errs, err)
}

return cerrors.Join(errs...)
}

func (c Config) Validate() error {
// TODO simplify validation with struct tags

Expand Down Expand Up @@ -212,6 +259,9 @@ func (c Config) Validate() error {
return invalidConfigFieldErr("pipelines.path")
}

if err := c.validateErrorRecovery(); err != nil {
return cerrors.Errorf("invalid error recovery config: %w", err)
}
return nil
}

Expand All @@ -222,3 +272,19 @@ func invalidConfigFieldErr(name string) error {
func requiredConfigFieldErr(name string) error {
return cerrors.Errorf("%q config value is required", name)
}

func requireNonNegativeValue[T constraints.Integer](name string, value T) error {
if value < 0 {
return cerrors.Errorf("%q config value mustn't be negative (got: %v)", name, value)
}

return nil
}

func requirePositiveValue[T constraints.Integer](name string, value T) error {
if value <= 0 {
return cerrors.Errorf("%q config value must be positive (got: %v)", name, value)
}

return nil
}
Loading

0 comments on commit 0e3280d

Please sign in to comment.