diff --git a/docs/test-cases/pipeline-recovery.md b/docs/test-cases/pipeline-recovery.md index 6b4dee553..30871d86f 100644 --- a/docs/test-cases/pipeline-recovery.md +++ b/docs/test-cases/pipeline-recovery.md @@ -19,28 +19,24 @@ a retry. ```yaml version: "2.2" pipelines: - - id: file-pipeline + - id: chaos-to-log status: running - name: file-pipeline - description: dlq write error + description: Postgres source, file destination + dead-letter-queue: + plugin: standalone:chaos + settings: + writeMode: error connectors: - - id: chaos-src + - id: chaos type: source plugin: standalone:chaos - name: chaos-src + name: source settings: readMode: error - - id: log-dst + - id: destination type: destination plugin: builtin:log - log: file-dst - dead-letter-queue: - plugin: "builtin:postgres" - settings: - table: non_existing_table_so_that_dlq_fails - url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable - window-size: 3 - window-nack-threshold: 2 + name: destination ``` **Steps**: @@ -65,6 +61,28 @@ Recovery is not triggered when there is an error processing a record. **Pipeline configuration file**: ```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: generator + type: source + plugin: builtin:generator + name: source + settings: + format.type: structured + format.options.id: int + format.options.name: string + operations: create + - id: destination + type: destination + plugin: builtin:log + name: destination + processors: + - id: error + plugin: "error" ``` **Steps**: @@ -328,3 +346,106 @@ pipelines: **Additional comments**: --- + +## Test Case 11: Recovery triggered during a specific max-retries-window, after that pipeline is degraded + +**Priority** (low/medium/high): + +**Description**: + +A pipeline will be allowed to fail during a specific time window, after that it will be degraded. +Combining `max-retries` and `max-retries-window` we can control how many times a pipeline can fail during a specific time window. + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: postgres-source + type: source + plugin: builtin:postgres + name: source + settings: + cdcMode: logrepl + snapshotMode: never + table: employees + url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable + - id: destination + type: destination + plugin: builtin:log + name: destination +``` + +**Steps**: + +1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +2. Stop postgres database +3. Leave it stopped and notice pipeline goes to degraded on attempt 3 (after ~20 seconds) + +**Expected Result**: + +After 20 seconds the pipeline should be degraded. 
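+
+With `--pipelines.error-recovery.backoff-factor 1` and `--pipelines.error-recovery.min-delay 10s`, each of the
+first two recovery attempts is delayed by roughly 10 seconds (10s + 10s ≈ 20s); the third failure exceeds
+`--pipelines.error-recovery.max-retries 2`, which is why the pipeline is expected to become degraded on attempt 3.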
+ +**Additional comments**: + +--- + +## Test Case 12: Recovery triggered during a specific max-retries-window, pipeline is resilient during a specific time window + +**Priority** (low/medium/high): + +**Description**: + +**Automated** (yes/no) + +**Setup**: + +**Pipeline configuration file**: + +```yaml +version: "2.2" +pipelines: + - id: generator-to-log + status: running + description: Postgres source, file destination + connectors: + - id: postgres-source + type: source + plugin: builtin:postgres + name: source + settings: + cdcMode: logrepl + snapshotMode: never + table: employees + url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable + - id: destination + type: destination + plugin: builtin:log + name: destination +``` + +**Steps**: + +1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s` +2. Stop postgres database +3. Leave it stopped until backoff attempts are 2 +4. Start postgres database again +5. Leave it running for another 15 seconds +6. Notice backoff attempts are going back to 1 + (repeat if needed to see how backoff attempts are increasing and decreasing) + +**Expected Result**: + +Pipeline should be able to recover. + +**Additional comments**: + +--- diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index e96b0738f..6d15cffd0 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -96,8 +96,8 @@ type Config struct { BackoffFactor int // MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite) MaxRetries int64 - // HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes - HealthyAfter time.Duration + // MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes + MaxRetriesWindow time.Duration } } @@ -144,7 +144,7 @@ func DefaultConfig() Config { cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute cfg.Pipelines.ErrorRecovery.BackoffFactor = 2 cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery - cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute + cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = 5 * time.Minute cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin @@ -215,7 +215,7 @@ func (c Config) validateErrorRecovery() error { if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery { errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. 
It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery)) } - if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil { + if err := requirePositiveValue("max-retries-window", errRecoveryCfg.MaxRetriesWindow); err != nil { errs = append(errs, err) } diff --git a/pkg/conduit/config_test.go b/pkg/conduit/config_test.go index 046551e46..b1026a26b 100644 --- a/pkg/conduit/config_test.go +++ b/pkg/conduit/config_test.go @@ -213,12 +213,12 @@ func TestConfig_Validate(t *testing.T) { want: nil, }, { - name: "error recovery: negative healthy-after", + name: "error recovery: negative max-retries-window", setupConfig: func(c Config) Config { - c.Pipelines.ErrorRecovery.HealthyAfter = -time.Second + c.Pipelines.ErrorRecovery.MaxRetriesWindow = -time.Second return c }, - want: cerrors.New(`invalid error recovery config: "healthy-after" config value must be positive (got: -1s)`), + want: cerrors.New(`invalid error recovery config: "max-retries-window" config value must be positive (got: -1s)`), }, } diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 1cd74aceb..e175b3374 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -147,9 +147,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { "maximum number of retries", ) flags.DurationVar( - &cfg.Pipelines.ErrorRecovery.HealthyAfter, - "pipelines.error-recovery.healthy-after", - cfg.Pipelines.ErrorRecovery.HealthyAfter, + &cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, + "pipelines.error-recovery.max-retries-window", + cfg.Pipelines.ErrorRecovery.MaxRetriesWindow, "amount of time running without any errors after which a pipeline is considered healthy", ) diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 0de2276c9..f1fcc7de6 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -205,11 +205,11 @@ func createServices(r *Runtime) error { // Error recovery configuration errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, - MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, - BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, - MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, - HealthyAfter: r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed + MinDelay: r.Config.Pipelines.ErrorRecovery.MinDelay, + MaxDelay: r.Config.Pipelines.ErrorRecovery.MaxDelay, + BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor, + MaxRetries: r.Config.Pipelines.ErrorRecovery.MaxRetries, + MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow, } plService := pipeline.NewService(r.logger, r.DB) diff --git a/pkg/lifecycle/service.go b/pkg/lifecycle/service.go index 9e0ef8948..056fba7cf 100644 --- a/pkg/lifecycle/service.go +++ b/pkg/lifecycle/service.go @@ -50,11 +50,11 @@ type FailureEvent struct { type FailureHandler func(FailureEvent) type ErrRecoveryCfg struct { - MinDelay time.Duration - MaxDelay time.Duration - BackoffFactor int - MaxRetries int64 - HealthyAfter time.Duration + MinDelay time.Duration + MaxDelay time.Duration + BackoffFactor int + MaxRetries int64 + MaxRetriesWindow time.Duration } func (e *ErrRecoveryCfg) toBackoff() *backoff.Backoff { @@ -103,10 +103,11 @@ func NewService( } type runnablePipeline struct { - pipeline *pipeline.Instance - n []stream.Node - t *tomb.Tomb - backoffCfg backoff.Backoff + pipeline *pipeline.Instance + n []stream.Node + t 
*tomb.Tomb + backoff *backoff.Backoff + recoveryAttempts *atomic.Int64 } // ConnectorService can fetch and create a connector instance. @@ -179,22 +180,17 @@ func (s *Service) Start( s.logger.Debug(ctx).Str(log.PipelineIDField, pl.ID).Msg("starting pipeline") s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("building nodes") - var backoffCfg *backoff.Backoff - - // We check if the pipeline was previously running and get the backoff configuration from it. - oldRp, ok := s.runningPipelines.Get(pipelineID) - if !ok { - // default backoff configuration - backoffCfg = s.errRecoveryCfg.toBackoff() - } else { - backoffCfg = &oldRp.backoffCfg - } - - rp, err := s.buildRunnablePipeline(ctx, pl, backoffCfg) + rp, err := s.buildRunnablePipeline(ctx, pl) if err != nil { return cerrors.Errorf("could not build nodes for pipeline %s: %w", pl.ID, err) } + // We check if the pipeline was previously running and get the backoff configuration from it. + if oldRp, ok := s.runningPipelines.Get(pipelineID); ok { + rp.backoff = oldRp.backoff + rp.recoveryAttempts = oldRp.recoveryAttempts + } + s.logger.Trace(ctx).Str(log.PipelineIDField, pl.ID).Msg("running nodes") if err := s.runPipeline(ctx, rp); err != nil { return cerrors.Errorf("failed to run pipeline %s: %w", pl.ID, err) @@ -210,34 +206,39 @@ func (s *Service) Start( // It'll check the number of times the pipeline has been restarted and the duration of the backoff. // When the pipeline has reached out the maximum number of retries, it'll return a fatal error. func (s *Service) StartWithBackoff(ctx context.Context, rp *runnablePipeline) error { - s.logger.Info(ctx).Str(log.PipelineIDField, rp.pipeline.ID).Msg("restarting with backoff") - - attempt := int64(rp.backoffCfg.Attempt()) - duration := rp.backoffCfg.Duration() + // Increment number of recovery attempts. + attempt := rp.recoveryAttempts.Add(1) - s.logger.Trace(ctx).Dur(log.DurationField, duration).Int64(log.AttemptField, attempt).Msg("backoff configuration") - - if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt >= s.errRecoveryCfg.MaxRetries { + if s.errRecoveryCfg.MaxRetries != InfiniteRetriesErrRecovery && attempt > s.errRecoveryCfg.MaxRetries { return cerrors.FatalError(cerrors.Errorf("failed to recover pipeline %s after %d attempts: %w", rp.pipeline.ID, attempt, pipeline.ErrPipelineCannotRecover)) } + duration := rp.backoff.ForAttempt(float64(attempt)) + s.logger.Info(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Dur(log.DurationField, duration). + Int64(log.AttemptField, attempt). + Msg("restarting with backoff") + + time.AfterFunc(duration+s.errRecoveryCfg.MaxRetriesWindow, func() { + s.logger.Debug(ctx). + Str(log.PipelineIDField, rp.pipeline.ID). + Dur(log.DurationField, duration). + Int64(log.AttemptField, attempt). + Msg("decreasing recovery attempts") + rp.recoveryAttempts.Add(-1) // Decrement the number of attempts after delay. + }) + // This results in a default delay progression of 1s, 2s, 4s, 8s, 16s, [...], 10m, 10m,... balancing the need for recovery time and minimizing downtime. - timer := time.NewTimer(duration) select { case <-ctx.Done(): return ctx.Err() case <-time.After(duration): - <-timer.C } - // Get status of pipeline to check if it already recovered. - if rp.pipeline.GetStatus() == pipeline.StatusRunning { - s.logger.Debug(ctx). - Str(log.PipelineIDField, rp.pipeline.ID). - Int64(log.AttemptField, attempt). - Int("backoffRetry.count", s.errRecoveryCfg.BackoffFactor). - Int64("backoffRetry.duration", duration.Milliseconds()). 
- Msg("pipeline recovered") + // The user may have stopped or restarted the pipeline while we were waiting. + actualRp, ok := s.runningPipelines.Get(rp.pipeline.ID) + if !ok || actualRp != rp { return nil } @@ -427,7 +428,6 @@ func (s *Service) buildNodes(ctx context.Context, pl *pipeline.Instance) ([]stre func (s *Service) buildRunnablePipeline( ctx context.Context, pl *pipeline.Instance, - backoffCfg *backoff.Backoff, ) (*runnablePipeline, error) { nodes, err := s.buildNodes(ctx, pl) if err != nil { @@ -435,9 +435,10 @@ func (s *Service) buildRunnablePipeline( } return &runnablePipeline{ - pipeline: pl, - n: nodes, - backoffCfg: *backoffCfg, + pipeline: pl, + n: nodes, + backoff: s.errRecoveryCfg.toBackoff(), + recoveryAttempts: &atomic.Int64{}, }, nil } @@ -745,11 +746,6 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { }) } - // TODO: When it's recovering, we should only update the status back to running once HealthyAfter has passed. - // now: - // running -> (error) -> recovering (restart) -> running - // future (with the HealthyAfter mechanism): - // running -> (error) -> recovering (restart) -> recovering (wait for HealthyAfter) -> running err := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusRunning, "") if err != nil { return err @@ -792,7 +788,7 @@ func (s *Service) runPipeline(ctx context.Context, rp *runnablePipeline) error { s.logger. Err(ctx, err). Str(log.PipelineIDField, rp.pipeline.ID). - Msg("pipeline recovery failed stopped") + Msg("pipeline recovery failed") if updateErr := s.pipelines.UpdateStatus(ctx, rp.pipeline.ID, pipeline.StatusDegraded, fmt.Sprintf("%+v", recoveryErr)); updateErr != nil { return updateErr diff --git a/pkg/lifecycle/service_test.go b/pkg/lifecycle/service_test.go index 9e31f699a..16dcb60ab 100644 --- a/pkg/lifecycle/service_test.go +++ b/pkg/lifecycle/service_test.go @@ -85,11 +85,7 @@ func TestServiceLifecycle_buildRunnablePipeline(t *testing.T) { testPipelineService{}, ) - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.NoErr(err) @@ -171,11 +167,7 @@ func TestService_buildRunnablePipeline_NoSourceNode(t *testing.T) { wantErr := "can't build pipeline without any source connectors" - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.True(err != nil) is.Equal(err.Error(), wantErr) @@ -219,11 +211,7 @@ func TestService_buildRunnablePipeline_NoDestinationNode(t *testing.T) { } pl.SetStatus(pipeline.StatusUserStopped) - got, err := ls.buildRunnablePipeline( - ctx, - pl, - ls.errRecoveryCfg.toBackoff(), - ) + got, err := ls.buildRunnablePipeline(ctx, pl) is.True(err != nil) is.Equal(err.Error(), wantErr) @@ -929,11 +917,11 @@ func dummyDestination(persister *connector.Persister) *connector.Instance { func testErrRecoveryCfg() *ErrRecoveryCfg { return &ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: InfiniteRetriesErrRecovery, - HealthyAfter: 5 * time.Minute, + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: InfiniteRetriesErrRecovery, + MaxRetriesWindow: 5 * time.Minute, } } diff --git a/pkg/orchestrator/orchestrator_test.go b/pkg/orchestrator/orchestrator_test.go index 373689232..727ab58b9 100644 --- a/pkg/orchestrator/orchestrator_test.go +++ b/pkg/orchestrator/orchestrator_test.go @@ -97,11 +97,11 @@ func TestPipelineSimple(t 
*testing.T) { pipelineService := pipeline.NewService(logger, db) errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: 0, - HealthyAfter: 5 * time.Minute, + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + MaxRetriesWindow: 5 * time.Minute, } lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connectorService, processorService, connPluginService, pipelineService) diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index 4423848c9..b8536bd66 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -520,11 +520,11 @@ func TestService_IntegrationTestServices(t *testing.T) { procService := processor.NewService(logger, db, procPluginService) errRecoveryCfg := &lifecycle.ErrRecoveryCfg{ - MinDelay: time.Second, - MaxDelay: 10 * time.Minute, - BackoffFactor: 2, - MaxRetries: 0, - HealthyAfter: 5 * time.Minute, + MinDelay: time.Second, + MaxDelay: 10 * time.Minute, + BackoffFactor: 2, + MaxRetries: 0, + MaxRetriesWindow: 5 * time.Minute, } lifecycleService := lifecycle.NewService(logger, errRecoveryCfg, connService, procService, connPluginService, plService)
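
To make the new recovery behaviour easier to follow, below is a minimal, self-contained Go sketch of the mechanism that `pkg/lifecycle/service.go` now implements: every failure increments an attempt counter, each attempt is forgotten again `delay + max-retries-window` later, and the pipeline is only degraded once the counter exceeds `max-retries`. The names (`recoveryConfig`, `recoverer`, `onFailure`) are illustrative and not Conduit APIs, and the delay calculation only approximates what the real code gets from `backoff.Backoff.ForAttempt` (github.com/jpillora/backoff).

```go
// Simplified sketch of the retry-window mechanism; not the Conduit implementation itself.
package main

import (
	"fmt"
	"math"
	"sync/atomic"
	"time"
)

type recoveryConfig struct {
	MinDelay         time.Duration
	MaxDelay         time.Duration
	BackoffFactor    float64
	MaxRetries       int64         // negative value means infinite retries
	MaxRetriesWindow time.Duration // how long an attempt keeps counting against MaxRetries
}

// delayForAttempt computes an exponential delay (MinDelay * factor^(attempt-1), capped at
// MaxDelay), roughly what the real code obtains from backoff.Backoff.ForAttempt.
func (c recoveryConfig) delayForAttempt(attempt int64) time.Duration {
	d := time.Duration(float64(c.MinDelay) * math.Pow(c.BackoffFactor, float64(attempt-1)))
	if d > c.MaxDelay {
		return c.MaxDelay
	}
	return d
}

type recoverer struct {
	cfg      recoveryConfig
	attempts atomic.Int64
}

// onFailure is called every time the pipeline fails. It returns an error when the pipeline
// has exhausted its retries within the window; otherwise it waits for the backoff delay and
// schedules the attempt to expire after delay + MaxRetriesWindow.
func (r *recoverer) onFailure() error {
	attempt := r.attempts.Add(1)
	if r.cfg.MaxRetries >= 0 && attempt > r.cfg.MaxRetries {
		return fmt.Errorf("giving up after %d recovery attempts", attempt)
	}

	delay := r.cfg.delayForAttempt(attempt)
	// Forget this attempt once it falls outside the retries window, so a pipeline that keeps
	// running without errors gradually earns its retries back.
	time.AfterFunc(delay+r.cfg.MaxRetriesWindow, func() { r.attempts.Add(-1) })

	time.Sleep(delay) // the real code also honours context cancellation while waiting
	return nil
}

func main() {
	r := &recoverer{cfg: recoveryConfig{
		MinDelay:         time.Second,
		MaxDelay:         10 * time.Second,
		BackoffFactor:    1,
		MaxRetries:       2,
		MaxRetriesWindow: 5 * time.Second, // shortened for the demo
	}}

	// Simulate three consecutive failures: the third one exceeds MaxRetries.
	for i := 0; i < 3; i++ {
		if err := r.onFailure(); err != nil {
			fmt.Println(err) // giving up after 3 recovery attempts
			return
		}
		fmt.Println("restarted pipeline, attempts in window:", r.attempts.Load())
	}
}
```

Expiring attempts with `time.AfterFunc` means a pipeline that fails again and again within the window accumulates attempts and is eventually degraded, while one that only fails occasionally keeps recovering indefinitely.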