Track recovery attempts using leaky bucket (#1881)
* use leaky bucket to track recovery attempts

* rename healthy-after

* rename config to max-retries-window

* document test cases

* fix md linting

* change log level

---------

Co-authored-by: Raúl Barroso <[email protected]>
lovromazgon and raulb authored Oct 8, 2024
1 parent cab1ed5 commit 292193e
Showing 9 changed files with 212 additions and 107 deletions.
149 changes: 135 additions & 14 deletions docs/test-cases/pipeline-recovery.md
@@ -19,28 +19,24 @@ a retry.

 ```yaml
 version: "2.2"
 pipelines:
-  - id: file-pipeline
+  - id: chaos-to-log
     status: running
-    name: file-pipeline
-    description: dlq write error
+    description: Postgres source, file destination
+    dead-letter-queue:
+      plugin: standalone:chaos
+      settings:
+        writeMode: error
     connectors:
-      - id: chaos-src
+      - id: chaos
         type: source
         plugin: standalone:chaos
-        name: chaos-src
+        name: source
         settings:
           readMode: error
-      - id: log-dst
+      - id: destination
         type: destination
         plugin: builtin:log
-        log: file-dst
-    dead-letter-queue:
-      plugin: "builtin:postgres"
-      settings:
-        table: non_existing_table_so_that_dlq_fails
-        url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
-      window-size: 3
-      window-nack-threshold: 2
+        name: destination
 ```
**Steps**:
@@ -65,6 +61,28 @@ Recovery is not triggered when there is an error processing a record.
**Pipeline configuration file**:
```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: generator
type: source
plugin: builtin:generator
name: source
settings:
format.type: structured
format.options.id: int
format.options.name: string
operations: create
- id: destination
type: destination
plugin: builtin:log
name: destination
processors:
- id: error
plugin: "error"
```
**Steps**:
@@ -328,3 +346,106 @@ pipelines:
**Additional comments**:

---

## Test Case 11: Recovery triggered during a specific max-retries-window; after that, the pipeline is degraded

**Priority** (low/medium/high):

**Description**:

A pipeline is allowed to fail during a specific time window; after that it is degraded.
Combining `max-retries` and `max-retries-window` controls how many times a pipeline may fail within that window before it is considered degraded.
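
For illustration only, the sketch below shows how a leaky-bucket-style counter can combine the two settings: each recovery attempt is recorded, attempts older than the window expire, and the pipeline is degraded only when more than `max-retries` attempts remain inside the window. The type and method names (`retryTracker`, `record`, `exceeded`) are hypothetical and are not taken from Conduit's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// retryTracker is a hypothetical leaky-bucket-style counter: every recovery
// attempt is recorded with a timestamp, and attempts older than the window
// expire, so only recent failures count toward the retry limit.
type retryTracker struct {
	window   time.Duration
	attempts []time.Time
}

// record registers a recovery attempt at time t and drops expired attempts.
func (r *retryTracker) record(t time.Time) {
	r.attempts = append(r.attempts, t)
	r.drain(t)
}

// drain removes attempts that fall outside the sliding window ending at now.
func (r *retryTracker) drain(now time.Time) {
	cutoff := now.Add(-r.window)
	kept := r.attempts[:0]
	for _, a := range r.attempts {
		if a.After(cutoff) {
			kept = append(kept, a)
		}
	}
	r.attempts = kept
}

// exceeded reports whether more than maxRetries attempts remain in the window.
func (r *retryTracker) exceeded(now time.Time, maxRetries int) bool {
	r.drain(now)
	return len(r.attempts) > maxRetries
}

func main() {
	start := time.Now()
	tr := &retryTracker{window: 25 * time.Second}

	// Three failing attempts 10s apart (min-delay 10s, backoff-factor 1):
	// the third attempt exceeds max-retries=2 inside the 25s window.
	for i := 0; i < 3; i++ {
		tr.record(start.Add(time.Duration(i*10) * time.Second))
	}
	fmt.Println(tr.exceeded(start.Add(20*time.Second), 2)) // true -> degraded

	// After the pipeline runs error-free long enough, older attempts fall
	// out of the window and the count drops again (see Test Case 12).
	fmt.Println(tr.exceeded(start.Add(40*time.Second), 2)) // false
}
```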

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: postgres-source
type: source
plugin: builtin:postgres
name: source
settings:
cdcMode: logrepl
snapshotMode: never
table: employees
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
- id: destination
type: destination
plugin: builtin:log
name: destination
```

**Steps**:

1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s`
2. Stop the Postgres database
3. Leave it stopped and notice the pipeline becomes degraded on attempt 3 (after ~20 seconds)

**Expected Result**:

After ~20 seconds the pipeline should be degraded: with `min-delay` 10s and `backoff-factor` 1, recovery attempts are roughly 10 seconds apart, so the third failing attempt (~20 seconds in) exceeds `max-retries` 2 within the 25-second `max-retries-window`.

**Additional comments**:

---

## Test Case 12: Recovery triggered during a specific max-retries-window; pipeline is resilient during a specific time window

**Priority** (low/medium/high):

**Description**:

**Automated** (yes/no)

**Setup**:

**Pipeline configuration file**:

```yaml
version: "2.2"
pipelines:
- id: generator-to-log
status: running
description: Postgres source, file destination
connectors:
- id: postgres-source
type: source
plugin: builtin:postgres
name: source
settings:
cdcMode: logrepl
snapshotMode: never
table: employees
url: postgresql://meroxauser:meroxapass@localhost/meroxadb?sslmode=disable
- id: destination
type: destination
plugin: builtin:log
name: destination
```

**Steps**:

1. Run Conduit with `--pipelines.error-recovery.backoff-factor 1 --pipelines.error-recovery.min-delay 10s --pipelines.error-recovery.max-retries 2 --pipelines.error-recovery.max-retries-window 25s`
2. Stop the Postgres database
3. Leave it stopped until the backoff attempt count reaches 2
4. Start the Postgres database again
5. Leave it running for another 15 seconds
6. Notice the backoff attempt count goes back down to 1
   (repeat if needed to see how the backoff attempt count increases and decreases)

**Expected Result**:

The pipeline should recover. Because attempts are tracked over the `max-retries-window`, error-free running lets older attempts expire, so the attempt count decreases instead of accumulating toward `max-retries`.

**Additional comments**:

---
8 changes: 4 additions & 4 deletions pkg/conduit/config.go
@@ -96,8 +96,8 @@ type Config struct {
 			BackoffFactor int
 			// MaxRetries is the maximum number of restarts before the pipeline is considered unhealthy: Default: -1 (infinite)
 			MaxRetries int64
-			// HealthyAfter is the time after which the pipeline is considered healthy: Default: 5 minutes
-			HealthyAfter time.Duration
+			// MaxRetriesWindow is the duration window in which the max retries are counted: Default: 5 minutes
+			MaxRetriesWindow time.Duration
 		}
 	}

@@ -144,7 +144,7 @@ func DefaultConfig() Config {
 	cfg.Pipelines.ErrorRecovery.MaxDelay = 10 * time.Minute
 	cfg.Pipelines.ErrorRecovery.BackoffFactor = 2
 	cfg.Pipelines.ErrorRecovery.MaxRetries = lifecycle.InfiniteRetriesErrRecovery
-	cfg.Pipelines.ErrorRecovery.HealthyAfter = 5 * time.Minute
+	cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = 5 * time.Minute

 	cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin

@@ -215,7 +215,7 @@ func (c Config) validateErrorRecovery() error {
 	if errRecoveryCfg.MaxRetries < lifecycle.InfiniteRetriesErrRecovery {
 		errs = append(errs, cerrors.Errorf(`invalid "max-retries" value. It must be %d for infinite retries or >= 0`, lifecycle.InfiniteRetriesErrRecovery))
 	}
-	if err := requirePositiveValue("healthy-after", errRecoveryCfg.HealthyAfter); err != nil {
+	if err := requirePositiveValue("max-retries-window", errRecoveryCfg.MaxRetriesWindow); err != nil {
 		errs = append(errs, err)
 	}
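
For orientation, here is a minimal sketch of setting the renamed option when embedding Conduit as a library. It only uses identifiers visible in this diff; the import path is assumed to be the Conduit module path.

```go
package main

import (
	"fmt"
	"time"

	"github.com/conduitio/conduit/pkg/conduit" // assumed module path
)

func main() {
	// Start from the defaults shown above and tighten the error-recovery
	// settings: at most 2 retries counted over a 25-second window.
	cfg := conduit.DefaultConfig()
	cfg.Pipelines.ErrorRecovery.MaxRetries = 2
	cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = 25 * time.Second

	fmt.Println(cfg.Pipelines.ErrorRecovery.MaxRetriesWindow) // 25s
}
```
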
6 changes: 3 additions & 3 deletions pkg/conduit/config_test.go
@@ -213,12 +213,12 @@ func TestConfig_Validate(t *testing.T) {
 			want: nil,
 		},
 		{
-			name: "error recovery: negative healthy-after",
+			name: "error recovery: negative max-retries-window",
 			setupConfig: func(c Config) Config {
-				c.Pipelines.ErrorRecovery.HealthyAfter = -time.Second
+				c.Pipelines.ErrorRecovery.MaxRetriesWindow = -time.Second
 				return c
 			},
-			want: cerrors.New(`invalid error recovery config: "healthy-after" config value must be positive (got: -1s)`),
+			want: cerrors.New(`invalid error recovery config: "max-retries-window" config value must be positive (got: -1s)`),
 		},
 	}

6 changes: 3 additions & 3 deletions pkg/conduit/entrypoint.go
@@ -147,9 +147,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
 		"maximum number of retries",
 	)
 	flags.DurationVar(
-		&cfg.Pipelines.ErrorRecovery.HealthyAfter,
-		"pipelines.error-recovery.healthy-after",
-		cfg.Pipelines.ErrorRecovery.HealthyAfter,
+		&cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
+		"pipelines.error-recovery.max-retries-window",
+		cfg.Pipelines.ErrorRecovery.MaxRetriesWindow,
 		"amount of time running without any errors after which a pipeline is considered healthy",
 	)

10 changes: 5 additions & 5 deletions pkg/conduit/runtime.go
@@ -205,11 +205,11 @@ func createServices(r *Runtime) error {

 	// Error recovery configuration
 	errRecoveryCfg := &lifecycle.ErrRecoveryCfg{
-		MinDelay:      r.Config.Pipelines.ErrorRecovery.MinDelay,
-		MaxDelay:      r.Config.Pipelines.ErrorRecovery.MaxDelay,
-		BackoffFactor: r.Config.Pipelines.ErrorRecovery.BackoffFactor,
-		MaxRetries:    r.Config.Pipelines.ErrorRecovery.MaxRetries,
-		HealthyAfter:  r.Config.Pipelines.ErrorRecovery.HealthyAfter, // TODO: possibly create a go routine to continuously check health and reset status when needed
+		MinDelay:         r.Config.Pipelines.ErrorRecovery.MinDelay,
+		MaxDelay:         r.Config.Pipelines.ErrorRecovery.MaxDelay,
+		BackoffFactor:    r.Config.Pipelines.ErrorRecovery.BackoffFactor,
+		MaxRetries:       r.Config.Pipelines.ErrorRecovery.MaxRetries,
+		MaxRetriesWindow: r.Config.Pipelines.ErrorRecovery.MaxRetriesWindow,
 	}

 	plService := pipeline.NewService(r.logger, r.DB)