Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add deckURI to NodeExecutionData #413

Merged
merged 18 commits into from
Jun 7, 2022
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,5 @@ require (
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v0.24.22-0.20220423164627-6a98b8b5c8b0
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,10 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.17 h1:Xx70bJbuQGyvS8uAyU4AN74rot6KnzJ9r/L9gcCdEsU=
github.com/flyteorg/flyteidl v0.24.17/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteidl v0.24.22-0.20220421094708-8ebe1b261b20 h1:R4DJ/ebC7XCIugC5IMKcGXLpRUqRMh5JFYlWvaspSQg=
github.com/flyteorg/flyteidl v0.24.22-0.20220421094708-8ebe1b261b20/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteidl v0.24.22-0.20220423164627-6a98b8b5c8b0 h1:GGZS03gQYTNcoHvWBEV/ZQgO1gwXRO+nrNBPiUhnOJE=
github.com/flyteorg/flyteidl v0.24.22-0.20220423164627-6a98b8b5c8b0/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos=
github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU=
Expand Down
33 changes: 24 additions & 9 deletions pkg/data/implementations/aws_remote_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type s3Interface interface {
// AWS-specific implementation of RemoteURLInterface
type AWSRemoteURL struct {
s3Client s3Interface
presignClient s3Interface
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
presignDuration time.Duration
}

Expand Down Expand Up @@ -70,12 +71,7 @@ func (a *AWSRemoteURL) Get(ctx context.Context, uri string) (admin.UrlBlob, erro
codes.Internal, "failed to get object size for %s with %v", uri, err)
}

// The second return argument here is the GetObjectOutput, which we don't use below.
req, _ := a.s3Client.GetObjectRequest(&s3.GetObjectInput{
Bucket: &s3URI.bucket,
Key: &s3URI.key,
})
urlStr, err := req.Presign(a.presignDuration)
urlStr, err := a.getPresignURL(s3URI)
if err != nil {
logger.Warning(ctx,
"failed to presign url for uri [%s] for %v with err %v", uri, a.presignDuration, err)
Expand All @@ -92,14 +88,33 @@ func (a *AWSRemoteURL) Get(ctx context.Context, uri string) (admin.UrlBlob, erro
}, nil
}

func NewAWSRemoteURL(config *aws.Config, presignDuration time.Duration) interfaces.RemoteURLInterface {
func (a *AWSRemoteURL) getPresignURL(s3URI AWSS3Object) (string, error) {
// The second return argument here is the GetObjectOutput, which we don't use below.
req, _ := a.presignClient.GetObjectRequest(&s3.GetObjectInput{
Bucket: &s3URI.bucket,
Key: &s3URI.key,
})
return req.Presign(a.presignDuration)
}

func getS3Client(config *aws.Config) s3Interface {
sesh, err := session.NewSession(config)
if err != nil {
panic(err)
}
s3Client := s3.New(sesh)
return s3.New(sesh)
}

func NewAWSRemoteURL(config *aws.Config, presignDuration time.Duration) interfaces.RemoteURLInterface {
presignConfig := *config
endpoint := storage.GetConfig().SignedURL.StowConfigOverride["endpoint"]
if len(endpoint) > 0 {
presignConfig.WithEndpoint(endpoint)
}

return &AWSRemoteURL{
s3Client: s3Client,
s3Client: getS3Client(config),
presignClient: getS3Client(&presignConfig),
presignDuration: presignDuration,
}
}
6 changes: 6 additions & 0 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,17 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}

deckURI, err := util.GetDeckURI(ctx, m.urlData, nodeExecution.Closure)
if err != nil {
return nil, err
}

response := &admin.NodeExecutionGetDataResponse{
Inputs: inputURLBlob,
Outputs: outputURLBlob,
FullInputs: inputs,
FullOutputs: outputs,
DeckUri: deckURI,
}

if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 {
Expand Down
22 changes: 22 additions & 0 deletions pkg/manager/impl/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"context"
"strings"

"github.com/flyteorg/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
Expand All @@ -13,6 +14,11 @@ import (
"github.com/golang/protobuf/proto"
)

const (
outputsFile = "outputs.pb"
deckFile = "deck.html"
)

func shouldFetchData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
return config.Scheme == common.Local || config.Scheme == common.None || config.MaxSizeInBytes == 0 ||
urlBlob.Bytes < config.MaxSizeInBytes
Expand Down Expand Up @@ -123,3 +129,19 @@ func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,

return fullOutputs, &outputsURLBlob, nil
}

func GetDeckURI(ctx context.Context, urlData dataInterfaces.RemoteURLInterface, closure ExecutionClosure) (string, error) {
if closure == nil || len(closure.GetOutputUri()) == 0 {
return "", nil
}

outputURI := closure.GetOutputUri()
// Both files exist in the same folder
deckURI := strings.Replace(outputURI, outputsFile, deckFile, 1)

deckURLBlob, err := urlData.Get(ctx, deckURI)
if err != nil {
return "", err
}
return deckURLBlob.GetUrl(), nil
}
26 changes: 25 additions & 1 deletion pkg/manager/impl/util/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func TestGetInputs(t *testing.T) {
assert.True(t, proto.Equal(fullInputs, testLiteralMap))
assert.Empty(t, inputURLBlob)
})

}

func TestGetOutputs(t *testing.T) {
Expand Down Expand Up @@ -233,6 +232,31 @@ func TestGetOutputs(t *testing.T) {
})
}

func TestGetDeckURI(t *testing.T) {
expectedDeckURI := "s3://foo/bar/deck.html"
expectedDeckSignURI := "s3://foo/signed/deck.html"

expectedDeckURLBlob := admin.UrlBlob{
Url: expectedDeckSignURI,
Bytes: 1000,
}

mockRemoteURL := urlMocks.NewMockRemoteURL()
mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) {
assert.Equal(t, expectedDeckURI, uri)
return expectedDeckURLBlob, nil
}

closure := &admin.NodeExecutionClosure{
OutputResult: &admin.NodeExecutionClosure_OutputUri{
OutputUri: testOutputsURI,
},
}
deckURI, err := GetDeckURI(context.TODO(), mockRemoteURL, closure)
assert.NoError(t, err)
assert.Equal(t, expectedDeckSignURI, deckURI)
}

func TestWorkflowExecutionClosure(t *testing.T) {
t.Run("outputs offloaded", func(t *testing.T) {
workflowExecutionClosure := admin.ExecutionClosure{
Expand Down