diff --git a/go.mod b/go.mod index 2ea26c6..2707a8c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/xataio/pgstream go 1.22.2 require ( - github.com/cenkalti/backoff v2.2.1+incompatible + github.com/cenkalti/backoff/v4 v4.1.2 github.com/elastic/go-elasticsearch/v8 v8.13.1 github.com/golang-migrate/migrate/v4 v4.17.1 github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 diff --git a/go.sum b/go.sum index 7dfc6b2..8725254 100644 --- a/go.sum +++ b/go.sum @@ -20,8 +20,8 @@ github.com/MarvinJWendt/testza v0.5.2/go.mod h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go new file mode 100644 index 0000000..a7766b9 --- /dev/null +++ b/internal/backoff/backoff.go @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: Apache-2.0 + +package backoff + +import ( + "context" + "errors" + "time" + + "github.com/cenkalti/backoff/v4" +) + +type Backoff interface { + RetryNotify(Operation, Notify) error +} + +type ( + Operation func() error + Notify func(error, time.Duration) +) + +type Config struct { + InitialInterval time.Duration + MaxInterval time.Duration + MaxRetries uint +} + +// ExponentialBackoff is a wrapper around the cenkalti exponential backoff +type ExponentialBackoff struct { + backoff.BackOff +} + +var ErrPermanent = errors.New("permanent error, do not retry") + +type Provider func(ctx context.Context) Backoff + +func NewExponentialBackoff(ctx context.Context, cfg *Config) *ExponentialBackoff { + exp := backoff.NewExponentialBackOff() + exp.InitialInterval = cfg.InitialInterval + exp.MaxElapsedTime = cfg.MaxInterval + + var bo backoff.BackOff = exp + if cfg.MaxRetries > 0 { + bo = backoff.WithMaxRetries(bo, uint64(cfg.MaxRetries)) + } + bo = backoff.WithContext(bo, ctx) + + return &ExponentialBackoff{ + BackOff: bo, + } +} + +func (ebo *ExponentialBackoff) RetryNotify(op Operation, notify Notify) error { + boOp := func() error { + err := op() + if errors.Is(err, ErrPermanent) { + return backoff.Permanent(err) + } + return err + } + return backoff.RetryNotify(boOp, ebo, backoff.Notify(notify)) +} diff --git a/internal/backoff/mocks/mock_backoff.go b/internal/backoff/mocks/mock_backoff.go new file mode 100644 index 0000000..1ca044f --- /dev/null +++ b/internal/backoff/mocks/mock_backoff.go @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import "github.com/xataio/pgstream/internal/backoff" + +type Backoff struct { + RetryNotifyFn func(backoff.Operation, backoff.Notify) error +} + +func (m *Backoff) RetryNotify(op backoff.Operation, not backoff.Notify) error { + return m.RetryNotifyFn(op, not) +} diff --git a/pkg/wal/listener/kafka/wal_kafka_reader.go b/pkg/wal/listener/kafka/wal_kafka_reader.go index e9cf855..0154600 100644 --- a/pkg/wal/listener/kafka/wal_kafka_reader.go +++ b/pkg/wal/listener/kafka/wal_kafka_reader.go @@ -9,7 +9,7 @@ import ( "runtime/debug" "time" - "github.com/cenkalti/backoff" + "github.com/xataio/pgstream/internal/backoff" "github.com/xataio/pgstream/internal/kafka" "github.com/xataio/pgstream/pkg/wal" @@ -18,8 +18,6 @@ import ( ) const ( - backoffMaxElapsedTime = 2 * time.Minute - // if we go over this limit the log will likely be truncated and it will not // be very readable logMaxBytes = 10000 @@ -31,7 +29,12 @@ type Reader struct { // processRecord is called for a new record. processRecord payloadProcessor - backoffMaxElapsedTime time.Duration + backoffProvider backoff.Provider +} + +type ReaderConfig struct { + Kafka kafka.ReaderConfig + CommitBackoff backoff.Config } type kafkaReader interface { @@ -42,18 +45,20 @@ type kafkaReader interface { type payloadProcessor func(context.Context, []byte, wal.CommitPosition) error -func NewReader(config kafka.ReaderConfig, +func NewReader(config ReaderConfig, processRecord payloadProcessor, ) (*Reader, error) { - reader, err := kafka.NewReader(config) + reader, err := kafka.NewReader(config.Kafka) if err != nil { return nil, err } return &Reader{ - reader: reader, - processRecord: processRecord, - backoffMaxElapsedTime: backoffMaxElapsedTime, + reader: reader, + processRecord: processRecord, + backoffProvider: func(ctx context.Context) backoff.Backoff { + return backoff.NewExponentialBackoff(ctx, &config.CommitBackoff) + }, }, nil } @@ -124,14 +129,11 @@ func (r *Reader) checkpoint(ctx context.Context, positions []wal.CommitPosition) } func (r *Reader) commitMessagesWithRetry(ctx context.Context, msgs []*kafka.Message) error { - backoffCfg := backoff.NewExponentialBackOff() - backoffCfg.MaxElapsedTime = r.backoffMaxElapsedTime - - return backoff.RetryNotify( + bo := r.backoffProvider(ctx) + return bo.RetryNotify( func() error { return r.reader.CommitMessages(ctx, msgs...) }, - backoffCfg, func(err error, d time.Duration) { log.Ctx(ctx).Warn().Err(err).Msgf("failed to commit messages. Retrying in %v", d) }) diff --git a/pkg/wal/listener/kafka/wal_kafka_reader_test.go b/pkg/wal/listener/kafka/wal_kafka_reader_test.go index cea0c55..d227fc9 100644 --- a/pkg/wal/listener/kafka/wal_kafka_reader_test.go +++ b/pkg/wal/listener/kafka/wal_kafka_reader_test.go @@ -11,6 +11,8 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/xataio/pgstream/internal/backoff" + backoffmocks "github.com/xataio/pgstream/internal/backoff/mocks" "github.com/xataio/pgstream/internal/kafka" kafkamocks "github.com/xataio/pgstream/internal/kafka/mocks" "github.com/xataio/pgstream/pkg/wal" @@ -164,8 +166,9 @@ func TestReader_checkpoint(t *testing.T) { errTest := errors.New("oh noes") tests := []struct { - name string - reader *kafkamocks.Reader + name string + reader *kafkamocks.Reader + backoffProvider backoff.Provider wantErr error }{ @@ -177,6 +180,13 @@ func TestReader_checkpoint(t *testing.T) { return nil }, }, + backoffProvider: func(ctx context.Context) backoff.Backoff { + return &backoffmocks.Backoff{ + RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error { + return o() + }, + } + }, wantErr: nil, }, @@ -187,6 +197,17 @@ func TestReader_checkpoint(t *testing.T) { return errTest }, }, + backoffProvider: func(ctx context.Context) backoff.Backoff { + return &backoffmocks.Backoff{ + RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error { + err := o() + if err != nil { + n(err, 50*time.Millisecond) + } + return err + }, + } + }, wantErr: errTest, }, @@ -198,8 +219,8 @@ func TestReader_checkpoint(t *testing.T) { t.Parallel() r := Reader{ - reader: tc.reader, - backoffMaxElapsedTime: 5 * time.Millisecond, + reader: tc.reader, + backoffProvider: tc.backoffProvider, } err := r.checkpoint(context.Background(), testPositions)