diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index b484bc2588..0799d0eb77 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -53,6 +53,15 @@ var fQNFn = map[string]func(string) DataReference{ }, } +// Checks if the error is AWS S3 bucket not found error +func awsBucketIsNotFound(err error) bool { + if awsErr, errOk := errs.Cause(err).(awserr.Error); errOk { + return awsErr.Code() == s32.ErrCodeNoSuchBucket + } + + return false +} + // Checks if the error is AWS S3 bucket already exists error. func awsBucketAlreadyExists(err error) bool { if IsExists(err) { @@ -110,22 +119,30 @@ type StowStore struct { baseContainerFQN DataReference } -func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error) { - // TODO: As of stow v0.2.6 elides the container lookup when a bucket region is set, - // so we always just attempt to create it when createIfNotFound is true. - - if createIfNotFound { - logger.Infof(ctx, "Attempting to create container [%s]", container) - _, err := s.loc.CreateContainer(container) - if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) { - return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err) - } +func (s *StowStore) CreateContainer(ctx context.Context, container string) (stow.Container, error) { + logger.Infof(ctx, "Attempting to create container [%s]", container) + c, err := s.loc.CreateContainer(container) + if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) { + return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err) } + return c, nil +} +func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error) { c, err := s.loc.Container(container) if err != nil { - logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err) - return nil, err + // IsNotFound is not always guaranteed to be returned if the underlying container doesn't exist! + // As of stow v0.2.6, the call to get container elides the lookup when a bucket region is set for S3 containers. + if IsNotFound(err) && createIfNotFound { + c, err = s.CreateContainer(ctx, container) + if err != nil { + logger.Errorf(ctx, "Call to create container [%s] failed. Error %s", container, err) + return nil, err + } + } else { + logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err) + return nil, err + } } return c, nil } @@ -180,7 +197,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata } } - if IsNotFound(err) { + if IsNotFound(err) || awsBucketIsNotFound(err) { return StowMetadata{exists: false}, nil } @@ -235,8 +252,17 @@ func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size t := s.metrics.WriteLatency.Start(ctx) _, err = container.Put(k, raw, size, opts.Metadata) if err != nil { - incFailureCounterForError(ctx, s.metrics.WriteFailure, err) - return errs.Wrapf(err, "Failed to write data [%vb] to path [%v].", size, k) + // If this error is due to the bucket not existing, first attempt to create it and retry the getContainer call. + if IsNotFound(err) || awsBucketIsNotFound(err) { + container, err = s.CreateContainer(ctx, c) + if err == nil { + s.dynamicContainerMap.Store(container, c) + } + } + if err != nil { + incFailureCounterForError(ctx, s.metrics.WriteFailure, err) + return errs.Wrapf(err, "Failed to write data [%vb] to path [%v].", size, k) + } } t.Stop() diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index c3131eb759..d291b5cb1b 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "context" + errors2 "errors" "fmt" "io" "io/ioutil" @@ -12,6 +13,9 @@ import ( "testing" "time" + "github.com/aws/aws-sdk-go/aws/awserr" + s32 "github.com/aws/aws-sdk-go/service/s3" + "github.com/graymeta/stow/google" "github.com/graymeta/stow/local" "github.com/pkg/errors" @@ -43,6 +47,7 @@ func (m mockStowLoc) CreateContainer(name string) (stow.Container, error) { type mockStowContainer struct { id string items map[string]mockStowItem + putCB func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) } func (m mockStowContainer) ID() string { @@ -70,6 +75,9 @@ func (mockStowContainer) RemoveItem(id string) error { } func (m *mockStowContainer) Put(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) { + if m.putCB != nil { + return m.putCB(name, r, size, metadata) + } item := mockStowItem{url: name, size: size} m.items[name] = item return item, nil @@ -124,6 +132,17 @@ func (mockStowItem) Metadata() (map[string]interface{}, error) { return map[string]interface{}{}, nil } +func TestAwsBucketIsNotFound(t *testing.T) { + t.Run("detect is not found", func(t *testing.T) { + err := awserr.New(s32.ErrCodeNoSuchBucket, "foo", errors2.New("foo")) + assert.True(t, awsBucketIsNotFound(err)) + }) + t.Run("do not detect random errors", func(t *testing.T) { + err := awserr.New(s32.ErrCodeInvalidObjectState, "foo", errors2.New("foo")) + assert.False(t, awsBucketIsNotFound(err)) + }) +} + func TestStowStore_ReadRaw(t *testing.T) { labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) @@ -410,7 +429,7 @@ func TestLoadContainer(t *testing.T) { loc: &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { if id == container { - return newMockStowContainer(container), nil + return newMockStowContainer(container), stow.ErrNotFound } return nil, fmt.Errorf("container is not supported") }, @@ -430,10 +449,7 @@ func TestLoadContainer(t *testing.T) { stowStore := StowStore{ loc: &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") + return nil, stow.ErrNotFound }, CreateContainerCb: func(name string) (stow.Container, error) { if name == container { @@ -451,14 +467,73 @@ func TestLoadContainer(t *testing.T) { loc: &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { if id == container { - return newMockStowContainer(container), nil + return newMockStowContainer(container), stow.ErrNotFound } return nil, fmt.Errorf("container is not supported") }, }, } - stowContainer, err := stowStore.LoadContainer(context.Background(), "container", false) + _, err := stowStore.LoadContainer(context.Background(), "container", false) + assert.EqualError(t, err, stow.ErrNotFound.Error()) + }) +} + +func TestStowStore_WriteRaw(t *testing.T) { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) + const container = "container" + fn := fQNFn["s3"] + t.Run("create container when not found", func(t *testing.T) { + testScope := promutils.NewTestScope() + var createCalled bool + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + mockStowContainer := newMockStowContainer(container) + mockStowContainer.putCB = func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) { + return nil, awserr.New(s32.ErrCodeNoSuchBucket, "foo", errors2.New("foo")) + } + return mockStowContainer, nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + createCalled = true + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, true, testScope) assert.NoError(t, err) - assert.Equal(t, container, stowContainer.ID()) + err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{})) + assert.NoError(t, err) + assert.True(t, createCalled) + var containerStoredInDynamicContainerMap bool + s.dynamicContainerMap.Range(func(key, value interface{}) bool { + if value == container { + containerStoredInDynamicContainerMap = true + return true + } + return false + }) + assert.True(t, containerStoredInDynamicContainerMap) + }) + t.Run("bubble up generic put errors", func(t *testing.T) { + testScope := promutils.NewTestScope() + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + mockStowContainer := newMockStowContainer(container) + mockStowContainer.putCB = func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) { + return nil, errors2.New("foo") + } + return mockStowContainer, nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, true, testScope) + assert.NoError(t, err) + err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{})) + assert.EqualError(t, err, "Failed to write data [0b] to path [path].: foo") }) }