Skip to content

Commit

Permalink
Raw output prefix (flyteorg#169)
Browse files Browse the repository at this point in the history
* attempt to bring in IDL change, and add to workflow struct

* go sum

* changes

* spelling

* make generate

* just use meta

* unit tests

* wip

* wip

* make goimports

* wip

* add comment in config.yaml

* Revert local changes

Co-authored-by: Haytham AbuelFutuh <[email protected]>
  • Loading branch information
wild-endeavor and EngHabu authored Aug 12, 2020
1 parent dea621d commit 97bb6b9
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 21 deletions.
5 changes: 3 additions & 2 deletions flytepropeller/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ tasks:
- container
- K8S-ARRAY
- qubole-hive-executor
- sagemaker_training
- sagemaker_hyperparameter_tuning
# Uncomment the following lines to enable the SageMaker plugins
# - sagemaker_training
# - sagemaker_hyperparameter_tuning
# Sample plugins config
plugins:
# All k8s plugins default configuration
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.0
github.com/lyft/flyteidl v0.18.1
github.com/lyft/flyteplugins v0.4.4
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0 h1:f4yv1MafE26wpMC6QlthM02EeTEDXpy/waL54dRDiSs=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.2 h1:DUyvi7PkJtQ+WV5ZlVypIfJOJOL3THn6QJgh5g24kG4=
github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.4 h1:2tFBAtcxjd81wVByI5yVSIBKJ/UECk7XQK3F1XzttNA=
github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down
15 changes: 15 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

// RawOutputDataConfig wraps the admin.RawOutputDataConfig message, which carries an OutputLocationPrefix.
// When running against AWS, the prefix should be something of the form s3://my-bucket or s3://my-bucket/.
// A sharding string will automatically be appended to this prefix before handing off to plugins/tasks.
// Sharding behavior may change in the future.
// Background available at https://github.com/lyft/flyte/issues/211
type RawOutputDataConfig struct {
// Embedded by pointer, so the message's fields (e.g. OutputLocationPrefix) are promoted onto this type.
*admin.RawOutputDataConfig
}

// DeepCopyInto copies the receiver into out. The only state is the embedded
// *admin.RawOutputDataConfig pointer, which is copied as-is: after the call in and out
// refer to the same underlying message rather than to independent clones.
func (in *RawOutputDataConfig) DeepCopyInto(out *RawOutputDataConfig) {
	out.RawOutputDataConfig = in.RawOutputDataConfig
}
15 changes: 15 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package v1alpha1

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/stretchr/testify/assert"

"testing"
)

// TestRawOutputConfig checks that OutputLocationPrefix set on the embedded admin message
// is readable directly through the wrapper type.
func TestRawOutputConfig(t *testing.T) {
	const prefix = "s3://bucket"

	cfg := RawOutputDataConfig{
		RawOutputDataConfig: &admin.RawOutputDataConfig{
			OutputLocationPrefix: prefix,
		},
	}

	assert.Equal(t, prefix, cfg.OutputLocationPrefix)
}
1 change: 1 addition & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ type Meta interface {
GetName() string
GetServiceAccountName() string
IsInterruptible() bool
GetRawOutputDataConfig() RawOutputDataConfig
}

type TaskDetailsGetter interface {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks/Meta.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type FlyteWorkflow struct {
ServiceAccountName string `json:"serviceAccountName,omitempty" protobuf:"bytes,8,opt,name=serviceAccountName"`
// Status is the only mutable section in the workflow. It holds all the execution information
Status WorkflowStatus `json:"status,omitempty"`

// non-Serialized fields
// RawOutputDataConfig defines the configurations to use for generating raw outputs (e.g. blobs, schemas).
RawOutputDataConfig RawOutputDataConfig `json:"rawOutputDataConfig,omitempty"`

// non-Serialized fields (these will not get written to etcd)
// As of 2020-07, the only real implementation of this interface is a URLPathConstructor, which is just an empty
// struct. However, because this field is an interface, we create it once when the crd is hydrated from etcd,
// so that it can be used downstream without any confusion.
// This field is here because it's easier to put it here than pipe through a new object through all of propeller.
DataReferenceConstructor storage.ReferenceConstructor `json:"-"`
}

Expand Down Expand Up @@ -110,6 +116,10 @@ func (in *FlyteWorkflow) IsInterruptible() bool {
return in.NodeDefaults.Interruptible
}

// GetRawOutputDataConfig returns the workflow's RawOutputDataConfig field by value.
func (in *FlyteWorkflow) GetRawOutputDataConfig() RawOutputDataConfig {
	rawOutputCfg := in.RawOutputDataConfig
	return rawOutputCfg
}

type Inputs struct {
*core.LiteralMap
}
Expand Down
32 changes: 32 additions & 0 deletions flytepropeller/pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/lyft/flytestdlib/storage"
Expand Down Expand Up @@ -193,6 +195,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, startNode, startNodeStatus

}
Expand Down Expand Up @@ -292,6 +297,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -377,6 +385,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -507,6 +518,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -599,6 +613,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf.OnGetLabels().Return(make(map[string]string))
mockWf.OnIsInterruptible().Return(false)
mockWf.OnGetOnFailurePolicy().Return(v1alpha1.WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY))
mockWf.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})
mockWfStatus.OnGetDataDir().Return(storage.DataReference("x"))
mockWfStatus.OnConstructNodeDataDirMatch(mock.Anything, mock.Anything, mock.Anything).Return("x", nil)
return mockWf, mockN2Status
Expand Down Expand Up @@ -1098,6 +1115,9 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
},
},
DataReferenceConstructor: store,
RawOutputDataConfig: v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
},
}, n, ns

}
Expand Down Expand Up @@ -1210,6 +1230,9 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {
eCtx.OnIsInterruptible().Return(true)
eCtx.OnGetExecutionID().Return(v1alpha1.WorkflowExecutionIdentifier{WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{}})
eCtx.OnGetLabels().Return(nil)
eCtx.OnGetRawOutputDataConfig().Return(v1alpha1.RawOutputDataConfig{
RawOutputDataConfig: &admin.RawOutputDataConfig{OutputLocationPrefix: ""},
})

branchTakenNodeID := "branchTakenNode"
branchTakenNode := &mocks.ExecutableNode{}
Expand Down
Loading

0 comments on commit 97bb6b9

Please sign in to comment.