Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copilot][flytedirectory] multipart blob download #5715

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
43a0b18
add download multipart blob
wayner0628 Sep 1, 2024
6f78352
recursively process subparts
wayner0628 Sep 1, 2024
69514d6
implement GetItems function
wayner0628 Sep 4, 2024
f13022b
add unit testing
wayner0628 Sep 4, 2024
8fae9f9
Parallelly handle blob items
wayner0628 Sep 4, 2024
e82c5de
fix lint error
wayner0628 Sep 4, 2024
d7c4686
implement GetItems function
wayner0628 Sep 4, 2024
ceaa72d
add mutex avoid racing
wayner0628 Sep 4, 2024
1f0b195
avoid infinite call
wayner0628 Sep 4, 2024
19b0ae8
protect critical variables
wayner0628 Sep 5, 2024
b948aee
avoid infinite call
wayner0628 Sep 5, 2024
c88813f
lint
wayner0628 Sep 5, 2024
df9b8ed
add more unit tests
wayner0628 Sep 5, 2024
8150baa
add more unit tests
wayner0628 Sep 5, 2024
672b711
fix mock
wayner0628 Sep 5, 2024
da3de3c
Merge remote-tracking branch 'origin/master' into feature/download-mu…
wayner0628 Sep 15, 2024
96c4177
Accept incoming changes
wayner0628 Sep 15, 2024
ad12330
Accept incoming changes
wayner0628 Sep 15, 2024
65611c0
multipart blob download based on new api
wayner0628 Sep 15, 2024
38a030b
cache store stop listing at end cursor
wayner0628 Sep 15, 2024
abf7f6a
lint
wayner0628 Sep 15, 2024
2703848
remove old api mock
wayner0628 Sep 15, 2024
99847bd
remove old api mock
wayner0628 Sep 15, 2024
e008444
remove old api mock
wayner0628 Sep 15, 2024
acc16c8
update mem_store List to return global path
wayner0628 Oct 22, 2024
7ca6af1
change mkdir perm
wayner0628 Nov 7, 2024
ac2940a
add comments and handle more errors
wayner0628 Nov 7, 2024
27cdeee
Merge branch 'master' into feature/download-multipart-blob
wayner0628 Nov 8, 2024
bf65836
lint
wayner0628 Nov 8, 2024
db481d0
address race condition and aggregate errors
wayner0628 Nov 8, 2024
03e8221
fix tests
Future-Outlier Nov 8, 2024
dbbd8c3
err msg enhancement
Future-Outlier Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 168 additions & 36 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"sync"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -31,57 +33,187 @@
mode core.IOStrategy_DownloadMode
}

// TODO add support for multipart blobs
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath string) (interface{}, error) {
ref := storage.DataReference(blob.Uri)
scheme, _, _, err := ref.Split()
// TODO add timeout and rate limit
// TODO use chunk to download
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) {
/*
handleBlob handles the retrieval and local storage of blob data, including support for both single and multipart blob types.
For multipart blobs, it lists all parts recursively and spawns concurrent goroutines to download each part while managing file I/O in parallel.

- The function begins by validating the blob URI and categorizing the blob type (single or multipart).
- In the multipart case, it recursively lists all blob parts and launches goroutines to download and save each part.
Goroutine closure and I/O success tracking are managed to avoid resource leaks.
- For single-part blobs, it directly downloads and writes the data to the specified path.

Life Cycle:
1. Blob URI -> Blob Metadata Type check -> Recursive List parts if Multipart -> Launch goroutines to download parts
(input blob object) (determine multipart/single) (List API, handles recursive case) (each part handled in parallel)
2. Download part or full blob -> Save locally with error checks -> Handle reader/writer closures -> Return local path or error
(download each part) (error on write or directory) (close streams safely, track success) (completion or report missing closures)
*/

blobRef := storage.DataReference(blob.Uri)
scheme, _, _, err := blobRef.Split()
if err != nil {
return nil, errors.Wrapf(err, "Blob uri incorrectly formatted")
}
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, ref)
} else {
if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
logger.Warnf(ctx, "Currently only single part blobs are supported, we will force multipart to be 'path/00000'")
ref, err = d.store.ConstructReference(ctx, ref, "000000")
if err != nil {

if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
// Collect all parts of the multipart blob recursively (List API handles nested directories)
// Set maxItems to 100 as a parameter for the List API, enabling batch retrieval of items until all are downloaded
maxItems := 100
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
cursor := storage.NewCursorAtStart()
var items []storage.DataReference
var absPaths []string
for {
items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor)
if err != nil || len(items) == 0 {
logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", blobRef)
return nil, err
}
for _, item := range items {
absPaths = append(absPaths, item.String())
}
if storage.IsCursorEnd(cursor) {
break
}
}

// Track the count of successful downloads and the total number of items
downloadSuccess := 0
itemCount := len(absPaths)
// Track successful closures of readers and writers in deferred functions
readerCloseSuccessCount := 0
writerCloseSuccessCount := 0
// We use Mutex to avoid race conditions when updating counters and creating directories
var mu sync.Mutex
var wg sync.WaitGroup
for _, absPath := range absPaths {
absPath := absPath

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
logger.Errorf(ctx, "recover receives error: [%s]", err)
}

Check warning on line 100 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}()

