Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Raw output prefix #169

Merged
merged 16 commits into from
Aug 12, 2020
5 changes: 3 additions & 2 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ tasks:
- container
- K8S-ARRAY
- qubole-hive-executor
- sagemaker_training
- sagemaker_hyperparameter_tuning
# Uncomment to enable sagemaker plugin
# - sagemaker_training
# - sagemaker_hyperparameter_tuning
# Sample plugins config
plugins:
# All k8s plugins default configuration
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.0
github.com/lyft/flyteidl v0.18.1
github.com/lyft/flyteplugins v0.4.2
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,8 @@ github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0 h1:f4yv1MafE26wpMC6QlthM02EeTEDXpy/waL54dRDiSs=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.2 h1:DUyvi7PkJtQ+WV5ZlVypIfJOJOL3THn6QJgh5g24kG4=
github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

// This contains an OutputLocationPrefix. When running against AWS, this should be something of the form
// s3://my-bucket, or s3://my-bucket/ A sharding string will automatically be appended to this prefix before
// handing off to plugins/tasks. Sharding behavior may change in the future.
// Background available at https://github.com/lyft/flyte/issues/211
type RawOutputDataConfig struct {
*admin.RawOutputDataConfig
}

func (in *RawOutputDataConfig) DeepCopyInto(out *RawOutputDataConfig) {
EngHabu marked this conversation as resolved.
Show resolved Hide resolved
*out = *in
}
15 changes: 15 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/stretchr/testify/assert"

"testing"
)

func TestRawOutputConfig(t *testing.T) {
r := RawOutputDataConfig{&admin.RawOutputDataConfig{
OutputLocationPrefix: "s3://bucket",
}}
assert.Equal(t, "s3://bucket", r.OutputLocationPrefix)
}
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ type Meta interface {
GetName() string
GetServiceAccountName() string
IsInterruptible() bool
GetRawOutputDataConfig() RawOutputDataConfig
}

type TaskDetailsGetter interface {
Expand Down
32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MetaExtended.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type FlyteWorkflow struct {
ServiceAccountName string `json:"serviceAccountName,omitempty" protobuf:"bytes,8,opt,name=serviceAccountName"`
// Status is the only mutable section in the workflow. It holds all the execution information
Status WorkflowStatus `json:"status,omitempty"`

// non-Serialized fields
// RawOutputDataConfig defines the configurations to use for generating raw outputs (e.g. blobs, schemas).
RawOutputDataConfig RawOutputDataConfig `json:"rawOutputDataConfig,omitempty"`

// non-Serialized fields (these will not get written to etcd)
// As of 2020-07, the only real implementation of this interface is a URLPathConstructor, which is just an empty
// struct. However, because this field is an interface, we create it once when the crd is hydrated from etcd,
// so that it can be used downstream without any confusion.
// This field is here because it's easier to put it here than pipe through a new object through all of propeller.
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
}

Expand Down Expand Up @@ -110,6 +116,10 @@ func (in *FlyteWorkflow) IsInterruptible() bool {
return in.NodeDefaults.Interruptible
}

func (in *FlyteWorkflow) GetRawOutputDataConfig() RawOutputDataConfig {
return in.RawOutputDataConfig
}

type Inputs struct {
*core.LiteralMap
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions pkg/controller/executors/mocks/immutable_execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
Expand Down Expand Up @@ -193,6 +195,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, startNode, startNodeStatus

}
Expand Down Expand Up @@ -292,6 +297,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -377,6 +385,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -507,6 +518,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -599,6 +613,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf.OnGetLabels().Return(make(map[string]string))
mockWf.OnIsInterruptible().Return(false)
mockWf.OnGetOnFailurePolicy().Return(v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY))
mockWf.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})
mockWfStatus.OnGetDataDir().Return(storage.DataReference("x"))
mockWfStatus.OnConstructNodeDataDirMatch(mock.Anything, mock.Anything, mock.Anything).Return("x", nil)
return mockWf, mockN2Status
Expand Down Expand Up @@ -1098,6 +1115,9 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -1210,6 +1230,9 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {
eCtx.OnIsInterruptible().Return(true)
eCtx.OnGetExecutionID().Return(v1alpha1.WorkflowExecutionIdentifier{WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{}})
eCtx.OnGetLabels().Return(nil)
eCtx.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})

branchTakenNodeID := "branchTakenNode"
branchTakenNode := &mocks.ExecutableNode{}
Expand Down
Loading