Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copilot][flytedirectory] multipart blob download #5715

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
43a0b18
add download multipart blob
wayner0628 Sep 1, 2024
6f78352
recursively process subparts
wayner0628 Sep 1, 2024
69514d6
implement GetItems function
wayner0628 Sep 4, 2024
f13022b
add unit testing
wayner0628 Sep 4, 2024
8fae9f9
Parallelly handle blob items
wayner0628 Sep 4, 2024
e82c5de
fix lint error
wayner0628 Sep 4, 2024
d7c4686
implement GetItems function
wayner0628 Sep 4, 2024
ceaa72d
add mutex avoid racing
wayner0628 Sep 4, 2024
1f0b195
avoid infinite call
wayner0628 Sep 4, 2024
19b0ae8
protect critical variables
wayner0628 Sep 5, 2024
b948aee
avoid infinite call
wayner0628 Sep 5, 2024
c88813f
lint
wayner0628 Sep 5, 2024
df9b8ed
add more unit tests
wayner0628 Sep 5, 2024
8150baa
add more unit tests
wayner0628 Sep 5, 2024
672b711
fix mock
wayner0628 Sep 5, 2024
da3de3c
Merge remote-tracking branch 'origin/master' into feature/download-mu…
wayner0628 Sep 15, 2024
96c4177
Accept incoming changes
wayner0628 Sep 15, 2024
ad12330
Accept incoming changes
wayner0628 Sep 15, 2024
65611c0
multipart blob download based on new api
wayner0628 Sep 15, 2024
38a030b
cache store stop listing at end cursor
wayner0628 Sep 15, 2024
abf7f6a
lint
wayner0628 Sep 15, 2024
2703848
remove old api mock
wayner0628 Sep 15, 2024
99847bd
remove old api mock
wayner0628 Sep 15, 2024
e008444
remove old api mock
wayner0628 Sep 15, 2024
acc16c8
update mem_store List to return global path
wayner0628 Oct 22, 2024
7ca6af1
change mkdir perm
wayner0628 Nov 7, 2024
ac2940a
add comments and handle more errors
wayner0628 Nov 7, 2024
27cdeee
Merge branch 'master' into feature/download-multipart-blob
wayner0628 Nov 8, 2024
bf65836
lint
wayner0628 Nov 8, 2024
db481d0
address race condition and aggregate errors
wayner0628 Nov 8, 2024
03e8221
fix tests
Future-Outlier Nov 8, 2024
dbbd8c3
err msg enhancement
Future-Outlier Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flyteadmin/pkg/common/mocks/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
return storage.SignedURLResponse{URL: *signedURL}, nil
}

func (t *TestDataStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) {
var s []string
return s, nil

Check warning on line 59 in flyteadmin/pkg/common/mocks/storage.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/common/mocks/storage.go#L57-L59

Added lines #L57 - L59 were not covered by tests
}

// Retrieves a byte array from the Blob store or an error
func (t *TestDataStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) {
return NopCloser{}, nil
Expand Down
99 changes: 85 additions & 14 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"sync"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -31,24 +33,94 @@
mode core.IOStrategy_DownloadMode
}

// TODO add support for multipart blobs
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath string) (interface{}, error) {
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) {
ref := storage.DataReference(blob.Uri)
scheme, _, _, err := ref.Split()
if err != nil {
return nil, errors.Wrapf(err, "Blob uri incorrectly formatted")
}

if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
items, err := d.store.GetItems(ctx, ref)
if err != nil || len(items) == 0 {
logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", ref)
return nil, err
}

success := 0
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
var mu sync.Mutex
var wg sync.WaitGroup
for _, absPath := range items {
// capture range variable
absPath := absPath

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
logger.Errorf(ctx, "recover receives error: %s", err)

Check warning on line 62 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L62

Added line #L62 was not covered by tests
}
}()

ref := storage.DataReference(absPath)
reader, err := DownloadFileFromStorage(ctx, ref, d.store)
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
return

Check warning on line 70 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)

Check warning on line 75 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L75

Added line #L75 was not covered by tests
}
}()

_, _, relativePath, err := ref.Split()
if err != nil {
logger.Errorf(ctx, "Failed to parse ref [%s]", ref)
return

Check warning on line 82 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L81-L82

Added lines #L81 - L82 were not covered by tests
}
newPath := filepath.Join(toPath, relativePath)
dir := filepath.Dir(newPath)

mu.Lock()
// 0755: the directory can be read by anyone but can only be written by the owner
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
os.MkdirAll(dir, 0755)
writer, err := os.Create(newPath)
mu.Unlock()
if err != nil {
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "failed to open file at path %s", newPath)
return

Check warning on line 94 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}
defer func() {
err := writer.Close()
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)

Check warning on line 99 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L99

Added line #L99 was not covered by tests
}
}()

_, err = io.Copy(writer, reader)
if err != nil {
logger.Errorf(ctx, "failed to write remote data to local filesystem")
return

Check warning on line 106 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L105-L106

Added lines #L105 - L106 were not covered by tests
}
mu.Lock()
success += 1
mu.Unlock()
}()
}
wg.Wait()
ref = storage.DataReference(blob.Uri)
logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, ref, toPath)
return toPath, nil
}

