From 43a0b185cb635bd830d551bbd775053d0ffd2d67 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 1 Sep 2024 16:30:36 +0800 Subject: [PATCH 01/29] add download multipart blob Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 107 +++++++++++++++++++++--------- flytestdlib/storage/storage.go | 3 + flytestdlib/storage/stow_store.go | 34 ++++++++++ 3 files changed, 113 insertions(+), 31 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 0fd1f10bd9..92b6c323ad 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -8,8 +8,10 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "reflect" "strconv" + "strings" "github.com/ghodss/yaml" "github.com/golang/protobuf/jsonpb" @@ -31,53 +33,96 @@ type Downloader struct { mode core.IOStrategy_DownloadMode } -// TODO add support for multipart blobs -func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath string) (interface{}, error) { +func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) { ref := storage.DataReference(blob.Uri) scheme, _, _, err := ref.Split() if err != nil { - return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") + return nil, errors.Wrapf(err, "Blob uri incorrectly formatted: %s", blob.Uri) } - var reader io.ReadCloser - if scheme == "http" || scheme == "https" { - reader, err = DownloadFileFromHTTP(ctx, ref) - } else { - if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { - logger.Warnf(ctx, "Currently only single part blobs are supported, we will force multipart to be 'path/00000'") - ref, err = d.store.ConstructReference(ctx, ref, "000000") + + // Handle multipart blob + if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { + items, err := d.store.ReadDirectory(ctx, ref) + if err != nil { + return nil, errors.Wrapf(err, "failed to list parts for multipart blob [%s]", ref) + } + + if len(items) == 0 { + return nil, errors.Errorf("no parts found for multipart blob [%s]", ref) + } + + for _, item := range items { + // Get relative path + relativePath := strings.TrimPrefix(item.URL().Path, ref.String()) + relativePath = strings.TrimPrefix(relativePath, "/") // ensure no leading slash + + // Construct local file path + destPath := filepath.Join(toPath, relativePath) + + // Create necessary directories + err = os.MkdirAll(filepath.Dir(destPath), os.ModePerm) if err != nil { - return nil, err + logger.Errorf(ctx, "failed to create directories for path %s: %v", destPath, err) + continue + } + + // Download part + partReader, err := item.Open() + if err != nil { + logger.Errorf(ctx, "failed to open item %s: %v", item.ID(), err) + continue + } + + // Create local file + partWriter, err := os.Create(destPath) + if err != nil { + logger.Errorf(ctx, "failed to create file at path %s: %v", destPath, err) + partReader.Close() + continue } + + // Copy content + _, err = io.Copy(partWriter, partReader) + if err != nil { + logger.Errorf(ctx, "failed to write to file %s: %v", destPath, err) + } + + // Close readers and writers + partReader.Close() + partWriter.Close() + + logger.Infof(ctx, "Successfully downloaded part to [%s]", destPath) } + + logger.Infof(ctx, "Successfully downloaded all parts of multipart blob [%s] to directory [%s]", ref, toPath) + return toPath, nil + } + + var reader io.ReadCloser + if scheme == "http" || scheme == "https" { + reader, err = DownloadFileFromHTTP(ctx, ref) + } else if 
blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE { + // Handle single part blob reader, err = DownloadFileFromStorage(ctx, ref, d.store) } if err != nil { - logger.Errorf(ctx, "Failed to download from ref [%s]", ref) - return nil, err + return nil, errors.Wrapf(err, "failed to download blob from ref [%s]", ref) } - defer func() { - err := reader.Close() - if err != nil { - logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) - } - }() + defer reader.Close() - writer, err := os.Create(toFilePath) + writer, err := os.Create(toPath) if err != nil { - return nil, errors.Wrapf(err, "failed to open file at path %s", toFilePath) + return nil, errors.Wrapf(err, "failed to create file at path %s", toPath) } - defer func() { - err := writer.Close() - if err != nil { - logger.Errorf(ctx, "failed to close File write stream. Error: %s", err) - } - }() - v, err := io.Copy(writer, reader) + defer writer.Close() + + bytesCopied, err := io.Copy(writer, reader) if err != nil { - return nil, errors.Wrapf(err, "failed to write remote data to local filesystem") + return nil, errors.Wrapf(err, "failed to write blob data to local filesystem") } - logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toFilePath) - return toFilePath, nil + + logger.Infof(ctx, "Successfully copied [%d] bytes of blob data from [%s] to [%s]", bytesCopied, ref, toPath) + return toPath, nil } func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) { diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 3e84cb7acb..5611a51959 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -81,6 +81,9 @@ type RawStore interface { // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) + // ReadDirectory retrieves a stow item array from the Blob store or an error + ReadDirectory(ctx context.Context, reference DataReference) ([]stow.Item, error) + // WriteRaw stores a raw byte array. WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index ce4a75a0a1..0529505105 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -285,6 +285,40 @@ func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.Re return item.Open() } +func (s *StowStore) ReadDirectory(ctx context.Context, reference DataReference) ([]stow.Item, error) { + _, containerName, prefix, err := reference.Split() + if err != nil { + s.metrics.BadReference.Inc(ctx) + return nil, err + } + + container, err := s.getContainer(ctx, locationIDMain, containerName) + if err != nil { + return nil, err + } + + var items []stow.Item + cursor := stow.CursorStart + + for { + // List items with the given prefix + pageItems, nextCursor, err := container.Items(prefix, cursor, 100) + if err != nil { + logger.Errorf(ctx, "failed to list items with prefix: %s", prefix) + return nil, err + } + + items = append(items, pageItems...) 
+ + if stow.IsCursorEnd(nextCursor) { + break + } + cursor = nextCursor + } + + return items, nil +} + func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { _, c, k, err := reference.Split() if err != nil { From 6f7835219e76c89df8439286b2d4e16ef2168dc4 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Mon, 2 Sep 2024 00:41:24 +0800 Subject: [PATCH 02/29] recursively process subparts Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 94 ++++++++------------- flytestdlib/storage/cached_rawstore.go | 22 +++++ flytestdlib/storage/cached_rawstore_test.go | 8 ++ flytestdlib/storage/mem_store.go | 28 ++++++ flytestdlib/storage/storage.go | 7 +- flytestdlib/storage/stow_store.go | 38 ++++++++- 6 files changed, 134 insertions(+), 63 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 92b6c323ad..28ec17a4b7 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -11,7 +11,6 @@ import ( "path/filepath" "reflect" "strconv" - "strings" "github.com/ghodss/yaml" "github.com/golang/protobuf/jsonpb" @@ -37,96 +36,77 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri ref := storage.DataReference(blob.Uri) scheme, _, _, err := ref.Split() if err != nil { - return nil, errors.Wrapf(err, "Blob uri incorrectly formatted: %s", blob.Uri) + return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") } // Handle multipart blob if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { - items, err := d.store.ReadDirectory(ctx, ref) + err = os.Mkdir(toPath, os.ModePerm) if err != nil { - return nil, errors.Wrapf(err, "failed to list parts for multipart blob [%s]", ref) + logger.Errorf(ctx, "failed to create directories for path %s: %v", toPath, err) + return nil, err } - - if len(items) == 0 { - return nil, errors.Errorf("no parts found for multipart blob [%s]", ref) + parts, err := d.store.ReadParts(ctx, ref) + if err != nil { + logger.Errorf(ctx, "failed to read parts for multipart blob [%s]", ref) + return nil, err } - for _, item := range items { - // Get relative path - relativePath := strings.TrimPrefix(item.URL().Path, ref.String()) - relativePath = strings.TrimPrefix(relativePath, "/") // ensure no leading slash + // Recursively processes all subparts within the directory + for _, relativePath := range parts { + joinPath := filepath.Join(toPath, relativePath) - // Construct local file path - destPath := filepath.Join(toPath, relativePath) - - // Create necessary directories - err = os.MkdirAll(filepath.Dir(destPath), os.ModePerm) + // Check subpart's type + dim := core.BlobType_SINGLE + isMultiPart, err := d.store.IsMultiPart(ctx, storage.DataReference(joinPath)) if err != nil { - logger.Errorf(ctx, "failed to create directories for path %s: %v", destPath, err) + logger.Errorf(ctx, "failed to check type for part [%s] in multipart blob [%s]", relativePath, ref) continue } - - // Download part - partReader, err := item.Open() - if err != nil { - logger.Errorf(ctx, "failed to open item %s: %v", item.ID(), err) - continue - } - - // Create local file - partWriter, err := os.Create(destPath) - if err != nil { - logger.Errorf(ctx, "failed to create file at path %s: %v", destPath, err) - partReader.Close() - continue + if isMultiPart { + dim = core.BlobType_MULTIPART } - - // Copy content - _, err = io.Copy(partWriter, partReader) - if err != nil { - logger.Errorf(ctx, "failed to write to file %s: %v", destPath, err) 
- } - - // Close readers and writers - partReader.Close() - partWriter.Close() - - logger.Infof(ctx, "Successfully downloaded part to [%s]", destPath) + d.handleBlob(ctx, &core.Blob{Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: dim}}, Uri: joinPath}, joinPath) } - - logger.Infof(ctx, "Successfully downloaded all parts of multipart blob [%s] to directory [%s]", ref, toPath) return toPath, nil } - var reader io.ReadCloser if scheme == "http" || scheme == "https" { reader, err = DownloadFileFromHTTP(ctx, ref) - } else if blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE { + } else { // Handle single part blob reader, err = DownloadFileFromStorage(ctx, ref, d.store) } if err != nil { - return nil, errors.Wrapf(err, "failed to download blob from ref [%s]", ref) + logger.Errorf(ctx, "Failed to download from ref [%s]", ref) + return nil, err } - defer reader.Close() + defer func() { + err := reader.Close() + if err != nil { + logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) + } + }() writer, err := os.Create(toPath) if err != nil { - return nil, errors.Wrapf(err, "failed to create file at path %s", toPath) + return nil, errors.Wrapf(err, "failed to open file at path %s", toPath) } - defer writer.Close() - - bytesCopied, err := io.Copy(writer, reader) + defer func() { + err := writer.Close() + if err != nil { + logger.Errorf(ctx, "failed to close File write stream. Error: %s", err) + } + }() + v, err := io.Copy(writer, reader) if err != nil { - return nil, errors.Wrapf(err, "failed to write blob data to local filesystem") + return nil, errors.Wrapf(err, "failed to write remote data to local filesystem") } - - logger.Infof(ctx, "Successfully copied [%d] bytes of blob data from [%s] to [%s]", bytesCopied, ref, toPath) + logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toPath) return toPath, nil } func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) { - // TODO Handle schema type return d.handleBlob(ctx, &core.Blob{Uri: schema.Uri, Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: core.BlobType_MULTIPART}}}, toFilePath) } diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 913a517a0f..cdb0d5615f 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -5,6 +5,7 @@ import ( "context" "io" "runtime/debug" + "strings" "time" "github.com/coocood/freecache" @@ -88,6 +89,27 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) ( return ioutils.NewBytesReadCloser(b), err } +func (s *cachedRawStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { + // Check in the cache first + key := []byte(reference) + if _, err := s.cache.Get(key); err == nil { + if strings.HasSuffix(reference.String(), "/") { + // If the reference ends with '/', it might indicate a directory (multipart blob) + return true, nil + } + // Alternatively, you could parse oRaw to see if it contains metadata indicating a multipart blob + return false, nil + } + + // If not in cache, check the underlying RawStore + return s.RawStore.IsMultiPart(ctx, reference) +} + +func (s *cachedRawStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { + // Cache might not have individual parts, so we rely on the underlying RawStore + return s.RawStore.ReadParts(ctx, reference) +} + // WriteRaw 
stores a raw byte array. func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/WriteRaw") diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index b9751d7fa1..72fb4deccf 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -77,6 +77,14 @@ func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.R return d.ReadRawCb(ctx, reference) } +func (d *dummyStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { + return d.IsMultiPart(ctx, reference) +} + +func (d *dummyStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { + return d.ReadParts(ctx, reference) +} + func (d *dummyStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { return d.WriteRawCb(ctx, reference, size, opts, raw) } diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index a95a0a49ca..0cbbf5f197 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "os" + "strings" ) type rawFile = []byte @@ -62,6 +63,33 @@ func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (i return nil, os.ErrNotExist } +func (s *InMemoryStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { + // Iterate over all keys in the cache to see if any key starts with "reference/" + for key := range s.cache { + if strings.HasPrefix(key.String(), string(reference)+"/") { + return true, nil + } + } + return false, nil +} + +func (s *InMemoryStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { + var parts []string + prefix := string(reference) + "/" + + for key := range s.cache { + if strings.HasPrefix(key.String(), prefix) { + parts = append(parts, key.String()) + } + } + + if len(parts) == 0 { + return nil, os.ErrNotExist + } + + return parts, nil +} + // Delete removes the referenced data from the cache map. func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error { if _, found := s.cache[reference]; !found { diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 5611a51959..05f8e7bf9a 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -81,8 +81,11 @@ type RawStore interface { // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) - // ReadDirectory retrieves a stow item array from the Blob store or an error - ReadDirectory(ctx context.Context, reference DataReference) ([]stow.Item, error) + // IsMultiPart checks if the subpart is a multipart blob + IsMultiPart(ctx context.Context, reference DataReference) (bool, error) + + // ReadParts retrieves the paths of all subparts from the Blob store or an error + ReadParts(ctx context.Context, reference DataReference) ([]string, error) // WriteRaw stores a raw byte array. 
WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 0529505105..2b814509e4 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -285,7 +285,34 @@ func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.Re return item.Open() } -func (s *StowStore) ReadDirectory(ctx context.Context, reference DataReference) ([]stow.Item, error) { +func (s *StowStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { + _, containerName, prefix, err := reference.Split() + if err != nil { + s.metrics.BadReference.Inc(ctx) + return false, err + } + + container, err := s.getContainer(ctx, locationIDMain, containerName) + if err != nil { + return false, err + } + + cursor := stow.CursorStart + + pageItems, _, err := container.Items(prefix, cursor, 100) + if err != nil { + logger.Errorf(ctx, "failed to list items with prefix: %s", prefix) + return false, err + } + + if len(pageItems) == 0 { + return false, nil + } else { + return true, nil + } +} + +func (s *StowStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { _, containerName, prefix, err := reference.Split() if err != nil { s.metrics.BadReference.Inc(ctx) @@ -297,7 +324,7 @@ func (s *StowStore) ReadDirectory(ctx context.Context, reference DataReference) return nil, err } - var items []stow.Item + var parts []string cursor := stow.CursorStart for { @@ -308,7 +335,10 @@ func (s *StowStore) ReadDirectory(ctx context.Context, reference DataReference) return nil, err } - items = append(items, pageItems...) + for _, item := range pageItems { + part := item.URL().String() + parts = append(parts, part) + } if stow.IsCursorEnd(nextCursor) { break @@ -316,7 +346,7 @@ func (s *StowStore) ReadDirectory(ctx context.Context, reference DataReference) cursor = nextCursor } - return items, nil + return parts, nil } func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { From 69514d6411377c9122e7b42d3f2e9512286f5343 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 04:25:31 +0800 Subject: [PATCH 03/29] implement GetItems function Signed-off-by: wayner0628 --- flyteadmin/pkg/common/mocks/storage.go | 5 + flytestdlib/storage/cached_rawstore.go | 30 ++---- flytestdlib/storage/cached_rawstore_test.go | 12 +-- flytestdlib/storage/mem_store.go | 36 +++---- .../storage/mocks/composed_protobuf_store.go | 5 + flytestdlib/storage/mocks/raw_store.go | 5 + flytestdlib/storage/storage.go | 9 +- flytestdlib/storage/stow_store.go | 96 +++++++------------ 8 files changed, 77 insertions(+), 121 deletions(-) diff --git a/flyteadmin/pkg/common/mocks/storage.go b/flyteadmin/pkg/common/mocks/storage.go index 7e91bf0485..2394210ea3 100644 --- a/flyteadmin/pkg/common/mocks/storage.go +++ b/flyteadmin/pkg/common/mocks/storage.go @@ -54,6 +54,11 @@ func (t *TestDataStore) CreateSignedURL(ctx context.Context, reference storage.D return storage.SignedURLResponse{URL: *signedURL}, nil } +func (t *TestDataStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { + var s []string + return s, nil +} + // Retrieves a byte array from the Blob store or an error func (t *TestDataStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { return NopCloser{}, nil diff --git a/flytestdlib/storage/cached_rawstore.go 
b/flytestdlib/storage/cached_rawstore.go index cdb0d5615f..2e087c8a81 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -5,7 +5,6 @@ import ( "context" "io" "runtime/debug" - "strings" "time" "github.com/coocood/freecache" @@ -51,6 +50,14 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met return s.RawStore.Head(ctx, reference) } +func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems") + defer span.End() + + // freecache does not support full cache scanning + return s.RawStore.GetItems(ctx, reference) +} + // ReadRaw retrieves a byte array from the Blob store or an error func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/ReadRaw") @@ -89,27 +96,6 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) ( return ioutils.NewBytesReadCloser(b), err } -func (s *cachedRawStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { - // Check in the cache first - key := []byte(reference) - if _, err := s.cache.Get(key); err == nil { - if strings.HasSuffix(reference.String(), "/") { - // If the reference ends with '/', it might indicate a directory (multipart blob) - return true, nil - } - // Alternatively, you could parse oRaw to see if it contains metadata indicating a multipart blob - return false, nil - } - - // If not in cache, check the underlying RawStore - return s.RawStore.IsMultiPart(ctx, reference) -} - -func (s *cachedRawStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { - // Cache might not have individual parts, so we rely on the underlying RawStore - return s.RawStore.ReadParts(ctx, reference) -} - // WriteRaw stores a raw byte array. 
func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/WriteRaw") diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index 72fb4deccf..6bd5ad0c87 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -73,16 +73,12 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat return d.HeadCb(ctx, reference) } -func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { - return d.ReadRawCb(ctx, reference) -} - -func (d *dummyStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { - return d.IsMultiPart(ctx, reference) +func (d *dummyStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { + return d.GetItems(ctx, reference) } -func (d *dummyStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { - return d.ReadParts(ctx, reference) +func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { + return d.ReadRawCb(ctx, reference) } func (d *dummyStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index 0cbbf5f197..6fdce1b12d 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -55,39 +55,29 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta }, nil } -func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { - if raw, found := s.cache[reference]; found { - return ioutil.NopCloser(bytes.NewReader(raw)), nil - } - - return nil, os.ErrNotExist -} - -func (s *InMemoryStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { - // Iterate over all keys in the cache to see if any key starts with "reference/" - for key := range s.cache { - if strings.HasPrefix(key.String(), string(reference)+"/") { - return true, nil - } - } - return false, nil -} - -func (s *InMemoryStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { - var parts []string +func (s *InMemoryStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { + var items []string prefix := string(reference) + "/" for key := range s.cache { if strings.HasPrefix(key.String(), prefix) { - parts = append(parts, key.String()) + items = append(items, key.String()) } } - if len(parts) == 0 { + if len(items) == 0 { return nil, os.ErrNotExist } - return parts, nil + return items, nil +} + +func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { + if raw, found := s.cache[reference]; found { + return ioutil.NopCloser(bytes.NewReader(raw)), nil + } + + return nil, os.ErrNotExist } // Delete removes the referenced data from the cache map. 
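A minimal, self-contained sketch of how the GetItems listing introduced in this patch is meant to be consumed when mirroring a multipart blob locally (the pattern the later download.go changes implement). The storeLike interface, fakeStore, and localPathFor helper below are hypothetical stand-ins for the DataStore plumbing, not part of flytestdlib; only the GetItems signature mirrors the one added here.

package main

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
)

// storeLike captures only the method this patch series adds to RawStore.
type storeLike interface {
	GetItems(ctx context.Context, reference string) ([]string, error)
}

// fakeStore returns hard-coded part URIs, standing in for StowStore/InMemoryStore.
type fakeStore struct{ parts []string }

func (f fakeStore) GetItems(_ context.Context, _ string) ([]string, error) {
	return f.parts, nil
}

// localPathFor keeps the portion of the item URI after the blob prefix and
// joins it under toDir, so the local tree mirrors the remote layout.
func localPathFor(toDir, blobURI, itemURI string) string {
	rel := strings.TrimPrefix(strings.TrimPrefix(itemURI, blobURI), "/")
	return filepath.Join(toDir, rel)
}

func main() {
	ctx := context.Background()
	blob := "s3://container/folder"
	store := fakeStore{parts: []string{blob + "/file1", blob + "/sub/file2"}}

	items, err := store.GetItems(ctx, blob)
	if err != nil {
		panic(err)
	}
	for _, item := range items {
		fmt.Printf("%s -> %s\n", item, localPathFor("./inputs", blob, item))
	}
}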
diff --git a/flytestdlib/storage/mocks/composed_protobuf_store.go b/flytestdlib/storage/mocks/composed_protobuf_store.go index c9064c2ac5..a5ec7b74eb 100644 --- a/flytestdlib/storage/mocks/composed_protobuf_store.go +++ b/flytestdlib/storage/mocks/composed_protobuf_store.go @@ -244,6 +244,11 @@ func (_m *ComposedProtobufStore) OnReadRawMatch(matchers ...interface{}) *Compos return &ComposedProtobufStore_ReadRaw{Call: c_call} } +func (_m *ComposedProtobufStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { + var s []string + return s, nil +} + // ReadRaw provides a mock function with given fields: ctx, reference func (_m *ComposedProtobufStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { ret := _m.Called(ctx, reference) diff --git a/flytestdlib/storage/mocks/raw_store.go b/flytestdlib/storage/mocks/raw_store.go index 06b731765d..170931e9be 100644 --- a/flytestdlib/storage/mocks/raw_store.go +++ b/flytestdlib/storage/mocks/raw_store.go @@ -210,6 +210,11 @@ func (_m *RawStore) OnReadRawMatch(matchers ...interface{}) *RawStore_ReadRaw { return &RawStore_ReadRaw{Call: c_call} } +func (_m *RawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { + var s []string + return s, nil +} + // ReadRaw provides a mock function with given fields: ctx, reference func (_m *RawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { ret := _m.Called(ctx, reference) diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 05f8e7bf9a..163f42ec4c 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -78,15 +78,12 @@ type RawStore interface { // Head gets metadata about the reference. This should generally be a light weight operation. Head(ctx context.Context, reference DataReference) (Metadata, error) + // GetItems retrieves the paths of all items from the Blob store or an error + GetItems(ctx context.Context, reference DataReference) ([]string, error) + // ReadRaw retrieves a byte array from the Blob store or an error ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) - // IsMultiPart checks if the subpart is a multipart blob - IsMultiPart(ctx context.Context, reference DataReference) (bool, error) - - // ReadParts retrieves the paths of all subparts from the Blob store or an error - ReadParts(ctx context.Context, reference DataReference) ([]string, error) - // WriteRaw stores a raw byte array. 
WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 2b814509e4..83ffd5c441 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -251,8 +251,8 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k) } -func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { - _, c, k, err := reference.Split() +func (s *StowStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { + scheme, c, k, err := reference.Split() if err != nil { s.metrics.BadReference.Inc(ctx) return nil, err @@ -263,90 +263,62 @@ func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.Re return nil, err } - t := s.metrics.ReadOpenLatency.Start(ctx) - item, err := container.Item(k) - if err != nil { - incFailureCounterForError(ctx, s.metrics.ReadFailure, err) - return nil, err - } - t.Stop() + var items []string + cursor := stow.CursorStart - sizeBytes, err := item.Size() - if err != nil { - return nil, err - } + for { + // List items with the given key + pageItems, nextCursor, err := container.Items(k, cursor, 100) + if err != nil { + logger.Errorf(ctx, "failed to list container [%s] items with key [%s]", c, k) + return nil, err + } - if GetConfig().Limits.GetLimitMegabytes != 0 { - if sizeBytes > GetConfig().Limits.GetLimitMegabytes*MiB { - return nil, errors.Errorf(ErrExceedsLimit, "limit exceeded. %.6fmb > %vmb. You can increase the limit by setting maxDownloadMBs.", float64(sizeBytes)/float64(MiB), GetConfig().Limits.GetLimitMegabytes) + for _, item := range pageItems { + URL := fmt.Sprintf("%s://%s/%s", scheme, c, item.URL().String()) + items = append(items, URL) } - } - return item.Open() + if stow.IsCursorEnd(nextCursor) { + break + } + cursor = nextCursor + } + return items, nil } -func (s *StowStore) IsMultiPart(ctx context.Context, reference DataReference) (bool, error) { - _, containerName, prefix, err := reference.Split() +func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { + _, c, k, err := reference.Split() if err != nil { s.metrics.BadReference.Inc(ctx) - return false, err - } - - container, err := s.getContainer(ctx, locationIDMain, containerName) - if err != nil { - return false, err + return nil, err } - cursor := stow.CursorStart - - pageItems, _, err := container.Items(prefix, cursor, 100) + container, err := s.getContainer(ctx, locationIDMain, c) if err != nil { - logger.Errorf(ctx, "failed to list items with prefix: %s", prefix) - return false, err + return nil, err } - if len(pageItems) == 0 { - return false, nil - } else { - return true, nil - } -} - -func (s *StowStore) ReadParts(ctx context.Context, reference DataReference) ([]string, error) { - _, containerName, prefix, err := reference.Split() + t := s.metrics.ReadOpenLatency.Start(ctx) + item, err := container.Item(k) if err != nil { - s.metrics.BadReference.Inc(ctx) + incFailureCounterForError(ctx, s.metrics.ReadFailure, err) return nil, err } + t.Stop() - container, err := s.getContainer(ctx, locationIDMain, containerName) + sizeBytes, err := item.Size() if err != nil { return nil, err } - var parts []string - cursor := stow.CursorStart - - for { - // List items with the given prefix - pageItems, nextCursor, err := container.Items(prefix, cursor, 100) - 
if err != nil { - logger.Errorf(ctx, "failed to list items with prefix: %s", prefix) - return nil, err - } - - for _, item := range pageItems { - part := item.URL().String() - parts = append(parts, part) - } - - if stow.IsCursorEnd(nextCursor) { - break + if GetConfig().Limits.GetLimitMegabytes != 0 { + if sizeBytes > GetConfig().Limits.GetLimitMegabytes*MiB { + return nil, errors.Errorf(ErrExceedsLimit, "limit exceeded. %.6fmb > %vmb. You can increase the limit by setting maxDownloadMBs.", float64(sizeBytes)/float64(MiB), GetConfig().Limits.GetLimitMegabytes) } - cursor = nextCursor } - return parts, nil + return item.Open() } func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { From f13022b17bf7e0f8790b7ed8b16f9b2f7a869a99 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 04:26:16 +0800 Subject: [PATCH 04/29] add unit testing Signed-off-by: wayner0628 --- flytecopilot/data/download_test.go | 124 +++++++++++++++++++++++++ flytestdlib/storage/mem_store_test.go | 38 ++++++++ flytestdlib/storage/stow_store_test.go | 95 ++++++++++++++++++- 3 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 flytecopilot/data/download_test.go diff --git a/flytecopilot/data/download_test.go b/flytecopilot/data/download_test.go new file mode 100644 index 0000000000..0b0724b048 --- /dev/null +++ b/flytecopilot/data/download_test.go @@ -0,0 +1,124 @@ +package data + +import ( + "bytes" + "context" + "os" + "path/filepath" + "testing" + + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/storage" + + "github.com/stretchr/testify/assert" +) + +func TestHandleBlobMultipart(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + ref := storage.DataReference("s3://container/folder/file1") + s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) + ref = storage.DataReference("s3://container/folder/file2") + s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) + + d := Downloader{store: s} + + blob := &core.Blob{ + Uri: "s3://container/folder", + Metadata: &core.BlobMetadata{ + Type: &core.BlobType{ + Dimensionality: core.BlobType_MULTIPART, + }, + }, + } + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + // Test the handleBlob function + result, err := d.handleBlob(context.Background(), blob, toPath) + assert.NoError(t, err) + assert.Equal(t, toPath, result) + + // Check if files were created and data written + for _, file := range []string{"file1", "file2"} { + if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } + } +} + +func TestHandleBlobSinglePart(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + ref := storage.DataReference("s3://container/file") + s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) + + d := Downloader{store: s} + + blob := &core.Blob{ + Uri: "s3://container/file", + Metadata: &core.BlobMetadata{ + Type: &core.BlobType{ + Dimensionality: core.BlobType_SINGLE, + }, + }, + } + + toPath := "./input" + defer func() { + err := 
os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete file: %v", err) + } + }() + + // Test the handleBlob function + result, err := d.handleBlob(context.Background(), blob, toPath) + assert.NoError(t, err) + assert.Equal(t, toPath, result) + + // Check if files were created and data written + if _, err := os.Stat(toPath); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", toPath) + } +} + +func TestHandleBlobHTTP(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + d := Downloader{store: s} + + blob := &core.Blob{ + Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md", + Metadata: &core.BlobMetadata{ + Type: &core.BlobType{ + Dimensionality: core.BlobType_SINGLE, + }, + }, + } + + toPath := "./input" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete file: %v", err) + } + }() + + // Test the handleBlob function + result, err := d.handleBlob(context.Background(), blob, toPath) + assert.NoError(t, err) + assert.Equal(t, toPath, result) + + // Check if files were created and data written + if _, err := os.Stat(toPath); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", toPath) + } +} diff --git a/flytestdlib/storage/mem_store_test.go b/flytestdlib/storage/mem_store_test.go index fc58ed1151..f427a972dd 100644 --- a/flytestdlib/storage/mem_store_test.go +++ b/flytestdlib/storage/mem_store_test.go @@ -29,6 +29,44 @@ func TestInMemoryStore_Head(t *testing.T) { }) } +func TestInMemoryStore_GetItems(t *testing.T) { + t.Run("Nil Path", func(t *testing.T) { + s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) + assert.NoError(t, err) + + items, err := s.GetItems(context.TODO(), DataReference("hello")) + assert.Error(t, err) + assert.Nil(t, items) + }) + + t.Run("No Items", func(t *testing.T) { + s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) + assert.NoError(t, err) + err = s.WriteRaw(context.TODO(), DataReference("folder"), 0, Options{}, bytes.NewReader([]byte{})) + assert.NoError(t, err) + + items, err := s.GetItems(context.TODO(), DataReference("folder")) + assert.Error(t, err) + assert.Nil(t, items) + }) + + t.Run("Existing Items", func(t *testing.T) { + s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) + assert.NoError(t, err) + err = s.WriteRaw(context.TODO(), DataReference("folder/file1"), 0, Options{}, bytes.NewReader([]byte{})) + assert.NoError(t, err) + + err = s.WriteRaw(context.TODO(), DataReference("folder/file2"), 0, Options{}, bytes.NewReader([]byte{})) + assert.NoError(t, err) + + items, err := s.GetItems(context.TODO(), DataReference("folder")) + assert.NoError(t, err) + assert.Equal(t, 2, len(items)) + assert.Equal(t, "folder/file1", items[0]) + assert.Equal(t, "folder/file2", items[1]) + }) +} + func TestInMemoryStore_ReadRaw(t *testing.T) { t.Run("Empty store", func(t *testing.T) { s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index 99678eb8ad..e524430063 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "path/filepath" + "strings" "testing" "time" @@ -73,8 +74,16 @@ func (m mockStowContainer) Item(id string) (stow.Item, error) { return nil, stow.ErrNotFound } -func (mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, 
error) { - return []stow.Item{}, "", nil +func (m mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) { + var result []stow.Item + // Ignore cursor and count + for id, item := range m.items { + if strings.HasPrefix(id, prefix + "/") { + result = append(result, item) + } + } + + return result, "", nil } func (m mockStowContainer) RemoveItem(id string) error { @@ -226,6 +235,88 @@ func TestStowStore_CreateSignedURL(t *testing.T) { }) } +func TestStowStore_GetItems(t *testing.T) { + const container = "container" + + t.Run("Correct Query with Items", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + writeTestFile(ctx, t, s, "s3://container/folder/file1") + writeTestFile(ctx, t, s, "s3://container/folder/file2") + dataReference := DataReference("s3://container/folder") + items, err := s.GetItems(ctx, dataReference) + assert.NoError(t, err) + assert.Equal(t, 2, len(items)) + assert.Equal(t, "s3://container/folder/file1", items[0]) + assert.Equal(t, "s3://container/folder/file2", items[1]) + }) + + t.Run("Empty Reference", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + dataReference := DataReference("s3://container/folder") + items, err := s.GetItems(ctx, dataReference) + assert.NoError(t, err) + assert.Equal(t, 0, len(items)) + }) + + t.Run("Bad Reference", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + writeTestFile(ctx, t, s, "s3://container/folder/file1") + writeTestFile(ctx, t, s, "s3://container/folder/file2") + dataReference := DataReference("s3://container/bad-folder") + items, err := s.GetItems(ctx, dataReference) + assert.NoError(t, err) + assert.Equal(t, 0, len(items)) + }) +} + func TestStowStore_ReadRaw(t *testing.T) { const container = "container" t.Run("Happy Path", func(t *testing.T) { From 8fae9f9ca05c2a621e4477a23b6dd3444590dc2e Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 04:27:26 +0800 Subject: [PATCH 05/29] Parallelly handle blob items Signed-off-by: wayner0628 --- 
flytecopilot/data/download.go | 87 +++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 28ec17a4b7..74d2dee63a 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -11,6 +11,7 @@ import ( "path/filepath" "reflect" "strconv" + "sync" "github.com/ghodss/yaml" "github.com/golang/protobuf/jsonpb" @@ -39,42 +40,78 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") } - // Handle multipart blob + var reader io.ReadCloser if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { - err = os.Mkdir(toPath, os.ModePerm) - if err != nil { - logger.Errorf(ctx, "failed to create directories for path %s: %v", toPath, err) - return nil, err - } - parts, err := d.store.ReadParts(ctx, ref) - if err != nil { - logger.Errorf(ctx, "failed to read parts for multipart blob [%s]", ref) + items, err := d.store.GetItems(ctx, ref) + if err != nil || len(items) == 0 { + logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", ref) return nil, err } - // Recursively processes all subparts within the directory - for _, relativePath := range parts { - joinPath := filepath.Join(toPath, relativePath) + success := 0 + var wg sync.WaitGroup + for _, absPath := range items { + absPath := absPath // capture range variable + wg.Add(1) + go func() { + defer wg.Done() + defer func() { + if err := recover(); err != nil { + logger.Errorf(ctx,"recover receives error: %s", err) + } + }() - // Check subpart's type - dim := core.BlobType_SINGLE - isMultiPart, err := d.store.IsMultiPart(ctx, storage.DataReference(joinPath)) - if err != nil { - logger.Errorf(ctx, "failed to check type for part [%s] in multipart blob [%s]", relativePath, ref) - continue - } - if isMultiPart { - dim = core.BlobType_MULTIPART - } - d.handleBlob(ctx, &core.Blob{Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: dim}}, Uri: joinPath}, joinPath) + ref = storage.DataReference(absPath) + reader, err = DownloadFileFromStorage(ctx, ref, d.store) + if err != nil { + logger.Errorf(ctx, "Failed to download from ref [%s]", ref) + return + } + defer func() { + err := reader.Close() + if err != nil { + logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) + } + }() + + _, _, relativePath, err := ref.Split() + if err != nil { + logger.Errorf(ctx, "Failed to parse ref [%s]", ref) + return + } + newPath := filepath.Join(toPath, relativePath) + dir := filepath.Dir(newPath) + // 0755: the directory can be read by anyone but can only be written by the owner + os.MkdirAll(dir, 0755) + writer, err := os.Create(newPath) + if err != nil { + logger.Errorf(ctx, "failed to open file at path %s", newPath) + return + } + defer func() { + err := writer.Close() + if err != nil { + logger.Errorf(ctx, "failed to close File write stream. 
Error: %s", err) + } + }() + + _, err = io.Copy(writer, reader) + if err != nil { + logger.Errorf(ctx, "failed to write remote data to local filesystem") + return + } + success += 1 + }() } + wg.Wait() + ref = storage.DataReference(blob.Uri) + logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, ref, toPath) return toPath, nil } - var reader io.ReadCloser + if scheme == "http" || scheme == "https" { reader, err = DownloadFileFromHTTP(ctx, ref) } else { - // Handle single part blob reader, err = DownloadFileFromStorage(ctx, ref, d.store) } if err != nil { From e82c5defa55ff2993293728f720e1aaef096fea8 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 04:39:17 +0800 Subject: [PATCH 06/29] fix lint error Signed-off-by: wayner0628 --- flytestdlib/storage/stow_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index e524430063..6d10639f53 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -78,7 +78,7 @@ func (m mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, var result []stow.Item // Ignore cursor and count for id, item := range m.items { - if strings.HasPrefix(id, prefix + "/") { + if strings.HasPrefix(id, prefix+"/") { result = append(result, item) } } From d7c468614ea6c6aad685805809835fe0d9b4ceee Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 04:39:39 +0800 Subject: [PATCH 07/29] implement GetItems function Signed-off-by: wayner0628 --- flytepropeller/pkg/utils/failing_datastore.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flytepropeller/pkg/utils/failing_datastore.go b/flytepropeller/pkg/utils/failing_datastore.go index f3b65471c7..d214ce2534 100644 --- a/flytepropeller/pkg/utils/failing_datastore.go +++ b/flytepropeller/pkg/utils/failing_datastore.go @@ -27,6 +27,10 @@ func (FailingRawStore) Head(ctx context.Context, reference storage.DataReference return nil, fmt.Errorf("failed metadata fetch") } +func (FailingRawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { + return nil, fmt.Errorf("failed get items") +} + func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { return nil, fmt.Errorf("failed read raw") } From ceaa72d65d24c6f7a2b6c6cba1efd5a39d93f23b Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 05:24:14 +0800 Subject: [PATCH 08/29] add mutex avoid racing Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 74d2dee63a..7c5c2a59bc 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -49,6 +49,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } success := 0 + var mu sync.Mutex var wg sync.WaitGroup for _, absPath := range items { absPath := absPath // capture range variable @@ -81,9 +82,12 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } newPath := filepath.Join(toPath, relativePath) dir := filepath.Dir(newPath) + + mu.Lock() // 0755: the directory can be read by anyone but can only be written by the owner os.MkdirAll(dir, 0755) writer, err := os.Create(newPath) + mu.Unlock() if err != nil { logger.Errorf(ctx, "failed to open file at path %s", newPath) return From 
1f0b1953cff88bd4f939fe2bd166a496fc38f69a Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 5 Sep 2024 05:25:12 +0800 Subject: [PATCH 09/29] avoid infinite call Signed-off-by: wayner0628 --- flytestdlib/storage/cached_rawstore.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 2e087c8a81..deebf96778 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -51,11 +51,12 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met } func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { - ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems") + _, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems") defer span.End() // freecache does not support full cache scanning - return s.RawStore.GetItems(ctx, reference) + var items []string + return items, nil } // ReadRaw retrieves a byte array from the Blob store or an error From 19b0ae8058ab5611edc6715619e9f5a95c79e56b Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 00:13:38 +0800 Subject: [PATCH 10/29] protect critical variables Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 7c5c2a59bc..992689143b 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -40,7 +40,6 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") } - var reader io.ReadCloser if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { items, err := d.store.GetItems(ctx, ref) if err != nil || len(items) == 0 { @@ -52,18 +51,20 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri var mu sync.Mutex var wg sync.WaitGroup for _, absPath := range items { - absPath := absPath // capture range variable + // capture range variable + absPath := absPath + wg.Add(1) go func() { defer wg.Done() defer func() { if err := recover(); err != nil { - logger.Errorf(ctx,"recover receives error: %s", err) + logger.Errorf(ctx, "recover receives error: %s", err) } }() - ref = storage.DataReference(absPath) - reader, err = DownloadFileFromStorage(ctx, ref, d.store) + ref := storage.DataReference(absPath) + reader, err := DownloadFileFromStorage(ctx, ref, d.store) if err != nil { logger.Errorf(ctx, "Failed to download from ref [%s]", ref) return @@ -82,7 +83,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } newPath := filepath.Join(toPath, relativePath) dir := filepath.Dir(newPath) - + mu.Lock() // 0755: the directory can be read by anyone but can only be written by the owner os.MkdirAll(dir, 0755) @@ -104,7 +105,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri logger.Errorf(ctx, "failed to write remote data to local filesystem") return } + mu.Lock() success += 1 + mu.Unlock() }() } wg.Wait() @@ -112,7 +115,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, ref, toPath) return toPath, nil } - + + // reader should be declared 
here (avoid being shared across all goroutines) + var reader io.ReadCloser if scheme == "http" || scheme == "https" { reader, err = DownloadFileFromHTTP(ctx, ref) } else { From b948aee47fb4ccc387d74781bda7543870ffb2ca Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 00:14:11 +0800 Subject: [PATCH 11/29] avoid infinite call Signed-off-by: wayner0628 --- flytestdlib/storage/cached_rawstore.go | 3 +-- flytestdlib/storage/cached_rawstore_test.go | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index deebf96778..4aa55709d0 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -55,8 +55,7 @@ func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) defer span.End() // freecache does not support full cache scanning - var items []string - return items, nil + return s.RawStore.GetItems(ctx, reference) } // ReadRaw retrieves a byte array from the Blob store or an error diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index 6bd5ad0c87..f35bc4d3ef 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -55,6 +55,7 @@ func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cache type dummyStore struct { copyImpl HeadCb func(ctx context.Context, reference DataReference) (Metadata, error) + GetItemsCb func(ctx context.Context, reference DataReference) ([]string, error) ReadRawCb func(ctx context.Context, reference DataReference) (io.ReadCloser, error) WriteRawCb func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error DeleteCb func(ctx context.Context, reference DataReference) error @@ -74,7 +75,7 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat } func (d *dummyStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { - return d.GetItems(ctx, reference) + return d.GetItemsCb(ctx, reference) } func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { From c88813f3a49cc231aa86b7fff875dc23c236f6c9 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 00:22:54 +0800 Subject: [PATCH 12/29] lint Signed-off-by: wayner0628 --- flytestdlib/storage/mem_store_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytestdlib/storage/mem_store_test.go b/flytestdlib/storage/mem_store_test.go index f427a972dd..85e6f98a6e 100644 --- a/flytestdlib/storage/mem_store_test.go +++ b/flytestdlib/storage/mem_store_test.go @@ -33,7 +33,7 @@ func TestInMemoryStore_GetItems(t *testing.T) { t.Run("Nil Path", func(t *testing.T) { s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) assert.NoError(t, err) - + items, err := s.GetItems(context.TODO(), DataReference("hello")) assert.Error(t, err) assert.Nil(t, items) From df9b8ed00ef64fc1108f68ec7885a5678d14a54f Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 01:35:34 +0800 Subject: [PATCH 13/29] add more unit tests Signed-off-by: wayner0628 --- flytecopilot/data/download_test.go | 93 +++++++++++++-------- flytestdlib/storage/cached_rawstore.go | 1 + flytestdlib/storage/cached_rawstore_test.go | 20 +++++ flytestdlib/storage/stow_store_test.go | 48 +++++++++++ 4 files changed, 129 insertions(+), 33 deletions(-) diff --git a/flytecopilot/data/download_test.go 
b/flytecopilot/data/download_test.go index 0b0724b048..1f3b3a7be6 100644 --- a/flytecopilot/data/download_test.go +++ b/flytecopilot/data/download_test.go @@ -15,43 +15,72 @@ import ( ) func TestHandleBlobMultipart(t *testing.T) { - s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) - assert.NoError(t, err) - ref := storage.DataReference("s3://container/folder/file1") - s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) - ref = storage.DataReference("s3://container/folder/file2") - s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) - - d := Downloader{store: s} - - blob := &core.Blob{ - Uri: "s3://container/folder", - Metadata: &core.BlobMetadata{ - Type: &core.BlobType{ - Dimensionality: core.BlobType_MULTIPART, + t.Run("Successful Query", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + ref := storage.DataReference("s3://container/folder/file1") + s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) + ref = storage.DataReference("s3://container/folder/file2") + s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) + + d := Downloader{store: s} + + blob := &core.Blob{ + Uri: "s3://container/folder", + Metadata: &core.BlobMetadata{ + Type: &core.BlobType{ + Dimensionality: core.BlobType_MULTIPART, + }, }, - }, - } + } - toPath := "./inputs" - defer func() { - err := os.RemoveAll(toPath) - if err != nil { - t.Errorf("Failed to delete directory: %v", err) + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + result, err := d.handleBlob(context.Background(), blob, toPath) + assert.NoError(t, err) + assert.Equal(t, toPath, result) + + // Check if files were created and data written + for _, file := range []string{"file1", "file2"} { + if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } } - }() + }) - // Test the handleBlob function - result, err := d.handleBlob(context.Background(), blob, toPath) - assert.NoError(t, err) - assert.Equal(t, toPath, result) + t.Run("No Items", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) - // Check if files were created and data written - for _, file := range []string{"file1", "file2"} { - if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) { - t.Errorf("expected file %s to exist", file) + d := Downloader{store: s} + + blob := &core.Blob{ + Uri: "s3://container/folder", + Metadata: &core.BlobMetadata{ + Type: &core.BlobType{ + Dimensionality: core.BlobType_MULTIPART, + }, + }, } - } + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + result, err := d.handleBlob(context.Background(), blob, toPath) + assert.Error(t, err) + assert.Nil(t, result) + }) } func TestHandleBlobSinglePart(t *testing.T) { @@ -79,7 +108,6 @@ func TestHandleBlobSinglePart(t *testing.T) { } }() - // Test the handleBlob function result, err := d.handleBlob(context.Background(), blob, toPath) assert.NoError(t, err) assert.Equal(t, toPath, result) @@ -112,7 +140,6 @@ func TestHandleBlobHTTP(t *testing.T) { } 
}() - // Test the handleBlob function result, err := d.handleBlob(context.Background(), blob, toPath) assert.NoError(t, err) assert.Equal(t, toPath, result) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 4aa55709d0..cc19cae85e 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -50,6 +50,7 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met return s.RawStore.Head(ctx, reference) } +// GetItems retrieves the paths of all items from the Blob store or an error func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { _, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems") defer span.End() diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index f35bc4d3ef..14012c6ad2 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -110,6 +110,14 @@ func TestCachedRawStore(t *testing.T) { } return MemoryMetadata{}, fmt.Errorf("err") }, + GetItemsCb: func(ctx context.Context, reference DataReference) ([]string, error) { + var s []string + if reference == "k1" { + s = append(s, "item") + return s, nil + } + return s, fmt.Errorf("err") + }, WriteRawCb: func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { if writeCalled { assert.FailNow(t, "Should not be writeCalled") @@ -165,6 +173,18 @@ func TestCachedRawStore(t *testing.T) { assert.False(t, m.Exists()) }) + t.Run("Get Items", func(t *testing.T) { + items, err := cStore.GetItems(ctx, k1) + assert.NoError(t, err) + assert.Equal(t, 1, len(items)) + }) + + t.Run("No Items", func(t *testing.T) { + items, err := cStore.GetItems(ctx, k2) + assert.Error(t, err) + assert.Equal(t, 0, len(items)) + }) + t.Run("ReadCachePopulate", func(t *testing.T) { o, err := cStore.ReadRaw(ctx, k1) assert.NoError(t, err) diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index 6d10639f53..3c4ab87c5d 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -315,6 +315,54 @@ func TestStowStore_GetItems(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 0, len(items)) }) + + t.Run("Broken Reference", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + dataReference := DataReference("s3://") + items, err := s.GetItems(ctx, dataReference) + assert.Error(t, err) + assert.Nil(t, items) + }) + + t.Run("Non-exist Container", func(t *testing.T) { + ctx := context.Background() + fn := fQNFn["s3"] + s, err := NewStowRawStore(fn(container), &mockStowLoc{ + ContainerCb: func(id string) (stow.Container, error) { + if id == container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + CreateContainerCb: func(name string) (stow.Container, error) { + if name == 
container { + return newMockStowContainer(container), nil + } + return nil, fmt.Errorf("container is not supported") + }, + }, nil, false, metrics) + assert.NoError(t, err) + dataReference := DataReference("s3://bad-container/folder") + items, err := s.GetItems(ctx, dataReference) + assert.Error(t, err) + assert.Nil(t, items) + }) } func TestStowStore_ReadRaw(t *testing.T) { From 8150baa9e976acd28c9a9e93530c0979e4973c3b Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 02:07:24 +0800 Subject: [PATCH 14/29] add more unit tests Signed-off-by: wayner0628 --- flytepropeller/pkg/utils/failing_datastore_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flytepropeller/pkg/utils/failing_datastore_test.go b/flytepropeller/pkg/utils/failing_datastore_test.go index cb28e153a3..b78ed1474c 100644 --- a/flytepropeller/pkg/utils/failing_datastore_test.go +++ b/flytepropeller/pkg/utils/failing_datastore_test.go @@ -19,6 +19,9 @@ func TestFailingRawStore(t *testing.T) { c := f.GetBaseContainerFQN(ctx) assert.Equal(t, storage.DataReference(""), c) + _, err = f.GetItems(ctx, "") + assert.Error(t, err) + _, err = f.ReadRaw(ctx, "") assert.Error(t, err) From 672b7113ff8e563a84771529379213125f192b35 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Fri, 6 Sep 2024 02:57:50 +0800 Subject: [PATCH 15/29] fix mock Signed-off-by: wayner0628 --- flytestdlib/storage/mocks/raw_store.go | 46 +++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/flytestdlib/storage/mocks/raw_store.go b/flytestdlib/storage/mocks/raw_store.go index 170931e9be..c15c4b7088 100644 --- a/flytestdlib/storage/mocks/raw_store.go +++ b/flytestdlib/storage/mocks/raw_store.go @@ -192,6 +192,47 @@ func (_m *RawStore) Head(ctx context.Context, reference storage.DataReference) ( return r0, r1 } +type RawStore_GetItems struct { + *mock.Call +} + +func (_m RawStore_GetItems) Return(_a0 error) *RawStore_GetItems { + return &RawStore_GetItems{Call: _m.Call.Return(_a0)} +} + +func (_m *RawStore) OnGetItems(ctx context.Context, reference storage.DataReference) *RawStore_GetItems { + c_call := _m.On("GetItems", ctx, reference) + return &RawStore_GetItems{Call: c_call} +} + +func (_m *RawStore) OnGetItemsMatch(matchers ...interface{}) *RawStore_GetItems { + c_call := _m.On("GetItems", matchers...) 
+ return &RawStore_GetItems{Call: c_call} +} + +// GetItems provides a mock function with given fields: ctx, reference +func (_m *RawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { + ret := _m.Called(ctx, reference) + + var r0 []string + if rf, ok := ret.Get(0).(func(context.Context, storage.DataReference) []string); ok { + r0 = rf(ctx, reference) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, storage.DataReference) error); ok { + r1 = rf(ctx, reference) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type RawStore_ReadRaw struct { *mock.Call } @@ -210,11 +251,6 @@ func (_m *RawStore) OnReadRawMatch(matchers ...interface{}) *RawStore_ReadRaw { return &RawStore_ReadRaw{Call: c_call} } -func (_m *RawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { - var s []string - return s, nil -} - // ReadRaw provides a mock function with given fields: ctx, reference func (_m *RawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { ret := _m.Called(ctx, reference) From ad12330feb38882d0b73f84a340e7f3afd226387 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 14:34:27 -0700 Subject: [PATCH 16/29] Accept incoming changes Signed-off-by: wayner0628 --- .../pkg/utils/failing_datastore_test.go | 3 - flytestdlib/storage/cached_rawstore.go | 9 -- flytestdlib/storage/cached_rawstore_test.go | 12 -- flytestdlib/storage/mem_store.go | 1 - flytestdlib/storage/mem_store_test.go | 38 ----- flytestdlib/storage/stow_store_test.go | 130 ------------------ 6 files changed, 193 deletions(-) diff --git a/flytepropeller/pkg/utils/failing_datastore_test.go b/flytepropeller/pkg/utils/failing_datastore_test.go index b78ed1474c..cb28e153a3 100644 --- a/flytepropeller/pkg/utils/failing_datastore_test.go +++ b/flytepropeller/pkg/utils/failing_datastore_test.go @@ -19,9 +19,6 @@ func TestFailingRawStore(t *testing.T) { c := f.GetBaseContainerFQN(ctx) assert.Equal(t, storage.DataReference(""), c) - _, err = f.GetItems(ctx, "") - assert.Error(t, err) - _, err = f.ReadRaw(ctx, "") assert.Error(t, err) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index cc19cae85e..913a517a0f 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -50,15 +50,6 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met return s.RawStore.Head(ctx, reference) } -// GetItems retrieves the paths of all items from the Blob store or an error -func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) { - _, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems") - defer span.End() - - // freecache does not support full cache scanning - return s.RawStore.GetItems(ctx, reference) -} - // ReadRaw retrieves a byte array from the Blob store or an error func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/ReadRaw") diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index 1a5159ac97..bbbb6c5962 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -173,18 +173,6 @@ 
func TestCachedRawStore(t *testing.T) { assert.False(t, m.Exists()) }) - t.Run("Get Items", func(t *testing.T) { - items, err := cStore.GetItems(ctx, k1) - assert.NoError(t, err) - assert.Equal(t, 1, len(items)) - }) - - t.Run("No Items", func(t *testing.T) { - items, err := cStore.GetItems(ctx, k2) - assert.Error(t, err) - assert.Equal(t, 0, len(items)) - }) - t.Run("ReadCachePopulate", func(t *testing.T) { o, err := cStore.ReadRaw(ctx, k1) assert.NoError(t, err) diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index 4c22d80b6b..94083f6646 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -9,7 +9,6 @@ import ( "io" "io/ioutil" "os" - "strings" ) type rawFile = []byte diff --git a/flytestdlib/storage/mem_store_test.go b/flytestdlib/storage/mem_store_test.go index 85e6f98a6e..fc58ed1151 100644 --- a/flytestdlib/storage/mem_store_test.go +++ b/flytestdlib/storage/mem_store_test.go @@ -29,44 +29,6 @@ func TestInMemoryStore_Head(t *testing.T) { }) } -func TestInMemoryStore_GetItems(t *testing.T) { - t.Run("Nil Path", func(t *testing.T) { - s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) - assert.NoError(t, err) - - items, err := s.GetItems(context.TODO(), DataReference("hello")) - assert.Error(t, err) - assert.Nil(t, items) - }) - - t.Run("No Items", func(t *testing.T) { - s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) - assert.NoError(t, err) - err = s.WriteRaw(context.TODO(), DataReference("folder"), 0, Options{}, bytes.NewReader([]byte{})) - assert.NoError(t, err) - - items, err := s.GetItems(context.TODO(), DataReference("folder")) - assert.Error(t, err) - assert.Nil(t, items) - }) - - t.Run("Existing Items", func(t *testing.T) { - s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) - assert.NoError(t, err) - err = s.WriteRaw(context.TODO(), DataReference("folder/file1"), 0, Options{}, bytes.NewReader([]byte{})) - assert.NoError(t, err) - - err = s.WriteRaw(context.TODO(), DataReference("folder/file2"), 0, Options{}, bytes.NewReader([]byte{})) - assert.NoError(t, err) - - items, err := s.GetItems(context.TODO(), DataReference("folder")) - assert.NoError(t, err) - assert.Equal(t, 2, len(items)) - assert.Equal(t, "folder/file1", items[0]) - assert.Equal(t, "folder/file2", items[1]) - }) -} - func TestInMemoryStore_ReadRaw(t *testing.T) { t.Run("Empty store", func(t *testing.T) { s, err := NewInMemoryRawStore(context.TODO(), &Config{}, metrics) diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index c60be18882..4de273dd93 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -257,136 +257,6 @@ func TestStowStore_CreateSignedURL(t *testing.T) { }) } -func TestStowStore_GetItems(t *testing.T) { - const container = "container" - - t.Run("Correct Query with Items", func(t *testing.T) { - ctx := context.Background() - fn := fQNFn["s3"] - s, err := NewStowRawStore(fn(container), &mockStowLoc{ - ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - CreateContainerCb: func(name string) (stow.Container, error) { - if name == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - }, nil, false, metrics) - assert.NoError(t, err) - writeTestFile(ctx, t, s, "s3://container/folder/file1") - 
writeTestFile(ctx, t, s, "s3://container/folder/file2") - dataReference := DataReference("s3://container/folder") - items, err := s.GetItems(ctx, dataReference) - assert.NoError(t, err) - assert.Equal(t, 2, len(items)) - assert.Equal(t, "s3://container/folder/file1", items[0]) - assert.Equal(t, "s3://container/folder/file2", items[1]) - }) - - t.Run("Empty Reference", func(t *testing.T) { - ctx := context.Background() - fn := fQNFn["s3"] - s, err := NewStowRawStore(fn(container), &mockStowLoc{ - ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - CreateContainerCb: func(name string) (stow.Container, error) { - if name == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - }, nil, false, metrics) - assert.NoError(t, err) - dataReference := DataReference("s3://container/folder") - items, err := s.GetItems(ctx, dataReference) - assert.NoError(t, err) - assert.Equal(t, 0, len(items)) - }) - - t.Run("Bad Reference", func(t *testing.T) { - ctx := context.Background() - fn := fQNFn["s3"] - s, err := NewStowRawStore(fn(container), &mockStowLoc{ - ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - CreateContainerCb: func(name string) (stow.Container, error) { - if name == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - }, nil, false, metrics) - assert.NoError(t, err) - writeTestFile(ctx, t, s, "s3://container/folder/file1") - writeTestFile(ctx, t, s, "s3://container/folder/file2") - dataReference := DataReference("s3://container/bad-folder") - items, err := s.GetItems(ctx, dataReference) - assert.NoError(t, err) - assert.Equal(t, 0, len(items)) - }) - - t.Run("Broken Reference", func(t *testing.T) { - ctx := context.Background() - fn := fQNFn["s3"] - s, err := NewStowRawStore(fn(container), &mockStowLoc{ - ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - CreateContainerCb: func(name string) (stow.Container, error) { - if name == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - }, nil, false, metrics) - assert.NoError(t, err) - dataReference := DataReference("s3://") - items, err := s.GetItems(ctx, dataReference) - assert.Error(t, err) - assert.Nil(t, items) - }) - - t.Run("Non-exist Container", func(t *testing.T) { - ctx := context.Background() - fn := fQNFn["s3"] - s, err := NewStowRawStore(fn(container), &mockStowLoc{ - ContainerCb: func(id string) (stow.Container, error) { - if id == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - CreateContainerCb: func(name string) (stow.Container, error) { - if name == container { - return newMockStowContainer(container), nil - } - return nil, fmt.Errorf("container is not supported") - }, - }, nil, false, metrics) - assert.NoError(t, err) - dataReference := DataReference("s3://bad-container/folder") - items, err := s.GetItems(ctx, dataReference) - assert.Error(t, err) - assert.Nil(t, items) - }) -} - func TestStowStore_ReadRaw(t *testing.T) { const 
container = "container" t.Run("Happy Path", func(t *testing.T) { From 65611c052b4ea67ac57ebc97f3e86658031fcfb9 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 15:59:41 -0700 Subject: [PATCH 17/29] multipart blob download based on new api Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 48 +++++++++++++++++++------------- flytestdlib/storage/mem_store.go | 16 ++++++++++- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 992689143b..1df300a65a 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -34,25 +34,36 @@ type Downloader struct { } func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) { - ref := storage.DataReference(blob.Uri) - scheme, _, _, err := ref.Split() + blobRef := storage.DataReference(blob.Uri) + scheme, c, _, err := blobRef.Split() if err != nil { return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") } if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { - items, err := d.store.GetItems(ctx, ref) - if err != nil || len(items) == 0 { - logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", ref) - return nil, err + maxItems := 100 + cursor := storage.NewCursorAtStart() + var items []storage.DataReference + var keys []string + for { + items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor) + if err != nil || len(items) == 0 { + logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", blobRef) + return nil, err + } + for _, item := range items { + keys = append(keys, item.String()) + } + if cursor.cursorState == storage.AtEndCursorState { + break + } } success := 0 var mu sync.Mutex var wg sync.WaitGroup - for _, absPath := range items { - // capture range variable - absPath := absPath + for _, k := range keys { + absPath := fmt.Sprintf("%s://%s/%s", scheme, c, k) wg.Add(1) go func() { @@ -76,19 +87,19 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } }() - _, _, relativePath, err := ref.Split() + _, _, k, err := ref.Split() if err != nil { logger.Errorf(ctx, "Failed to parse ref [%s]", ref) return } - newPath := filepath.Join(toPath, relativePath) + newPath := filepath.Join(toPath, k) dir := filepath.Dir(newPath) mu.Lock() // 0755: the directory can be read by anyone but can only be written by the owner os.MkdirAll(dir, 0755) - writer, err := os.Create(newPath) mu.Unlock() + writer, err := os.Create(newPath) if err != nil { logger.Errorf(ctx, "failed to open file at path %s", newPath) return @@ -111,26 +122,25 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri }() } wg.Wait() - ref = storage.DataReference(blob.Uri) - logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, ref, toPath) + logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, blobRef, toPath) return toPath, nil } // reader should be declared here (avoid being shared across all goroutines) var reader io.ReadCloser if scheme == "http" || scheme == "https" { - reader, err = DownloadFileFromHTTP(ctx, ref) + reader, err = DownloadFileFromHTTP(ctx, blobRef) } else { - reader, err = DownloadFileFromStorage(ctx, ref, d.store) + reader, err = DownloadFileFromStorage(ctx, blobRef, d.store) } if err != nil { - logger.Errorf(ctx, "Failed to download from ref [%s]", ref) + logger.Errorf(ctx, "Failed to download from ref 
[%s]", blobRef) return nil, err } defer func() { err := reader.Close() if err != nil { - logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) + logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err) } }() @@ -148,7 +158,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri if err != nil { return nil, errors.Wrapf(err, "failed to write remote data to local filesystem") } - logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toPath) + logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath) return toPath, nil } diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index 94083f6646..93411c5246 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "os" + "strings" ) type rawFile = []byte @@ -55,7 +56,20 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta } func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) { - return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet") + var items []DataReference + prefix := strings.TrimSuffix(string(reference), "/") + "/" + + for key := range s.cache { + if strings.HasPrefix(key.String(), prefix) { + items = append(items, key) + } + } + + if len(items) == 0 { + return nil, NewCursorAtEnd(), os.ErrNotExist + } + + return items, NewCursorAtEnd(), nil } func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) { From 38a030bf81afb8160b4f01a04ffc49610e58fc39 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 16:15:14 -0700 Subject: [PATCH 18/29] cache store stop listing at end cursor Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 2 +- flytestdlib/storage/mem_store.go | 13 ++++++++++--- flytestdlib/storage/storage.go | 9 ++++++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 1df300a65a..5f62ff627b 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -54,7 +54,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri for _, item := range items { keys = append(keys, item.String()) } - if cursor.cursorState == storage.AtEndCursorState { + if storage.IsCursorEnd(cursor) { break } } diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index 93411c5246..4b2a4b7077 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -10,6 +10,8 @@ import ( "io/ioutil" "os" "strings" + + "github.com/flyteorg/flyte/flytestdlib/logger" ) type rawFile = []byte @@ -59,9 +61,14 @@ func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxIt var items []DataReference prefix := strings.TrimSuffix(string(reference), "/") + "/" - for key := range s.cache { - if strings.HasPrefix(key.String(), prefix) { - items = append(items, key) + for ref := range s.cache { + if strings.HasPrefix(ref.String(), prefix) { + _, _, k, err := ref.Split() + if err != nil { + logger.Errorf(ctx, "failed to split reference [%s]", ref) + continue + } + items = append(items, DataReference(k)) } } diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index 52e6905513..bb9dd47b91 100644 --- 
a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -75,6 +75,13 @@ func NewCursorFromCustomPosition(customPosition string) Cursor { } } +func IsCursorEnd(cursor Cursor) bool { + if cursor.cursorState == AtEndCursorState { + return true + } + return false +} + // DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. // Today we rely on Stow for multi-cloud support, but this interface abstracts that part type DataStore struct { @@ -113,7 +120,7 @@ type RawStore interface { // Head gets metadata about the reference. This should generally be a light weight operation. Head(ctx context.Context, reference DataReference) (Metadata, error) - // List gets a list of items given a prefix, using a paginated API + // List gets a list of items (relative path to the reference input) given a prefix, using a paginated API List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) // ReadRaw retrieves a byte array from the Blob store or an error From abf7f6ab22533be612bc2e9ebc0e9159ce9cb981 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 16:24:32 -0700 Subject: [PATCH 19/29] lint Signed-off-by: wayner0628 --- flytestdlib/storage/storage.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flytestdlib/storage/storage.go b/flytestdlib/storage/storage.go index bb9dd47b91..0779beffe9 100644 --- a/flytestdlib/storage/storage.go +++ b/flytestdlib/storage/storage.go @@ -76,10 +76,7 @@ func NewCursorFromCustomPosition(customPosition string) Cursor { } func IsCursorEnd(cursor Cursor) bool { - if cursor.cursorState == AtEndCursorState { - return true - } - return false + return cursor.cursorState == AtEndCursorState } // DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. 
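The cursor helpers finalized above (NewCursorAtStart, IsCursorEnd) are what the copilot download path leans on when paging through a multipart blob: it keeps calling List with the cursor returned by the previous call until the cursor reports the end, collecting the part references batch by batch. A minimal sketch of that pagination loop, written against the RawStore interface shown earlier — listAllParts is a hypothetical helper for illustration, not a function added by this series:

package example

import (
	"context"

	"github.com/flyteorg/flyte/flytestdlib/storage"
)

// listAllParts pages through store.List until the returned cursor reports the end,
// mirroring the loop handleBlob uses for multipart blobs (a batch size of 100 per call).
func listAllParts(ctx context.Context, store storage.RawStore, prefix storage.DataReference) ([]storage.DataReference, error) {
	var all []storage.DataReference
	cursor := storage.NewCursorAtStart()
	for {
		items, next, err := store.List(ctx, prefix, 100, cursor)
		if err != nil {
			return nil, err
		}
		all = append(all, items...)
		if storage.IsCursorEnd(next) {
			break
		}
		cursor = next
	}
	return all, nil
}

Batching through the cursor keeps each List call bounded even when a multipart blob has many parts, which is why the caller loops rather than asking for everything at once.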
From 270384896efff99843caf016a4a6f16c11538b1f Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 16:44:14 -0700 Subject: [PATCH 20/29] remove old api mock Signed-off-by: wayner0628 --- .../storage/mocks/composed_protobuf_store.go | 5 --- flytestdlib/storage/mocks/raw_store.go | 33 ------------------- 2 files changed, 38 deletions(-) diff --git a/flytestdlib/storage/mocks/composed_protobuf_store.go b/flytestdlib/storage/mocks/composed_protobuf_store.go index 78c71376f2..49a0ee89dd 100644 --- a/flytestdlib/storage/mocks/composed_protobuf_store.go +++ b/flytestdlib/storage/mocks/composed_protobuf_store.go @@ -292,11 +292,6 @@ func (_m *ComposedProtobufStore) OnReadRawMatch(matchers ...interface{}) *Compos return &ComposedProtobufStore_ReadRaw{Call: c_call} } -func (_m *ComposedProtobufStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { - var s []string - return s, nil -} - // ReadRaw provides a mock function with given fields: ctx, reference func (_m *ComposedProtobufStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { ret := _m.Called(ctx, reference) diff --git a/flytestdlib/storage/mocks/raw_store.go b/flytestdlib/storage/mocks/raw_store.go index c15c4b7088..b4ceb00eeb 100644 --- a/flytestdlib/storage/mocks/raw_store.go +++ b/flytestdlib/storage/mocks/raw_store.go @@ -200,39 +200,6 @@ func (_m RawStore_GetItems) Return(_a0 error) *RawStore_GetItems { return &RawStore_GetItems{Call: _m.Call.Return(_a0)} } -func (_m *RawStore) OnGetItems(ctx context.Context, reference storage.DataReference) *RawStore_GetItems { - c_call := _m.On("GetItems", ctx, reference) - return &RawStore_GetItems{Call: c_call} -} - -func (_m *RawStore) OnGetItemsMatch(matchers ...interface{}) *RawStore_GetItems { - c_call := _m.On("GetItems", matchers...) 
- return &RawStore_GetItems{Call: c_call} -} - -// GetItems provides a mock function with given fields: ctx, reference -func (_m *RawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { - ret := _m.Called(ctx, reference) - - var r0 []string - if rf, ok := ret.Get(0).(func(context.Context, storage.DataReference) []string); ok { - r0 = rf(ctx, reference) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]string) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func(context.Context, storage.DataReference) error); ok { - r1 = rf(ctx, reference) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - type RawStore_ReadRaw struct { *mock.Call } From 99847bd55eee06ff7cc8c219d3a5b77d15acf235 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 16:45:37 -0700 Subject: [PATCH 21/29] remove old api mock Signed-off-by: wayner0628 --- flyteadmin/pkg/common/mocks/storage.go | 5 ----- flytestdlib/storage/mocks/raw_store.go | 8 -------- 2 files changed, 13 deletions(-) diff --git a/flyteadmin/pkg/common/mocks/storage.go b/flyteadmin/pkg/common/mocks/storage.go index 0262cda350..bf29eedd3e 100644 --- a/flyteadmin/pkg/common/mocks/storage.go +++ b/flyteadmin/pkg/common/mocks/storage.go @@ -58,11 +58,6 @@ func (t *TestDataStore) CreateSignedURL(ctx context.Context, reference storage.D return storage.SignedURLResponse{URL: *signedURL}, nil } -func (t *TestDataStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) { - var s []string - return s, nil -} - // Retrieves a byte array from the Blob store or an error func (t *TestDataStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) { return NopCloser{}, nil diff --git a/flytestdlib/storage/mocks/raw_store.go b/flytestdlib/storage/mocks/raw_store.go index b4ceb00eeb..06b731765d 100644 --- a/flytestdlib/storage/mocks/raw_store.go +++ b/flytestdlib/storage/mocks/raw_store.go @@ -192,14 +192,6 @@ func (_m *RawStore) Head(ctx context.Context, reference storage.DataReference) ( return r0, r1 } -type RawStore_GetItems struct { - *mock.Call -} - -func (_m RawStore_GetItems) Return(_a0 error) *RawStore_GetItems { - return &RawStore_GetItems{Call: _m.Call.Return(_a0)} -} - type RawStore_ReadRaw struct { *mock.Call } From e008444d2a8162bc4e1c47695e764bbb86400379 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Sun, 15 Sep 2024 16:50:08 -0700 Subject: [PATCH 22/29] remove old api mock Signed-off-by: wayner0628 --- flytestdlib/storage/cached_rawstore_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index bbbb6c5962..9c304790cb 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -55,7 +55,6 @@ func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cache type dummyStore struct { copyImpl HeadCb func(ctx context.Context, reference DataReference) (Metadata, error) - GetItemsCb func(ctx context.Context, reference DataReference) ([]string, error) ReadRawCb func(ctx context.Context, reference DataReference) (io.ReadCloser, error) WriteRawCb func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error DeleteCb func(ctx context.Context, reference DataReference) error @@ -110,14 +109,6 @@ func TestCachedRawStore(t *testing.T) { } return MemoryMetadata{}, fmt.Errorf("err") }, - GetItemsCb: func(ctx context.Context, reference DataReference) 
([]string, error) { - var s []string - if reference == "k1" { - s = append(s, "item") - return s, nil - } - return s, fmt.Errorf("err") - }, WriteRawCb: func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { if writeCalled { assert.FailNow(t, "Should not be writeCalled") From acc16c87bcc0a34f870001660a32cd4b57c44669 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Tue, 22 Oct 2024 15:36:23 -0700 Subject: [PATCH 23/29] update mem_store List to return global path Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 10 +++++----- flytestdlib/storage/mem_store.go | 9 +-------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 5f62ff627b..bfd4069d78 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -35,7 +35,7 @@ type Downloader struct { func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) { blobRef := storage.DataReference(blob.Uri) - scheme, c, _, err := blobRef.Split() + scheme, _, _, err := blobRef.Split() if err != nil { return nil, errors.Wrapf(err, "Blob uri incorrectly formatted") } @@ -44,7 +44,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri maxItems := 100 cursor := storage.NewCursorAtStart() var items []storage.DataReference - var keys []string + var absPaths []string for { items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor) if err != nil || len(items) == 0 { @@ -52,7 +52,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri return nil, err } for _, item := range items { - keys = append(keys, item.String()) + absPaths = append(absPaths, item.String()) } if storage.IsCursorEnd(cursor) { break @@ -62,8 +62,8 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri success := 0 var mu sync.Mutex var wg sync.WaitGroup - for _, k := range keys { - absPath := fmt.Sprintf("%s://%s/%s", scheme, c, k) + for _, absPath := range absPaths { + absPath := absPath wg.Add(1) go func() { diff --git a/flytestdlib/storage/mem_store.go b/flytestdlib/storage/mem_store.go index 4b2a4b7077..4e8bf94c82 100644 --- a/flytestdlib/storage/mem_store.go +++ b/flytestdlib/storage/mem_store.go @@ -10,8 +10,6 @@ import ( "io/ioutil" "os" "strings" - - "github.com/flyteorg/flyte/flytestdlib/logger" ) type rawFile = []byte @@ -63,12 +61,7 @@ func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxIt for ref := range s.cache { if strings.HasPrefix(ref.String(), prefix) { - _, _, k, err := ref.Split() - if err != nil { - logger.Errorf(ctx, "failed to split reference [%s]", ref) - continue - } - items = append(items, DataReference(k)) + items = append(items, ref) } } From 7ca6af1ab7d880e96c6d5cd00b63119de66b7575 Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Wed, 6 Nov 2024 17:14:13 -0800 Subject: [PATCH 24/29] change mkdir perm Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index bfd4069d78..31d227a05c 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -96,8 +96,8 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri dir := filepath.Dir(newPath) mu.Lock() - // 0755: the directory can be read by anyone but can only be written by the owner - os.MkdirAll(dir, 0755) + // 0777: 
the directory can be read and written by anyone + os.MkdirAll(dir, 0777) mu.Unlock() writer, err := os.Create(newPath) if err != nil { From ac2940a86e77dd4a199741b8096f81e95bfa0f9c Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 7 Nov 2024 14:10:06 -0800 Subject: [PATCH 25/29] add comments and handle more errors Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 103 +++++++++++++++++++++++----------- 1 file changed, 71 insertions(+), 32 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 31d227a05c..60ff2f0a80 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -33,7 +33,25 @@ type Downloader struct { mode core.IOStrategy_DownloadMode } +// TODO add timeout and rate limit +// TODO use chunk to download func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) { + /* + handleBlob handles the retrieval and local storage of blob data, including support for both single and multipart blob types. + For multipart blobs, it lists all parts recursively and spawns concurrent goroutines to download each part while managing file I/O in parallel. + + - The function begins by validating the blob URI and categorizing the blob type (single or multipart). + - In the multipart case, it recursively lists all blob parts and launches goroutines to download and save each part. + Goroutine closure and I/O success tracking are managed to avoid resource leaks. + - For single-part blobs, it directly downloads and writes the data to the specified path. + + Life Cycle: + 1. Blob URI -> Blob Metadata Type check -> Recursive List parts if Multipart -> Launch goroutines to download parts + (input blob object) (determine multipart/single) (List API, handles recursive case) (each part handled in parallel) + 2. Download part or full blob -> Save locally with error checks -> Handle reader/writer closures -> Return local path or error + (download each part) (error on write or directory) (close streams safely, track success) (completion or report missing closures) + */ + blobRef := storage.DataReference(blob.Uri) scheme, _, _, err := blobRef.Split() if err != nil { @@ -41,6 +59,8 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART { + // Collect all parts of the multipart blob recursively (List API handles nested directories) + // Set maxItems to 100 as a parameter for the List API, enabling batch retrieval of items until all are downloaded maxItems := 100 cursor := storage.NewCursorAtStart() var items []storage.DataReference @@ -60,6 +80,11 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } success := 0 + itemCount := len(absPaths) + // Track successful closures of readers and writers in deferred functions + readerCloseSuccessCount := 0 + writerCloseSuccessCount := 0 + // We use Mutex to avoid race conditions when updating counters and creating directories var mu sync.Mutex var wg sync.WaitGroup for _, absPath := range absPaths { @@ -85,6 +110,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri if err != nil { logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. 
Error: %s", ref, err) } + readerCloseSuccessCount += 1 }() _, _, k, err := ref.Split() @@ -96,6 +122,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri dir := filepath.Dir(newPath) mu.Lock() + // os.MkdirAll creates the specified directory structure if it doesn’t already exist // 0777: the directory can be read and written by anyone os.MkdirAll(dir, 0777) mu.Unlock() @@ -109,6 +136,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri if err != nil { logger.Errorf(ctx, "failed to close File write stream. Error: %s", err) } + writerCloseSuccessCount += 1 }() _, err = io.Copy(writer, reader) @@ -121,45 +149,56 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri mu.Unlock() }() } + // Go routines are synchronized with a WaitGroup to prevent goroutine leaks. wg.Wait() - logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, blobRef, toPath) - return toPath, nil - } - - // reader should be declared here (avoid being shared across all goroutines) - var reader io.ReadCloser - if scheme == "http" || scheme == "https" { - reader, err = DownloadFileFromHTTP(ctx, blobRef) - } else { - reader, err = DownloadFileFromStorage(ctx, blobRef, d.store) - } - if err != nil { - logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef) - return nil, err - } - defer func() { - err := reader.Close() + if success != itemCount { + return nil, errors.Errorf("failed to copy %d out of %d remote files from [%s] to local [%s]", itemCount-success, itemCount, blobRef, toPath) + } else if readerCloseSuccessCount != itemCount { + return nil, errors.Errorf("failed to close %d out of %d remote file readers", itemCount-readerCloseSuccessCount, itemCount) + } else if writerCloseSuccessCount != itemCount { + return nil, errors.Errorf("failed to close %d out of %d local file writers", itemCount-writerCloseSuccessCount, itemCount) + } else { + logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", success, blobRef, toPath) + return toPath, nil + } + } else if blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE { + // reader should be declared here (avoid being shared across all goroutines) + var reader io.ReadCloser + if scheme == "http" || scheme == "https" { + reader, err = DownloadFileFromHTTP(ctx, blobRef) + } else { + reader, err = DownloadFileFromStorage(ctx, blobRef, d.store) + } if err != nil { - logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err) + logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef) + return nil, err } - }() + defer func() { + err := reader.Close() + if err != nil { + logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err) + } + }() - writer, err := os.Create(toPath) - if err != nil { - return nil, errors.Wrapf(err, "failed to open file at path %s", toPath) - } - defer func() { - err := writer.Close() + writer, err := os.Create(toPath) if err != nil { - logger.Errorf(ctx, "failed to close File write stream. Error: %s", err) + return nil, errors.Wrapf(err, "failed to open file at path %s", toPath) } - }() - v, err := io.Copy(writer, reader) - if err != nil { - return nil, errors.Wrapf(err, "failed to write remote data to local filesystem") + defer func() { + err := writer.Close() + if err != nil { + logger.Errorf(ctx, "failed to close File write stream. 
Error: %s", err) + } + }() + v, err := io.Copy(writer, reader) + if err != nil { + return nil, errors.Wrapf(err, "failed to write remote data to local filesystem") + } + logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath) + return toPath, nil + } else { + return nil, errors.Errorf("unexpected Blob type encountered") } - logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath) - return toPath, nil } func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) { From bf658367538f7d9008416c5e808d09b97c10f36a Mon Sep 17 00:00:00 2001 From: Wei-Yu Kao <115421902+wayner0628@users.noreply.github.com> Date: Thu, 7 Nov 2024 20:48:58 -0800 Subject: [PATCH 26/29] lint Co-authored-by: Han-Ru Chen (Future-Outlier) Signed-off-by: Wei-Yu Kao <115421902+wayner0628@users.noreply.github.com> --- flytecopilot/data/download.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 60ff2f0a80..35a8c11ee5 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -196,9 +196,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath) return toPath, nil - } else { - return nil, errors.Errorf("unexpected Blob type encountered") } + + return nil, errors.Errorf("unexpected Blob type encountered") } func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) { From db481d03a90ff330af208c057540f566c8040a6a Mon Sep 17 00:00:00 2001 From: wayner0628 Date: Thu, 7 Nov 2024 21:07:09 -0800 Subject: [PATCH 27/29] address race condition and aggregate errors Signed-off-by: wayner0628 --- flytecopilot/data/download.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 35a8c11ee5..47fc8bfbfc 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -79,7 +79,8 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri } } - success := 0 + // Track the count of successful downloads and the total number of items + downloadSuccess := 0 itemCount := len(absPaths) // Track successful closures of readers and writers in deferred functions readerCloseSuccessCount := 0 @@ -110,7 +111,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri if err != nil { logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) } + mu.Lock() readerCloseSuccessCount += 1 + mu.Unlock() }() _, _, k, err := ref.Split() @@ -136,7 +139,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri if err != nil { logger.Errorf(ctx, "failed to close File write stream. Error: %s", err) } + mu.Lock() writerCloseSuccessCount += 1 + mu.Unlock() }() _, err = io.Copy(writer, reader) @@ -145,22 +150,20 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri return } mu.Lock() - success += 1 + downloadSuccess += 1 mu.Unlock() }() } // Go routines are synchronized with a WaitGroup to prevent goroutine leaks. 
wg.Wait() - if success != itemCount { - return nil, errors.Errorf("failed to copy %d out of %d remote files from [%s] to local [%s]", itemCount-success, itemCount, blobRef, toPath) - } else if readerCloseSuccessCount != itemCount { - return nil, errors.Errorf("failed to close %d out of %d remote file readers", itemCount-readerCloseSuccessCount, itemCount) - } else if writerCloseSuccessCount != itemCount { - return nil, errors.Errorf("failed to close %d out of %d local file writers", itemCount-writerCloseSuccessCount, itemCount) - } else { - logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", success, blobRef, toPath) - return toPath, nil + if downloadSuccess != itemCount || readerCloseSuccessCount != itemCount || writerCloseSuccessCount != itemCount { + return nil, errors.Errorf( + "Failed to copy %d out of %d remote files from [%s] to local [%s]. Failed to close %d readers; Failed to close %d writers.", + itemCount-downloadSuccess, itemCount, blobRef, toPath, itemCount-readerCloseSuccessCount, itemCount-writerCloseSuccessCount, + ) } + logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", downloadSuccess, blobRef, toPath) + return toPath, nil } else if blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE { // reader should be declared here (avoid being shared across all goroutines) var reader io.ReadCloser @@ -197,7 +200,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath) return toPath, nil } - + return nil, errors.Errorf("unexpected Blob type encountered") } From 03e8221d2437081cdb14d1e806aa220a12f22020 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 8 Nov 2024 13:34:41 +0800 Subject: [PATCH 28/29] fix tests Signed-off-by: Future-Outlier --- flytecopilot/data/download.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 47fc8bfbfc..38fab6dfb2 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -109,10 +109,11 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri defer func() { err := reader.Close() if err != nil { - logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err) + logger.Errorf(ctx, "failed to close Blob read stream @ref [%s].\n"+ + "Error: %s", ref, err) } mu.Lock() - readerCloseSuccessCount += 1 + readerCloseSuccessCount++ mu.Unlock() }() @@ -127,20 +128,26 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri mu.Lock() // os.MkdirAll creates the specified directory structure if it doesn’t already exist // 0777: the directory can be read and written by anyone - os.MkdirAll(dir, 0777) + err = os.MkdirAll(dir, 0777) mu.Unlock() + if err != nil { + logger.Errorf(ctx, "failed to make dir at path [%s]", dir) + return + } + writer, err := os.Create(newPath) if err != nil { - logger.Errorf(ctx, "failed to open file at path %s", newPath) + logger.Errorf(ctx, "failed to open file at path [%s]", newPath) return } defer func() { err := writer.Close() if err != nil { - logger.Errorf(ctx, "failed to close File write stream. 
Error: %s", err) + logger.Errorf(ctx, "failed to close File write stream.\n"+ + "Error: [%s]", err) } mu.Lock() - writerCloseSuccessCount += 1 + writerCloseSuccessCount++ mu.Unlock() }() @@ -150,7 +157,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri return } mu.Lock() - downloadSuccess += 1 + downloadSuccess++ mu.Unlock() }() } @@ -158,7 +165,9 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri wg.Wait() if downloadSuccess != itemCount || readerCloseSuccessCount != itemCount || writerCloseSuccessCount != itemCount { return nil, errors.Errorf( - "Failed to copy %d out of %d remote files from [%s] to local [%s]. Failed to close %d readers; Failed to close %d writers.", + "Failed to copy %d out of %d remote files from [%s] to local [%s].\n"+ + "Failed to close %d readers\n"+ + "Failed to close %d writers.", itemCount-downloadSuccess, itemCount, blobRef, toPath, itemCount-readerCloseSuccessCount, itemCount-writerCloseSuccessCount, ) } From dbbd8c3dd95cb4d3252e7d8ad4e948a6c5647e07 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 8 Nov 2024 13:35:03 +0800 Subject: [PATCH 29/29] err msg enhancement Signed-off-by: Future-Outlier --- flytecopilot/data/download.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 38fab6dfb2..e4efa22222 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -96,7 +96,7 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath stri defer wg.Done() defer func() { if err := recover(); err != nil { - logger.Errorf(ctx, "recover receives error: %s", err) + logger.Errorf(ctx, "recover receives error: [%s]", err) } }()
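Taken together, the final revisions settle the multipart branch into a per-part worker pattern: each goroutine copies one part into place and reports success through mutex-protected counters that are compared against the part count after wg.Wait(). The condensed sketch below illustrates that accounting pattern in isolation; copyParts and its map of pre-opened readers are hypothetical stand-ins for the store/HTTP readers in the actual code, so treat it as an illustration of the pattern rather than the series' exact implementation:

package example

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
)

// copyParts writes each part under toDir concurrently and fails unless every copy
// succeeded, mirroring the success/itemCount comparison handleBlob performs after wg.Wait().
func copyParts(parts map[string]io.ReadCloser, toDir string) error {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		success int
	)
	for rel, reader := range parts {
		rel, reader := rel, reader // capture range variables, as handleBlob does with absPath
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer reader.Close()

			dest := filepath.Join(toDir, rel)
			// 0777 mirrors the permission chosen for the per-part directories in the series.
			if err := os.MkdirAll(filepath.Dir(dest), 0777); err != nil {
				return
			}
			w, err := os.Create(dest)
			if err != nil {
				return
			}
			defer w.Close()
			if _, err := io.Copy(w, reader); err != nil {
				return
			}
			mu.Lock()
			success++
			mu.Unlock()
		}()
	}
	wg.Wait()
	if success != len(parts) {
		return fmt.Errorf("copied %d of %d parts", success, len(parts))
	}
	return nil
}

Counting successes under the mutex, rather than returning errors from the goroutines, lets the caller aggregate every failure into a single error after all workers have finished, which is the trade-off the later patches converge on.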