diff --git a/flytecopilot/data/download.go b/flytecopilot/data/download.go index 73d6e3be53..24450697e7 100644 --- a/flytecopilot/data/download.go +++ b/flytecopilot/data/download.go @@ -358,6 +358,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa Scalar: s, }}, nil case *core.Literal_Collection: + err := os.MkdirAll(filePath, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) + } v, c2, err := d.handleCollection(ctx, lit.GetCollection(), filePath, writeToFile) if err != nil { return nil, nil, err @@ -366,6 +370,10 @@ func (d Downloader) handleLiteral(ctx context.Context, lit *core.Literal, filePa Collection: c2, }}, nil case *core.Literal_Map: + err := os.MkdirAll(filePath, os.ModePerm) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to create directory [%s]", filePath) + } v, m, err := d.RecursiveDownload(ctx, lit.GetMap(), filePath, writeToFile) if err != nil { return nil, nil, err @@ -410,6 +418,16 @@ func (d Downloader) RecursiveDownload(ctx context.Context, inputs *core.LiteralM } f := make(FutureMap, len(inputs.GetLiterals())) for variable, literal := range inputs.GetLiterals() { + if literal.GetOffloadedMetadata() != nil { + offloadedMetadataURI := literal.GetOffloadedMetadata().GetUri() + // literal will be overwritten with the contents of the offloaded data which contains the actual large literal. + if err := d.store.ReadProtobuf(ctx, storage.DataReference(offloadedMetadataURI), literal); err != nil { + errString := fmt.Sprintf("Failed to read the object at location [%s] with error [%s]", offloadedMetadataURI, err) + logger.Error(ctx, errString) + return nil, nil, fmt.Errorf("%s", errString) + } + logger.Infof(ctx, "read object at location [%s]", offloadedMetadataURI) + } varPath := path.Join(dir, variable) lit := literal f[variable] = futures.NewAsyncFuture(childCtx, func(ctx2 context.Context) (interface{}, error) { diff --git a/flytecopilot/data/download_test.go b/flytecopilot/data/download_test.go index b4bee54fc5..dbc7cb33e7 100644 --- a/flytecopilot/data/download_test.go +++ b/flytecopilot/data/download_test.go @@ -152,3 +152,162 @@ func TestHandleBlobHTTP(t *testing.T) { t.Errorf("expected file %s to exist", toPath) } } + +func TestRecursiveDownload(t *testing.T) { + t.Run("OffloadedMetadataContainsCollectionOfStrings", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Collection{ + Collection: &core.LiteralCollection{ + Literals: []*core.Literal{ + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string1", + }, + }, + }, + }, + }, + }, + { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "string2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, []interface{}{"string1", "string2"}, varMap["input1"]) + // Check if files were created and data written + for _, file := range []string{"0", "1"} { + if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } + } + }) + + t.Run("OffloadedMetadataContainsMapOfStringString", func(t *testing.T) { + s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + d := Downloader{store: s} + + offloadedLiteral := &core.Literal{ + Value: &core.Literal_OffloadedMetadata{ + OffloadedMetadata: &core.LiteralOffloadedMetadata{ + Uri: "s3://container/offloaded", + }, + }, + } + + inputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input1": offloadedLiteral, + }, + } + + // Mock reading the offloaded metadata + err = s.WriteProtobuf(context.Background(), storage.DataReference("s3://container/offloaded"), storage.Options{}, &core.Literal{ + Value: &core.Literal_Map{ + Map: &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "key1": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value1", + }, + }, + }, + }, + }, + }, + "key2": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "value2", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }) + assert.NoError(t, err) + + toPath := "./inputs" + defer func() { + err := os.RemoveAll(toPath) + if err != nil { + t.Errorf("Failed to delete directory: %v", err) + } + }() + + varMap, lMap, err := d.RecursiveDownload(context.Background(), inputs, toPath, true) + assert.NoError(t, err) + assert.NotNil(t, varMap) + assert.NotNil(t, lMap) + assert.Equal(t, "value1", varMap["input1"].(VarMap)["key1"]) + assert.Equal(t, "value2", varMap["input1"].(VarMap)["key2"]) + + for _, file := range []string{"key1", "key2"} { + if _, err := os.Stat(filepath.Join(toPath, "input1", file)); os.IsNotExist(err) { + t.Errorf("expected file %s to exist", file) + } + } + }) +}