diff --git a/flyteadmin/pkg/common/mocks/storage.go b/flyteadmin/pkg/common/mocks/storage.go index 7e91bf0485..bf29eedd3e 100644 --- a/flyteadmin/pkg/common/mocks/storage.go +++ b/flyteadmin/pkg/common/mocks/storage.go @@ -33,6 +33,10 @@ func (t *TestDataStore) Head(ctx context.Context, reference storage.DataReferenc return t.HeadCb(ctx, reference) } +func (t *TestDataStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) { + return nil, storage.NewCursorAtEnd(), fmt.Errorf("Not implemented yet") +} + func (t *TestDataStore) ReadProtobuf(ctx context.Context, reference storage.DataReference, msg proto.Message) error { return t.ReadProtobufCb(ctx, reference, msg) } diff --git a/flytepropeller/pkg/utils/failing_datastore.go b/flytepropeller/pkg/utils/failing_datastore.go index f3b65471c7..7948a85b81 100644 --- a/flytepropeller/pkg/utils/failing_datastore.go +++ b/flytepropeller/pkg/utils/failing_datastore.go @@ -27,6 +27,10 @@ func (FailingRawStore) Head(ctx context.Context, reference storage.DataReference return nil, fmt.Errorf("failed metadata fetch") } +func (FailingRawStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) { + return nil, storage.NewCursorAtEnd(), fmt.Errorf("Not implemented yet") +} + func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { return nil, fmt.Errorf("failed read raw") } diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index b9751d7fa1..9c304790cb 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -73,6 +73,10 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat return d.HeadCb(ctx, reference) } +func (d *dummyStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) { + return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet") +} + func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { return d.ReadRawCb(ctx, reference) } diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index a95a0a49ca..94083f6646 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -54,6 +54,10 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta }, nil } +func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) { + return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet") +} + func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { if raw, found := s.cache[reference]; found { return ioutil.NopCloser(bytes.NewReader(raw)), nil diff --git a/flytestdlib/storage/mocks/composed_protobuf_store.go b/flytestdlib/storage/mocks/composed_protobuf_store.go index c9064c2ac5..49a0ee89dd 100644 --- a/flytestdlib/storage/mocks/composed_protobuf_store.go +++ b/flytestdlib/storage/mocks/composed_protobuf_store.go @@ -194,6 +194,54 @@ func (_m *ComposedProtobufStore) Head(ctx context.Context, reference storage.Dat return r0, r1 } +type ComposedProtobufStore_List struct { + *mock.Call +} + +func (_m ComposedProtobufStore_List) Return(_a0 []storage.DataReference, _a1 storage.Cursor, _a2 error) *ComposedProtobufStore_List { + return &ComposedProtobufStore_List{Call: _m.Call.Return(_a0, _a1, _a2)} +} + +func (_m *ComposedProtobufStore) OnList(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) *ComposedProtobufStore_List { + c_call := _m.On("List", ctx, reference, maxItems, cursor) + return &ComposedProtobufStore_List{Call: c_call} +} + +func (_m *ComposedProtobufStore) OnListMatch(matchers ...interface{}) *ComposedProtobufStore_List { + c_call := _m.On("List", matchers...) + return &ComposedProtobufStore_List{Call: c_call} +} + +// List provides a mock function with given fields: ctx, reference, maxItems, cursor +func (_m *ComposedProtobufStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) { + ret := _m.Called(ctx, reference, maxItems, cursor) + + var r0 []storage.DataReference + if rf, ok := ret.Get(0).(func(context.Context, storage.DataReference, int, storage.Cursor) []storage.DataReference); ok { + r0 = rf(ctx, reference, maxItems, cursor) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]storage.DataReference) + } + } + + var r1 storage.Cursor + if rf, ok := ret.Get(1).(func(context.Context, storage.DataReference, int, storage.Cursor) storage.Cursor); ok { + r1 = rf(ctx, reference, maxItems, cursor) + } else { + r1 = ret.Get(1).(storage.Cursor) + } + + var r2 error + if rf, ok := ret.Get(2).(func(context.Context, storage.DataReference, int, storage.Cursor) error); ok { + r2 = rf(ctx, reference, maxItems, cursor) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + type ComposedProtobufStore_ReadProtobuf struct { *mock.Call } diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 3e84cb7acb..52e6905513 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -40,6 +40,41 @@ type Metadata interface { ContentMD5() string } +type CursorState int + +const ( + // Enum representing state of the cursor + AtStartCursorState CursorState = 0 + AtEndCursorState CursorState = 1 + AtCustomPosCursorState CursorState = 2 +) + +type Cursor struct { + cursorState CursorState + customPosition string +} + +func NewCursorAtStart() Cursor { + return Cursor{ + cursorState: AtStartCursorState, + customPosition: "", + } +} + +func NewCursorAtEnd() Cursor { + return Cursor{ + cursorState: AtEndCursorState, + customPosition: "", + } +} + +func NewCursorFromCustomPosition(customPosition string) Cursor { + return Cursor{ + cursorState: AtCustomPosCursorState, + customPosition: customPosition, + } +} + // DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. // Today we rely on Stow for multi-cloud support, but this interface abstracts that part type DataStore struct { @@ -78,6 +113,9 @@ type RawStore interface { // Head gets metadata about the reference. This should generally be a light weight operation. Head(ctx context.Context, reference DataReference) (Metadata, error) + // List gets a list of items given a prefix, using a paginated API + List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) + // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index ce4a75a0a1..6b731b9c86 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -92,6 +92,9 @@ type stowMetrics struct { HeadFailure labeled.Counter HeadLatency labeled.StopWatch + ListFailure labeled.Counter + ListLatency labeled.StopWatch + ReadFailure labeled.Counter ReadOpenLatency labeled.StopWatch @@ -251,6 +254,46 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k) } +func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) { + _, c, k, err := reference.Split() + if err != nil { + s.metrics.BadReference.Inc(ctx) + return nil, NewCursorAtEnd(), err + } + + container, err := s.getContainer(ctx, locationIDMain, c) + if err != nil { + return nil, NewCursorAtEnd(), err + } + + t := s.metrics.ListLatency.Start(ctx) + var stowCursor string + if cursor.cursorState == AtStartCursorState { + stowCursor = stow.CursorStart + } else if cursor.cursorState == AtEndCursorState { + return nil, NewCursorAtEnd(), fmt.Errorf("Cursor cannot be at end for the List call") + } else { + stowCursor = cursor.customPosition + } + items, stowCursor, err := container.Items(k, stowCursor, maxItems) + if err == nil { + results := make([]DataReference, len(items)) + for index, item := range items { + results[index] = DataReference(item.URL().String()) + } + if stow.IsCursorEnd(stowCursor) { + cursor = NewCursorAtEnd() + } else { + cursor = NewCursorFromCustomPosition(stowCursor) + } + t.Stop() + return results, cursor, nil + } + + incFailureCounterForError(ctx, s.metrics.ListFailure, err) + return nil, NewCursorAtEnd(), errs.Wrapf(err, "path:%v", k) +} + func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { _, c, k, err := reference.Split() if err != nil { @@ -434,6 +477,9 @@ func newStowMetrics(scope promutils.Scope) *stowMetrics { HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric), HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + ListFailure: labeled.NewCounter("list_failure", "Indicates failure in item listing for a given reference", scope, labeled.EmitUnlabeledMetric), + ListLatency: labeled.NewStopWatch("list", "Indicates time to fetch item listing using the List API", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric), diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index 99678eb8ad..4de273dd93 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -10,6 +10,8 @@ import ( "net/url" "os" "path/filepath" + "sort" + "strconv" "testing" "time" @@ -73,8 +75,37 @@ func (m mockStowContainer) Item(id string) (stow.Item, error) { return nil, stow.ErrNotFound } -func (mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) { - return []stow.Item{}, "", nil +func (m mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) { + startIndex := 0 + if cursor != "" { + index, err := strconv.Atoi(cursor) + if err != nil { + return nil, "", fmt.Errorf("Invalid cursor '%s'", cursor) + } + startIndex = index + } + endIndexExc := min(len(m.items), startIndex+count) + + itemKeys := make([]string, len(m.items)) + index := 0 + for key := range m.items { + itemKeys[index] = key + index++ + } + sort.Strings(itemKeys) + + numItems := endIndexExc - startIndex + results := make([]stow.Item, numItems) + for index, itemKey := range itemKeys[startIndex:endIndexExc] { + results[index] = m.items[itemKey] + } + + if endIndexExc == len(m.items) { + cursor = "" + } else { + cursor = fmt.Sprintf("%d", endIndexExc) + } + return results, cursor, nil } func (m mockStowContainer) RemoveItem(id string) error { @@ -361,6 +392,67 @@ func TestStowStore_ReadRaw(t *testing.T) { }) } +func TestStowStore_List(t *testing.T) { + const container = "container" + t.Run("Listing", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + writeTestFile(ctx, t, s, "s3://container/a/1") + writeTestFile(ctx, t, s, "s3://container/a/2") + var maxResults = 10 + var dataReference DataReference = "s3://container/a" + items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart()) + assert.NoError(t, err) + assert.Equal(t, NewCursorAtEnd(), cursor) + assert.Equal(t, []DataReference{"a/1", "a/2"}, items) + }) + + t.Run("Listing with pagination", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + writeTestFile(ctx, t, s, "s3://container/a/1") + writeTestFile(ctx, t, s, "s3://container/a/2") + var maxResults = 1 + var dataReference DataReference = "s3://container/a" + items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart()) + assert.NoError(t, err) + assert.Equal(t, []DataReference{"a/1"}, items) + items, _, err = s.List(ctx, dataReference, maxResults, cursor) + assert.NoError(t, err) + assert.Equal(t, []DataReference{"a/2"}, items) + }) +} + func TestNewLocalStore(t *testing.T) { labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) t.Run("Valid config", func(t *testing.T) {