Skip to content

Commit

Permalink
Merge branch 'main' into bundle-doc-minor-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
azdagron authored Oct 5, 2024
2 parents e829df2 + 84d40aa commit 6383104
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 69 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [2.4.0] - 2024-10-05

### Added

- Support for using a custom backoff strategy in the Workload API client (#302)
- Support for a default JWT-SVID picker (#301)

## [2.3.0] - 2024-06-17

### Changed
Expand Down
12 changes: 6 additions & 6 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/go-jose/go-jose/v4 v4.0.4
github.com/stretchr/testify v1.9.0
github.com/zeebo/errs v1.3.0
google.golang.org/grpc v1.64.0
google.golang.org/grpc v1.67.1
google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20
google.golang.org/protobuf v1.34.2
)
Expand All @@ -16,11 +16,11 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
24 changes: 12 additions & 12 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20 h1:MLBCGN1O7GzIx+cBiwfYPwtmZ41U3Mn/cotLJciaArI=
google.golang.org/grpc/examples v0.0.0-20230224211313-3775f633ce20/go.mod h1:Nr5H8+MlGWr5+xX/STzdoEqJrO+YteqFbMyCsrb6mH0=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
47 changes: 34 additions & 13 deletions v2/workloadapi/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,51 @@ import (
"time"
)

// backoff defines an linear backoff policy.
type backoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
// BackoffStrategy provides backoff facilities.
type BackoffStrategy interface {
// NewBackoff returns a new backoff for the strategy. The returned
// Backoff is in the same state that it would be in after a call to
// Reset().
NewBackoff() Backoff
}

// Backoff provides backoff for a workload API operation.
type Backoff interface {
// Next returns the next backoff period.
Next() time.Duration

// Reset() resets the backoff.
Reset()
}

type defaultBackoffStrategy struct{}

func (defaultBackoffStrategy) NewBackoff() Backoff {
return newLinearBackoff()
}

// linearBackoff defines an linear backoff policy.
type linearBackoff struct {
initialDelay time.Duration
maxDelay time.Duration
n int
}

func newBackoff() *backoff {
return &backoff{
InitialDelay: time.Second,
MaxDelay: 30 * time.Second,
func newLinearBackoff() *linearBackoff {
return &linearBackoff{
initialDelay: time.Second,
maxDelay: 30 * time.Second,
n: 0,
}
}

// Duration returns the next wait period for the backoff. Not goroutine-safe.
func (b *backoff) Duration() time.Duration {
func (b *linearBackoff) Next() time.Duration {
backoff := float64(b.n) + 1
d := math.Min(b.InitialDelay.Seconds()*backoff, b.MaxDelay.Seconds())
d := math.Min(b.initialDelay.Seconds()*backoff, b.maxDelay.Seconds())
b.n++
return time.Duration(d) * time.Second
}

// Reset resets the backoff's state.
func (b *backoff) Reset() {
func (b *linearBackoff) Reset() {
b.n = 0
}
23 changes: 8 additions & 15 deletions v2/workloadapi/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,27 @@ import (
"github.com/stretchr/testify/require"
)

func TestBackoff(t *testing.T) {
new := func() *backoff { //nolint:all
b := newBackoff()
b.InitialDelay = time.Second
b.MaxDelay = 30 * time.Second
return b
}

testUntilMax := func(t *testing.T, b *backoff) {
func TestLinearBackoff(t *testing.T) {
testUntilMax := func(t *testing.T, b *linearBackoff) {
for i := 1; i < 30; i++ {
require.Equal(t, time.Duration(i)*time.Second, b.Duration())
require.Equal(t, time.Duration(i)*time.Second, b.Next())
}
require.Equal(t, 30*time.Second, b.Duration())
require.Equal(t, 30*time.Second, b.Duration())
require.Equal(t, 30*time.Second, b.Duration())
require.Equal(t, 30*time.Second, b.Next())
require.Equal(t, 30*time.Second, b.Next())
require.Equal(t, 30*time.Second, b.Next())
}

t.Run("test max", func(t *testing.T) {
t.Parallel()

b := new()
b := newLinearBackoff()
testUntilMax(t, b)
})

t.Run("test reset", func(t *testing.T) {
t.Parallel()

b := new()
b := newLinearBackoff()
testUntilMax(t, b)

b.Reset()
Expand Down
19 changes: 10 additions & 9 deletions v2/workloadapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Client) FetchX509Bundles(ctx context.Context) (*x509bundle.Set, error)
// WatchX509Bundles watches for changes to the X.509 bundles. The watcher receives
// the updated X.509 bundles.
func (c *Client) WatchX509Bundles(ctx context.Context, watcher X509BundleWatcher) error {
backoff := newBackoff()
backoff := c.config.backoffStrategy.NewBackoff()
for {
err := c.watchX509Bundles(ctx, watcher, backoff)
watcher.OnX509BundlesWatchError(err)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *Client) FetchX509Context(ctx context.Context) (*X509Context, error) {
// WatchX509Context watches for updates to the X.509 context. The watcher
// receives the updated X.509 context.
func (c *Client) WatchX509Context(ctx context.Context, watcher X509ContextWatcher) error {
backoff := newBackoff()
backoff := c.config.backoffStrategy.NewBackoff()
for {
err := c.watchX509Context(ctx, watcher, backoff)
watcher.OnX509ContextWatchError(err)
Expand Down Expand Up @@ -224,7 +224,7 @@ func (c *Client) FetchJWTBundles(ctx context.Context) (*jwtbundle.Set, error) {
// WatchJWTBundles watches for changes to the JWT bundles. The watcher receives
// the updated JWT bundles.
func (c *Client) WatchJWTBundles(ctx context.Context, watcher JWTBundleWatcher) error {
backoff := newBackoff()
backoff := c.config.backoffStrategy.NewBackoff()
for {
err := c.watchJWTBundles(ctx, watcher, backoff)
watcher.OnJWTBundlesWatchError(err)
Expand Down Expand Up @@ -258,7 +258,7 @@ func (c *Client) newConn(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, c.config.address, c.config.dialOptions...) //nolint:staticcheck // preserve backcompat with WithDialOptions option
}

func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backoff) error {
func (c *Client) handleWatchError(ctx context.Context, err error, backoff Backoff) error {
code := status.Code(err)
if code == codes.Canceled {
return err
Expand All @@ -270,7 +270,7 @@ func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backo
}

c.config.log.Errorf("Failed to watch the Workload API: %v", err)
retryAfter := backoff.Duration()
retryAfter := backoff.Next()
c.config.log.Debugf("Retrying watch in %s", retryAfter)
select {
case <-time.After(retryAfter):
Expand All @@ -281,7 +281,7 @@ func (c *Client) handleWatchError(ctx context.Context, err error, backoff *backo
}
}

func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatcher, backoff *backoff) error {
func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatcher, backoff Backoff) error {
ctx, cancel := context.WithCancel(withHeader(ctx))
defer cancel()

Expand All @@ -308,7 +308,7 @@ func (c *Client) watchX509Context(ctx context.Context, watcher X509ContextWatche
}
}

func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher, backoff *backoff) error {
func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher, backoff Backoff) error {
ctx, cancel := context.WithCancel(withHeader(ctx))
defer cancel()

Expand All @@ -335,7 +335,7 @@ func (c *Client) watchJWTBundles(ctx context.Context, watcher JWTBundleWatcher,
}
}

func (c *Client) watchX509Bundles(ctx context.Context, watcher X509BundleWatcher, backoff *backoff) error {
func (c *Client) watchX509Bundles(ctx context.Context, watcher X509BundleWatcher, backoff Backoff) error {
ctx, cancel := context.WithCancel(withHeader(ctx))
defer cancel()

Expand Down Expand Up @@ -402,7 +402,8 @@ func withHeader(ctx context.Context) context.Context {

func defaultClientConfig() clientConfig {
return clientConfig{
log: logger.Null,
log: logger.Null,
backoffStrategy: defaultBackoffStrategy{},
}
}

Expand Down
48 changes: 45 additions & 3 deletions v2/workloadapi/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/x509"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -103,7 +104,10 @@ func TestFetchX509Bundles(t *testing.T) {
func TestWatchX509Bundles(t *testing.T) {
wl := fakeworkloadapi.New(t)
defer wl.Stop()
c, err := New(context.Background(), WithAddr(wl.Addr()))

backoffStrategy := &testBackoffStrategy{}

c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy))
require.NoError(t, err)
defer c.Close()

Expand Down Expand Up @@ -149,6 +153,9 @@ func TestWatchX509Bundles(t *testing.T) {
wl.Stop()
tw.WaitForUpdates(1)
assert.Len(t, tw.Errors(), 2)

// Assert that there was the expected number of backoffs.
assert.Equal(t, 2, backoffStrategy.BackedOff())
}

func TestFetchX509Context(t *testing.T) {
Expand Down Expand Up @@ -213,7 +220,10 @@ func TestWatchX509Context(t *testing.T) {
federatedCA := test.NewCA(t, federatedTD)
wl := fakeworkloadapi.New(t)
defer wl.Stop()
c, err := New(context.Background(), WithAddr(wl.Addr()))

backoffStrategy := &testBackoffStrategy{}

c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy))
require.NoError(t, err)
defer c.Close()

Expand Down Expand Up @@ -291,6 +301,9 @@ func TestWatchX509Context(t *testing.T) {

cancel()
wg.Wait()

// Assert that there was the expected number of backoffs.
assert.Equal(t, 2, backoffStrategy.BackedOff())
}

func TestFetchJWTSVID(t *testing.T) {
Expand Down Expand Up @@ -375,7 +388,10 @@ func TestFetchJWTBundles(t *testing.T) {
func TestWatchJWTBundles(t *testing.T) {
wl := fakeworkloadapi.New(t)
defer wl.Stop()
c, err := New(context.Background(), WithAddr(wl.Addr()))

backoffStrategy := &testBackoffStrategy{}

c, err := New(context.Background(), WithAddr(wl.Addr()), WithBackoffStrategy(backoffStrategy))
require.NoError(t, err)
defer c.Close()

Expand Down Expand Up @@ -421,6 +437,9 @@ func TestWatchJWTBundles(t *testing.T) {
wl.Stop()
tw.WaitForUpdates(1)
assert.Len(t, tw.Errors(), 2)

// Assert that there was the expected number of backoffs.
assert.Equal(t, 2, backoffStrategy.BackedOff())
}

func TestValidateJWTSVID(t *testing.T) {
Expand Down Expand Up @@ -605,3 +624,26 @@ func (w *testWatcher) WaitForUpdates(expectedNumUpdates int) {
}
}
}

type testBackoffStrategy struct {
backedOff int32
}

func (s *testBackoffStrategy) NewBackoff() Backoff {
return testBackoff{backedOff: &s.backedOff}
}

func (s *testBackoffStrategy) BackedOff() int {
return int(atomic.LoadInt32(&s.backedOff))
}

type testBackoff struct {
backedOff *int32
}

func (b testBackoff) Next() time.Duration {
atomic.AddInt32(b.backedOff, 1)
return time.Millisecond * 200
}

func (b testBackoff) Reset() {}
Loading

0 comments on commit 6383104

Please sign in to comment.