diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 02680166c..22beabf40 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -16,6 +16,7 @@ package conduit import ( "os" + "time" "github.com/conduitio/conduit-commons/database" sdk "github.com/conduitio/conduit-connector-sdk" @@ -23,6 +24,7 @@ import ( "github.com/conduitio/conduit/pkg/foundation/log" "github.com/conduitio/conduit/pkg/plugin/connector/builtin" "github.com/rs/zerolog" + "golang.org/x/exp/constraints" ) const ( @@ -81,8 +83,15 @@ type Config struct { } Pipelines struct { - Path string - ExitOnError bool + Path string + ExitOnError bool + ErrorRecovery struct { + MinDelay time.Duration + MaxDelay time.Duration + BackoffFactor int + MaxRetries int + HealthyAfter time.Duration + } } ConnectorPlugins map[string]sdk.Connector @@ -104,19 +113,31 @@ type Config struct { func DefaultConfig() Config { var cfg Config + cfg.DB.Type = DBTypeBadger cfg.DB.Badger.Path = "conduit.db" cfg.DB.Postgres.Table = "conduit_kv_store" cfg.DB.SQLite.Path = "conduit.db" cfg.DB.SQLite.Table = "conduit_kv_store" + cfg.API.Enabled = true cfg.API.HTTP.Address = ":8080" cfg.API.GRPC.Address = ":8084" + cfg.Log.Level = "info" cfg.Log.Format = "cli" + cfg.Connectors.Path = "./connectors" + cfg.Processors.Path = "./processors" + cfg.Pipelines.Path = "./pipelines" + cfg.Pipelines.ErrorRecovery.MinDelay = time.Second + cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute + cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 + cfg.Pipelines.ErrorRecovery.MaxRetries = 0 + cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute + cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin cfg.ConnectorPlugins = builtin.DefaultBuiltinConnectors @@ -167,6 +188,32 @@ func (c Config) validateSchemaRegistryConfig() error { return nil } +func (c Config) validateErrorRecovery() error { + errRecoveryCfg := c.Pipelines.ErrorRecovery + var errs []error + + if err := requirePositiveValue("min-delay", errRecoveryCfg.MinDelay); err != nil { + errs = append(errs, err) + } + if err := requirePositiveValue("max-delay", errRecoveryCfg.MaxDelay); err != nil { + errs = append(errs, err) + } + if errRecoveryCfg.MaxDelay > 0 && errRecoveryCfg.MinDelay > errRecoveryCfg.MaxDelay { + errs = append(errs, cerrors.New(`"min-delay" should be smaller than "max-delay"`)) + } + if err := requireNonNegativeValue("backoff-factor", errRecoveryCfg.BackoffFactor); err != nil { + errs = append(errs, err) + } + if err := requireNonNegativeValue("max-retries", errRecoveryCfg.MaxRetries); err != nil { + errs = append(errs, err) + } + if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { + errs = append(errs, err) + } + + return cerrors.Join(errs...) +} + func (c Config) Validate() error { // TODO simplify validation with struct tags @@ -212,6 +259,9 @@ func (c Config) Validate() error { return invalidConfigFieldErr("pipelines.path") } + if err := c.validateErrorRecovery(); err != nil { + return cerrors.Errorf("invalid error recovery config: %w", err) + } return nil } @@ -222,3 +272,19 @@ func invalidConfigFieldErr(name string) error { func requiredConfigFieldErr(name string) error { return cerrors.Errorf("%q config value is required", name) } + +func requireNonNegativeValue[T constraints.Integer](name string, value T) error { + if value < 0 { + return cerrors.Errorf("%q config value mustn't be negative (got: %v)", name, value) + } + + return nil +} + +func requirePositiveValue[T constraints.Integer](name string, value T) error { + if value <= 0 { + return cerrors.Errorf("%q config value must be positive (got: %v)", name, value) + } + + return nil +} diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index 1f473abfa..c252600c3 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -16,146 +16,215 @@ package conduit import ( "testing" + "time" "github.com/conduitio/conduit-commons/database/inmemory" + "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/matryer/is" ) func TestConfig_Validate(t *testing.T) { - is := is.New(t) - testCases := []struct { name string setupConfig func(Config) Config want error - }{{ - name: "valid", - setupConfig: func(c Config) Config { - return c - }, - want: nil, - }, { - name: "invalid DB type (empty)", - setupConfig: func(c Config) Config { - c.DB.Type = "" - return c - }, - want: invalidConfigFieldErr("db.type"), - }, { - name: "invalid DB type (invalid)", - setupConfig: func(c Config) Config { - c.DB.Type = "asdf" - return c - }, - want: invalidConfigFieldErr("db.type"), - }, { - name: "required DB badger path", - setupConfig: func(c Config) Config { - c.DB.Type = DBTypeBadger - c.DB.Badger.Path = "" - return c - }, - want: requiredConfigFieldErr("db.badger.path"), - }, { - name: "required DB Postgres connection string", - setupConfig: func(c Config) Config { - c.DB.Type = DBTypePostgres - c.DB.Postgres.ConnectionString = "" - return c - }, - want: requiredConfigFieldErr("db.postgres.connection-string"), - }, { - name: "required DB Postgres table", - setupConfig: func(c Config) Config { - c.DB.Type = DBTypePostgres - c.DB.Postgres.Table = "" - return c - }, - want: requiredConfigFieldErr("db.postgres.table"), - }, { - name: "custom DB driver", - setupConfig: func(c Config) Config { - c.DB.Type = "" - c.DB.Driver = &inmemory.DB{} // db driver explicitly defined - return c - }, - want: nil, - }, { - name: "required HTTP address", - setupConfig: func(c Config) Config { - c.API.HTTP.Address = "" - return c - }, - want: requiredConfigFieldErr("http.address"), - }, { - name: "required GRPC address", - setupConfig: func(c Config) Config { - c.API.GRPC.Address = "" - return c - }, - want: requiredConfigFieldErr("grpc.address"), - }, { - name: "disabled API valid", - setupConfig: func(c Config) Config { - c.API.Enabled = false - c.API.HTTP.Address = "" - c.API.GRPC.Address = "" - return c - }, - want: nil, - }, { - name: "invalid Log level (invalid)", - setupConfig: func(c Config) Config { - c.Log.Level = "who" - return c - }, - want: invalidConfigFieldErr("log.level"), - }, { - name: "invalid Log format (invalid)", - setupConfig: func(c Config) Config { - c.Log.Format = "someFormat" - return c - }, - want: invalidConfigFieldErr("log.format"), - }, { - name: "required Log level", - setupConfig: func(c Config) Config { - c.Log.Level = "" - return c - }, - want: requiredConfigFieldErr("log.level"), - }, { - name: "required Log format", - setupConfig: func(c Config) Config { - c.Log.Format = "" - return c - }, - want: requiredConfigFieldErr("log.format"), - }, { - name: "required pipelines path", - setupConfig: func(c Config) Config { - c.Pipelines.Path = "" - return c - }, - want: requiredConfigFieldErr("pipelines.path"), - }, { - name: "invalid pipelines path", - setupConfig: func(c Config) Config { - c.Pipelines.Path = "folder-does-not-exist" - return c - }, - want: invalidConfigFieldErr("pipelines.path"), - }} + }{ + { + name: "valid", + setupConfig: func(c Config) Config { + return c + }, + want: nil, + }, + { + name: "invalid DB type (empty)", + setupConfig: func(c Config) Config { + c.DB.Type = "" + return c + }, + want: invalidConfigFieldErr("db.type"), + }, + { + name: "invalid DB type (invalid)", + setupConfig: func(c Config) Config { + c.DB.Type = "asdf" + return c + }, + want: invalidConfigFieldErr("db.type"), + }, + { + name: "required DB badger path", + setupConfig: func(c Config) Config { + c.DB.Type = DBTypeBadger + c.DB.Badger.Path = "" + return c + }, + want: requiredConfigFieldErr("db.badger.path"), + }, + { + name: "required DB Postgres connection string", + setupConfig: func(c Config) Config { + c.DB.Type = DBTypePostgres + c.DB.Postgres.ConnectionString = "" + return c + }, + want: requiredConfigFieldErr("db.postgres.connection-string"), + }, + { + name: "required DB Postgres table", + setupConfig: func(c Config) Config { + c.DB.Type = DBTypePostgres + c.DB.Postgres.Table = "" + return c + }, + want: requiredConfigFieldErr("db.postgres.table"), + }, + { + name: "custom DB driver", + setupConfig: func(c Config) Config { + c.DB.Type = "" + c.DB.Driver = &inmemory.DB{} // db driver explicitly defined + return c + }, + want: nil, + }, + { + name: "required HTTP address", + setupConfig: func(c Config) Config { + c.API.HTTP.Address = "" + return c + }, + want: requiredConfigFieldErr("http.address"), + }, + { + name: "required GRPC address", + setupConfig: func(c Config) Config { + c.API.GRPC.Address = "" + return c + }, + want: requiredConfigFieldErr("grpc.address"), + }, + { + name: "disabled API valid", + setupConfig: func(c Config) Config { + c.API.Enabled = false + c.API.HTTP.Address = "" + c.API.GRPC.Address = "" + return c + }, + want: nil, + }, + { + name: "invalid Log level (invalid)", + setupConfig: func(c Config) Config { + c.Log.Level = "who" + return c + }, + want: invalidConfigFieldErr("log.level"), + }, + { + name: "invalid Log format (invalid)", + setupConfig: func(c Config) Config { + c.Log.Format = "someFormat" + return c + }, + want: invalidConfigFieldErr("log.format"), + }, + { + name: "required Log level", + setupConfig: func(c Config) Config { + c.Log.Level = "" + return c + }, + want: requiredConfigFieldErr("log.level"), + }, + { + name: "required Log format", + setupConfig: func(c Config) Config { + c.Log.Format = "" + return c + }, + want: requiredConfigFieldErr("log.format"), + }, + { + name: "required pipelines path", + setupConfig: func(c Config) Config { + c.Pipelines.Path = "" + return c + }, + want: requiredConfigFieldErr("pipelines.path"), + }, + { + name: "invalid pipelines path", + setupConfig: func(c Config) Config { + c.Pipelines.Path = "folder-does-not-exist" + return c + }, + want: invalidConfigFieldErr("pipelines.path"), + }, + { + name: "error recovery: negative min-delay", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.MinDelay = -time.Second + return c + }, + want: cerrors.New(`invalid error recovery config: "min-delay" config value must be positive (got: -1s)`), + }, + { + name: "error recovery: negative max-delay", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.MaxDelay = -time.Second + return c + }, + want: cerrors.New(`invalid error recovery config: "max-delay" config value must be positive (got: -1s)`), + }, + { + name: "error recovery: min-delay greater than max-delay", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.MinDelay = 2 * time.Second + c.Pipelines.ErrorRecovery.MaxDelay = time.Second + return c + }, + want: cerrors.New(`invalid error recovery config: "min-delay" should be smaller than "max-delay"`), + }, + { + name: "error recovery: negative backoff-factor", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.BackoffFactor = -1 + return c + }, + want: cerrors.New(`invalid error recovery config: "backoff-factor" config value mustn't be negative (got: -1)`), + }, + { + name: "error recovery: negative max-retries", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.MaxRetries = -1 + return c + }, + want: cerrors.New(`invalid error recovery config: "max-retries" config value mustn't be negative (got: -1)`), + }, + { + name: "error recovery: negative healthy-after", + setupConfig: func(c Config) Config { + c.Pipelines.ErrorRecovery.HealthyAfter = -time.Second + return c + }, + want: cerrors.New(`invalid error recovery config: "healthy-after" config value must be positive (got: -1s)`), + }, + } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + validConfig := DefaultConfig() validConfig.DB.Postgres.ConnectionString = "postgres://user:pass@localhost:5432/mydb?sslmode=disable" underTest := tc.setupConfig(validConfig) got := underTest.Validate() - if got == nil { - is.True(tc.want == nil) + if tc.want == nil { + is.NoErr(got) } else { + is.True(got != nil) // expected an error is.Equal(tc.want.Error(), got.Error()) } }) diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index d4667e114..e74cbad61 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -98,8 +98,49 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { flags.StringVar(&cfg.Connectors.Path, "connectors.path", cfg.Connectors.Path, "path to standalone connectors' directory") flags.StringVar(&cfg.Processors.Path, "processors.path", cfg.Processors.Path, "path to standalone processors' directory") - flags.StringVar(&cfg.Pipelines.Path, "pipelines.path", cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file") - flags.BoolVar(&cfg.Pipelines.ExitOnError, "pipelines.exit-on-error", cfg.Pipelines.ExitOnError, "exit Conduit if a pipeline experiences an error while running") + // Pipeline configuration + flags.StringVar( + &cfg.Pipelines.Path, + "pipelines.path", + cfg.Pipelines.Path, + "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file", + ) + flags.BoolVar( + &cfg.Pipelines.ExitOnError, + "pipelines.exit-on-error", + cfg.Pipelines.ExitOnError, + "exit Conduit if a pipeline experiences an error while running", + ) + flags.DurationVar( + &cfg.Pipelines.ErrorRecovery.MinDelay, + "pipelines.error-recovery.min-delay", + cfg.Pipelines.ErrorRecovery.MinDelay, + "minimum delay before restart", + ) + flags.DurationVar( + &cfg.Pipelines.ErrorRecovery.MaxDelay, + "pipelines.error-recovery.max-delay", + cfg.Pipelines.ErrorRecovery.MaxDelay, + "maximum delay before restart", + ) + flags.IntVar( + &cfg.Pipelines.ErrorRecovery.BackoffFactor, + "pipelines.error-recovery.backoff-factor", + cfg.Pipelines.ErrorRecovery.BackoffFactor, + "backoff factor applied to the last delay", + ) + flags.IntVar( + &cfg.Pipelines.ErrorRecovery.MaxRetries, + "pipelines.error-recovery.max-retries", + cfg.Pipelines.ErrorRecovery.MaxRetries, + "maximum number of retries", + ) + flags.DurationVar( + &cfg.Pipelines.ErrorRecovery.HealthyAfter, + "pipelines.error-recovery.healthy-after", + cfg.Pipelines.ErrorRecovery.HealthyAfter, + "amount of time running without any errors after which a pipeline is considered healthy", + ) flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent") flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")