ref := storage.DataReference(absPath)
reader, err := DownloadFileFromStorage(ctx, ref, d.store)
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
return
}

Check warning on line 108 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L106-L108

Added lines #L106 - L108 were not covered by tests
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s].\n"+
"Error: %s", ref, err)
}

Check warning on line 114 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L112-L114

Added lines #L112 - L114 were not covered by tests
mu.Lock()
readerCloseSuccessCount++
mu.Unlock()
}()

_, _, k, err := ref.Split()
if err != nil {
logger.Errorf(ctx, "Failed to parse ref [%s]", ref)
return
}

Check warning on line 124 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L122-L124

Added lines #L122 - L124 were not covered by tests
newPath := filepath.Join(toPath, k)
dir := filepath.Dir(newPath)

mu.Lock()
// os.MkdirAll creates the specified directory structure if it doesn’t already exist
// 0777: the directory can be read and written by anyone
err = os.MkdirAll(dir, 0777)
mu.Unlock()
if err != nil {
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "failed to make dir at path [%s]", dir)
return
}

Check warning on line 136 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L134-L136

Added lines #L134 - L136 were not covered by tests

writer, err := os.Create(newPath)
if err != nil {
logger.Errorf(ctx, "failed to open file at path [%s]", newPath)
return
}

Check warning on line 142 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L140-L142

Added lines #L140 - L142 were not covered by tests
defer func() {
err := writer.Close()
if err != nil {
logger.Errorf(ctx, "failed to close File write stream.\n"+
"Error: [%s]", err)
}

Check warning on line 148 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L146-L148

Added lines #L146 - L148 were not covered by tests
mu.Lock()
writerCloseSuccessCount++
mu.Unlock()
}()

_, err = io.Copy(writer, reader)
if err != nil {
logger.Errorf(ctx, "failed to write remote data to local filesystem")
return
}

Check warning on line 158 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L156-L158

Added lines #L156 - L158 were not covered by tests
mu.Lock()
downloadSuccess++
mu.Unlock()
}()
}
// Go routines are synchronized with a WaitGroup to prevent goroutine leaks.
wg.Wait()
if downloadSuccess != itemCount || readerCloseSuccessCount != itemCount || writerCloseSuccessCount != itemCount {
return nil, errors.Errorf(
"Failed to copy %d out of %d remote files from [%s] to local [%s].\n"+
"Failed to close %d readers\n"+
"Failed to close %d writers.",
itemCount-downloadSuccess, itemCount, blobRef, toPath, itemCount-readerCloseSuccessCount, itemCount-writerCloseSuccessCount,
)
}

Check warning on line 173 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L167-L173

Added lines #L167 - L173 were not covered by tests
logger.Infof(ctx, "successfully copied %d remote files from [%s] to local [%s]", downloadSuccess, blobRef, toPath)
return toPath, nil
} else if blob.GetMetadata().GetType().Dimensionality == core.BlobType_SINGLE {
// reader should be declared here (avoid being shared across all goroutines)
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, blobRef)
} else {
reader, err = DownloadFileFromStorage(ctx, blobRef, d.store)
}
reader, err = DownloadFileFromStorage(ctx, ref, d.store)
}
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
return nil, err
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef)
return nil, err

