Skip to content

Commit

Permalink
Retry storage operation on 409 conflict (flyteorg#325)
Browse files Browse the repository at this point in the history
* add retry wip

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* add tests

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix

Signed-off-by: Sonja Ericsson <[email protected]>

* fix lint

Signed-off-by: Sonja Ericsson <[email protected]>
  • Loading branch information
sonjaer authored Jan 20, 2022
1 parent 185e449 commit 6fa5b48
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 4 deletions.
13 changes: 12 additions & 1 deletion pkg/async/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import (
// RetryDelay indicates how long to wait between restarting a subscriber connection in the case of network failures.
var RetryDelay = 30 * time.Second

func Retry(attempts int, delay time.Duration, f func() error) error {
func RetryOnSpecificErrors(attempts int, delay time.Duration, f func() error, IsErrorRetryable func(error) bool) error {
var err error
for attempt := 0; attempt <= attempts; attempt++ {
err = f()
if err == nil {
return nil
}
if !IsErrorRetryable(err) {
return err
}
logger.Warningf(context.Background(),
"Failed [%v] on attempt %d of %d", err, attempt, attempts)
time.Sleep(delay)
}
return err
}

func retryOnAllErrors(err error) bool {
return true
}

func Retry(attempts int, delay time.Duration, f func() error) error {
return RetryOnSpecificErrors(attempts, delay, f, retryOnAllErrors)
}
15 changes: 15 additions & 0 deletions pkg/async/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,18 @@ func TestRetry_RetriesExhausted(t *testing.T) {
})
assert.EqualError(t, err, "foo")
}

func TestRetryOnlyOnRetryableExceptions(t *testing.T) {
attemptsRecorded := 0
err := RetryOnSpecificErrors(3, time.Millisecond, func() error {
attemptsRecorded++
if attemptsRecorded <= 1 {
return errors.New("foo")
}
return errors.New("not-foo")
}, func(err error) bool {
return err.Error() == "foo"
})
assert.EqualValues(t, attemptsRecorded, 2)
assert.EqualError(t, err, "not-foo")
}
27 changes: 24 additions & 3 deletions pkg/common/data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@ package common

import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/shared"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
errrs "github.com/pkg/errors"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
)

func OffloadLiteralMap(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, nestedKeys ...string) (storage.DataReference, error) {
return OffloadLiteralMapWithRetryDelayAndAttempts(ctx, storageClient, literalMap, async.RetryDelay, 5, nestedKeys...)
}

func OffloadLiteralMapWithRetryDelayAndAttempts(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, retryDelay time.Duration, attempts int, nestedKeys ...string) (storage.DataReference, error) {
if literalMap == nil {
literalMap = &core.LiteralMap{}
}
Expand All @@ -23,8 +30,22 @@ func OffloadLiteralMap(ctx context.Context, storageClient *storage.DataStore, li
if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to construct data reference for [%+v] with err: %v", nestedKeys, err)
}
if err := storageClient.WriteProtobuf(ctx, uri, storage.Options{}, literalMap); err != nil {

err = async.RetryOnSpecificErrors(attempts, retryDelay, func() error {
err = storageClient.WriteProtobuf(ctx, uri, storage.Options{}, literalMap)
return err
}, isRetryableError)

if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to write protobuf for [%+v] with err: %v", nestedKeys, err)
}

return uri, nil
}

func isRetryableError(err error) bool {
if e, ok := errrs.Cause(err).(*googleapi.Error); ok && e.Code == 409 {
return true
}
return false
}
17 changes: 17 additions & 0 deletions pkg/common/data_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package common
import (
"context"
"testing"
"time"

"github.com/flyteorg/flyteadmin/pkg/errors"
errs "github.com/pkg/errors"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"

commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks"
Expand Down Expand Up @@ -63,3 +66,17 @@ func TestOffloadLiteralMap_StorageFailure(t *testing.T) {
_, err := OffloadLiteralMap(context.TODO(), mockStorage, literalMap, "nested", "key")
assert.Equal(t, err.(errors.FlyteAdminError).Code(), codes.Internal)
}

func TestOffloadLiteralMap_RetryOn409(t *testing.T) {
mockStorage := commonMocks.GetMockStorageClient()
retries := 0
mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).WriteProtobufCb = func(ctx context.Context, reference storage.DataReference, opts storage.Options, msg proto.Message) error {
retries++
assert.Equal(t, reference.String(), "s3://bucket/metadata/nested/key")
return errs.Wrapf(&googleapi.Error{Code: 409}, "Failed to write data [%vb] to path [%v].", 10, "size")
}
expectedRetries := 2
_, err := OffloadLiteralMapWithRetryDelayAndAttempts(context.TODO(), mockStorage, literalMap, time.Millisecond, expectedRetries, "nested", "key")
assert.EqualValues(t, retries, expectedRetries+1)
assert.Equal(t, err.(errors.FlyteAdminError).Code(), codes.Internal)
}

0 comments on commit 6fa5b48

Please sign in to comment.