// reader should be declared here (avoid being shared across all goroutines)
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, ref)
} else {
if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
logger.Warnf(ctx, "Currently only single part blobs are supported, we will force multipart to be 'path/00000'")
ref, err = d.store.ConstructReference(ctx, ref, "000000")
if err != nil {
return nil, err
}
}
reader, err = DownloadFileFromStorage(ctx, ref, d.store)
}
if err != nil {
Expand All @@ -62,9 +134,9 @@
}
}()

writer, err := os.Create(toFilePath)
writer, err := os.Create(toPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open file at path %s", toFilePath)
return nil, errors.Wrapf(err, "failed to open file at path %s", toPath)

Check warning on line 139 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L139

Added line #L139 was not covered by tests
}
defer func() {
err := writer.Close()
Expand All @@ -76,12 +148,11 @@
if err != nil {
return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
}
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toFilePath)
return toFilePath, nil
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toPath)
return toPath, nil
}

func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) {
// TODO Handle schema type
return d.handleBlob(ctx, &core.Blob{Uri: schema.Uri, Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: core.BlobType_MULTIPART}}}, toFilePath)
}

Expand Down
151 changes: 151 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package data

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/stretchr/testify/assert"
)

func TestHandleBlobMultipart(t *testing.T) {
t.Run("Successful Query", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
ref := storage.DataReference("s3://container/folder/file1")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))
ref = storage.DataReference("s3://container/folder/file2")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/folder",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_MULTIPART,
},
},
}

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
for _, file := range []string{"file1", "file2"} {
if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", file)
}
}
})

t.Run("No Items", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/folder",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_MULTIPART,
},
},
}

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.Error(t, err)
assert.Nil(t, result)
})
}

func TestHandleBlobSinglePart(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
ref := storage.DataReference("s3://container/file")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/file",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_SINGLE,
},
},
}

toPath := "./input"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete file: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
if _, err := os.Stat(toPath); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", toPath)
}
}

func TestHandleBlobHTTP(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
d := Downloader{store: s}

blob := &core.Blob{
Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_SINGLE,
},
},
}

toPath := "./input"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete file: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
if _, err := os.Stat(toPath); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", toPath)
}
}
4 changes: 4 additions & 0 deletions flytepropeller/pkg/utils/failing_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (FailingRawStore) Head(ctx context.Context, reference storage.DataReference
return nil, fmt.Errorf("failed metadata fetch")
}

func (FailingRawStore) GetItems(ctx context.Context, reference storage.DataReference) ([]string, error) {
return nil, fmt.Errorf("failed get items")
}

func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) {
return nil, fmt.Errorf("failed read raw")
}
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/utils/failing_datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func TestFailingRawStore(t *testing.T) {
c := f.GetBaseContainerFQN(ctx)
assert.Equal(t, storage.DataReference(""), c)

_, err = f.GetItems(ctx, "")
assert.Error(t, err)

_, err = f.ReadRaw(ctx, "")
assert.Error(t, err)

Expand Down
9 changes: 9 additions & 0 deletions flytestdlib/storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Met
return s.RawStore.Head(ctx, reference)
}

// GetItems retrieves the paths of all items from the Blob store or an error
func (s *cachedRawStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) {
_, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/GetItems")
defer span.End()

// freecache does not support full cache scanning
return s.RawStore.GetItems(ctx, reference)
}

// ReadRaw retrieves a byte array from the Blob store or an error
func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/ReadRaw")
Expand Down
25 changes: 25 additions & 0 deletions flytestdlib/storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cache
type dummyStore struct {
copyImpl
HeadCb func(ctx context.Context, reference DataReference) (Metadata, error)
GetItemsCb func(ctx context.Context, reference DataReference) ([]string, error)
ReadRawCb func(ctx context.Context, reference DataReference) (io.ReadCloser, error)
WriteRawCb func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error
DeleteCb func(ctx context.Context, reference DataReference) error
Expand All @@ -73,6 +74,10 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat
return d.HeadCb(ctx, reference)
}

func (d *dummyStore) GetItems(ctx context.Context, reference DataReference) ([]string, error) {
return d.GetItemsCb(ctx, reference)
}

func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
return d.ReadRawCb(ctx, reference)
}
Expand Down Expand Up @@ -105,6 +110,14 @@ func TestCachedRawStore(t *testing.T) {
}
return MemoryMetadata{}, fmt.Errorf("err")
},
GetItemsCb: func(ctx context.Context, reference DataReference) ([]string, error) {
var s []string
if reference == "k1" {
s = append(s, "item")
return s, nil
}
return s, fmt.Errorf("err")
},
WriteRawCb: func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error {
if writeCalled {
assert.FailNow(t, "Should not be writeCalled")
Expand Down Expand Up @@ -160,6 +173,18 @@ func TestCachedRawStore(t *testing.T) {
assert.False(t, m.Exists())
})

t.Run("Get Items", func(t *testing.T) {
items, err := cStore.GetItems(ctx, k1)
assert.NoError(t, err)
assert.Equal(t, 1, len(items))
})

t.Run("No Items", func(t *testing.T) {
items, err := cStore.GetItems(ctx, k2)
assert.Error(t, err)
assert.Equal(t, 0, len(items))
})

t.Run("ReadCachePopulate", func(t *testing.T) {
o, err := cStore.ReadRaw(ctx, k1)
assert.NoError(t, err)
Expand Down
Loading
Loading