diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager.go b/flyteadmin/pkg/manager/impl/node_execution_manager.go index 41f09f728b..8b51ba7943 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager.go @@ -437,9 +437,12 @@ func (m *NodeExecutionManager) GetNodeExecutionData( logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err) return nil, err } - signedInputsURLBlob, err := m.urlData.Get(ctx, nodeExecution.InputUri) - if err != nil { - return nil, err + signedInputsURLBlob := admin.UrlBlob{} + if nodeExecution.InputUri != "" { + signedInputsURLBlob, err = m.urlData.Get(ctx, nodeExecution.InputUri) + if err != nil { + return nil, err + } } signedOutputsURLBlob := admin.UrlBlob{} if nodeExecution.Closure.GetOutputUri() != "" { diff --git a/flyteadmin/pkg/manager/impl/util/data.go b/flyteadmin/pkg/manager/impl/util/data.go index 61d2360858..a227200559 100644 --- a/flyteadmin/pkg/manager/impl/util/data.go +++ b/flyteadmin/pkg/manager/impl/util/data.go @@ -8,7 +8,7 @@ import ( func ShouldFetchData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool { return config.Scheme == common.Local || config.Scheme == common.None || config.MaxSizeInBytes == 0 || - urlBlob.Bytes < config.MaxSizeInBytes + (len(urlBlob.Url) > 0 && urlBlob.Bytes < config.MaxSizeInBytes) } func ShouldFetchOutputData(config *interfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool { diff --git a/flyteadmin/pkg/manager/impl/util/data_test.go b/flyteadmin/pkg/manager/impl/util/data_test.go index 3f1505f2f1..9525b1f1ee 100644 --- a/flyteadmin/pkg/manager/impl/util/data_test.go +++ b/flyteadmin/pkg/manager/impl/util/data_test.go @@ -15,6 +15,7 @@ func TestShouldFetchData(t *testing.T) { Scheme: common.Local, MaxSizeInBytes: 100, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, })) }) @@ -23,6 +24,7 @@ func TestShouldFetchData(t *testing.T) { Scheme: common.None, MaxSizeInBytes: 100, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, })) }) @@ -30,6 +32,7 @@ func TestShouldFetchData(t *testing.T) { assert.True(t, ShouldFetchData(&interfaces.RemoteDataConfig{ Scheme: common.None, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, })) }) @@ -38,10 +41,20 @@ func TestShouldFetchData(t *testing.T) { Scheme: common.AWS, MaxSizeInBytes: 1000, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, })) }) t.Run("max size over limit", func(t *testing.T) { + assert.False(t, ShouldFetchData(&interfaces.RemoteDataConfig{ + Scheme: common.AWS, + MaxSizeInBytes: 100, + }, admin.UrlBlob{ + Url: "s3://data", + Bytes: 200, + })) + }) + t.Run("empty url config", func(t *testing.T) { assert.False(t, ShouldFetchData(&interfaces.RemoteDataConfig{ Scheme: common.AWS, MaxSizeInBytes: 100, @@ -57,6 +70,7 @@ func TestShouldFetchOutputData(t *testing.T) { Scheme: common.Local, MaxSizeInBytes: 100, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, }, "s3://foo/bar.txt")) }) @@ -65,6 +79,7 @@ func TestShouldFetchOutputData(t *testing.T) { Scheme: common.AWS, MaxSizeInBytes: 1000, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, }, "s3://foo/bar.txt")) }) @@ -73,6 +88,7 @@ func TestShouldFetchOutputData(t *testing.T) { Scheme: common.AWS, MaxSizeInBytes: 1000, }, admin.UrlBlob{ + Url: "s3://data", Bytes: 200, }, "")) })