Check warning on line 186 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L185-L186

Added lines #L185 - L186 were not covered by tests
}
}()
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err)
}

Check warning on line 192 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L191-L192

Added lines #L191 - L192 were not covered by tests
}()

writer, err := os.Create(toFilePath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open file at path %s", toFilePath)
}
defer func() {
err := writer.Close()
writer, err := os.Create(toPath)
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)
return nil, errors.Wrapf(err, "failed to open file at path %s", toPath)

Check warning on line 197 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L197

Added line #L197 was not covered by tests
}
}()
v, err := io.Copy(writer, reader)
if err != nil {
return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
defer func() {
err := writer.Close()
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)
}

Check warning on line 203 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}()
v, err := io.Copy(writer, reader)
if err != nil {
return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
}

Check warning on line 208 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L207-L208

Added lines #L207 - L208 were not covered by tests
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath)
return toPath, nil
}
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toFilePath)
return toFilePath, nil

return nil, errors.Errorf("unexpected Blob type encountered")

Check warning on line 213 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L213

Added line #L213 was not covered by tests
}

func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) {
// TODO Handle schema type
return d.handleBlob(ctx, &core.Blob{Uri: schema.Uri, Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: core.BlobType_MULTIPART}}}, toFilePath)
}

Expand Down
151 changes: 151 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package data

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/stretchr/testify/assert"
)

// TestHandleBlobMultipart verifies Downloader.handleBlob for MULTIPART blobs:
// all listed parts must be downloaded under toPath preserving their key paths,
// and an empty listing must surface an error instead of silently succeeding.
func TestHandleBlobMultipart(t *testing.T) {
	t.Run("Successful Query", func(t *testing.T) {
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)
		// Seed two parts under the same folder prefix; fail fast if seeding
		// fails, otherwise the test would exercise the empty-store path.
		ref := storage.DataReference("s3://container/folder/file1")
		err = s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))
		assert.NoError(t, err)
		ref = storage.DataReference("s3://container/folder/file2")
		err = s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))
		assert.NoError(t, err)

		d := Downloader{store: s}

		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			err := os.RemoveAll(toPath)
			if err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.NoError(t, err)
		assert.Equal(t, toPath, result)

		// Each part must land at toPath/<key>, mirroring the remote layout.
		for _, file := range []string{"file1", "file2"} {
			if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) {
				t.Errorf("expected file %s to exist", file)
			}
		}
	})

	t.Run("No Items", func(t *testing.T) {
		// An empty store means List returns no parts; handleBlob must report
		// an error rather than returning an empty local directory.
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)

		d := Downloader{store: s}

		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			err := os.RemoveAll(toPath)
			if err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.Error(t, err)
		assert.Nil(t, result)
	})
}

// TestHandleBlobSinglePart verifies Downloader.handleBlob for a SINGLE blob:
// the object is downloaded from the in-memory store and written to toPath.
func TestHandleBlobSinglePart(t *testing.T) {
	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)
	// Seed the single object; assert the write succeeded so a seeding
	// failure is not misreported as a download failure later.
	ref := storage.DataReference("s3://container/file")
	err = s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))
	assert.NoError(t, err)

	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: "s3://container/file",
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		err := os.RemoveAll(toPath)
		if err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// The blob contents must have been written to the requested local path.
	if _, err := os.Stat(toPath); os.IsNotExist(err) {
		t.Errorf("expected file %s to exist", toPath)
	}
}

// TestHandleBlobHTTP verifies Downloader.handleBlob for a SINGLE blob whose
// URI uses the https scheme, exercising the DownloadFileFromHTTP branch.
//
// NOTE(review): this test fetches a live URL from raw.githubusercontent.com,
// so it depends on network availability and will be flaky offline — consider
// replacing the remote URL with an httptest.Server.
func TestHandleBlobHTTP(t *testing.T) {
	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)
	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md",
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		err := os.RemoveAll(toPath)
		if err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// The downloaded content must have been written to the local path.
	if _, err := os.Stat(toPath); os.IsNotExist(err) {
		t.Errorf("expected file %s to exist", toPath)
	}
}
Loading
Loading