Skip to content

Commit

Permalink
Merge pull request #17 from xataio/add-backoff-internal-lib
Browse files Browse the repository at this point in the history
Add backoff lib
  • Loading branch information
eminano authored May 22, 2024
2 parents 832789f + 68c65a1 commit e79490a
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/xataio/pgstream
go 1.22.2

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.1.2
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/MarvinJWendt/testza v0.5.2/go.mod h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down
62 changes: 62 additions & 0 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// SPDX-License-Identifier: Apache-2.0

package backoff

import (
"context"
"errors"
"time"

"github.com/cenkalti/backoff/v4"
)

type Backoff interface {
RetryNotify(Operation, Notify) error
}

type (
Operation func() error
Notify func(error, time.Duration)
)

type Config struct {
InitialInterval time.Duration
MaxInterval time.Duration
MaxRetries uint
}

// ExponentialBackoff is a wrapper around the cenkalti exponential backoff
type ExponentialBackoff struct {
backoff.BackOff
}

var ErrPermanent = errors.New("permanent error, do not retry")

type Provider func(ctx context.Context) Backoff

func NewExponentialBackoff(ctx context.Context, cfg *Config) *ExponentialBackoff {
exp := backoff.NewExponentialBackOff()
exp.InitialInterval = cfg.InitialInterval
exp.MaxElapsedTime = cfg.MaxInterval

var bo backoff.BackOff = exp
if cfg.MaxRetries > 0 {
bo = backoff.WithMaxRetries(bo, uint64(cfg.MaxRetries))
}
bo = backoff.WithContext(bo, ctx)

return &ExponentialBackoff{
BackOff: bo,
}
}

func (ebo *ExponentialBackoff) RetryNotify(op Operation, notify Notify) error {
boOp := func() error {
err := op()
if errors.Is(err, ErrPermanent) {
return backoff.Permanent(err)
}
return err
}
return backoff.RetryNotify(boOp, ebo, backoff.Notify(notify))
}
13 changes: 13 additions & 0 deletions internal/backoff/mocks/mock_backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-License-Identifier: Apache-2.0

package mocks

import "github.com/xataio/pgstream/internal/backoff"

type Backoff struct {
RetryNotifyFn func(backoff.Operation, backoff.Notify) error
}

func (m *Backoff) RetryNotify(op backoff.Operation, not backoff.Notify) error {
return m.RetryNotifyFn(op, not)
}
30 changes: 16 additions & 14 deletions pkg/wal/listener/kafka/wal_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"runtime/debug"
"time"

"github.com/cenkalti/backoff"
"github.com/xataio/pgstream/internal/backoff"
"github.com/xataio/pgstream/internal/kafka"
"github.com/xataio/pgstream/pkg/wal"

Expand All @@ -18,8 +18,6 @@ import (
)

const (
backoffMaxElapsedTime = 2 * time.Minute

// if we go over this limit the log will likely be truncated and it will not
// be very readable
logMaxBytes = 10000
Expand All @@ -31,7 +29,12 @@ type Reader struct {
// processRecord is called for a new record.
processRecord payloadProcessor

backoffMaxElapsedTime time.Duration
backoffProvider backoff.Provider
}

type ReaderConfig struct {
Kafka kafka.ReaderConfig
CommitBackoff backoff.Config
}

type kafkaReader interface {
Expand All @@ -42,18 +45,20 @@ type kafkaReader interface {

type payloadProcessor func(context.Context, []byte, wal.CommitPosition) error

func NewReader(config kafka.ReaderConfig,
func NewReader(config ReaderConfig,
processRecord payloadProcessor,
) (*Reader, error) {
reader, err := kafka.NewReader(config)
reader, err := kafka.NewReader(config.Kafka)
if err != nil {
return nil, err
}

return &Reader{
reader: reader,
processRecord: processRecord,
backoffMaxElapsedTime: backoffMaxElapsedTime,
reader: reader,
processRecord: processRecord,
backoffProvider: func(ctx context.Context) backoff.Backoff {
return backoff.NewExponentialBackoff(ctx, &config.CommitBackoff)
},
}, nil
}

Expand Down Expand Up @@ -124,14 +129,11 @@ func (r *Reader) checkpoint(ctx context.Context, positions []wal.CommitPosition)
}

func (r *Reader) commitMessagesWithRetry(ctx context.Context, msgs []*kafka.Message) error {
backoffCfg := backoff.NewExponentialBackOff()
backoffCfg.MaxElapsedTime = r.backoffMaxElapsedTime

return backoff.RetryNotify(
bo := r.backoffProvider(ctx)
return bo.RetryNotify(
func() error {
return r.reader.CommitMessages(ctx, msgs...)
},
backoffCfg,
func(err error, d time.Duration) {
log.Ctx(ctx).Warn().Err(err).Msgf("failed to commit messages. Retrying in %v", d)
})
Expand Down
29 changes: 25 additions & 4 deletions pkg/wal/listener/kafka/wal_kafka_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/xataio/pgstream/internal/backoff"
backoffmocks "github.com/xataio/pgstream/internal/backoff/mocks"
"github.com/xataio/pgstream/internal/kafka"
kafkamocks "github.com/xataio/pgstream/internal/kafka/mocks"
"github.com/xataio/pgstream/pkg/wal"
Expand Down Expand Up @@ -164,8 +166,9 @@ func TestReader_checkpoint(t *testing.T) {
errTest := errors.New("oh noes")

tests := []struct {
name string
reader *kafkamocks.Reader
name string
reader *kafkamocks.Reader
backoffProvider backoff.Provider

wantErr error
}{
Expand All @@ -177,6 +180,13 @@ func TestReader_checkpoint(t *testing.T) {
return nil
},
},
backoffProvider: func(ctx context.Context) backoff.Backoff {
return &backoffmocks.Backoff{
RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error {
return o()
},
}
},

wantErr: nil,
},
Expand All @@ -187,6 +197,17 @@ func TestReader_checkpoint(t *testing.T) {
return errTest
},
},
backoffProvider: func(ctx context.Context) backoff.Backoff {
return &backoffmocks.Backoff{
RetryNotifyFn: func(o backoff.Operation, n backoff.Notify) error {
err := o()
if err != nil {
n(err, 50*time.Millisecond)
}
return err
},
}
},

wantErr: errTest,
},
Expand All @@ -198,8 +219,8 @@ func TestReader_checkpoint(t *testing.T) {
t.Parallel()

r := Reader{
reader: tc.reader,
backoffMaxElapsedTime: 5 * time.Millisecond,
reader: tc.reader,
backoffProvider: tc.backoffProvider,
}

err := r.checkpoint(context.Background(), testPositions)
Expand Down

0 comments on commit e79490a

Please sign in to comment.