diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 0b4e6f3a31..e2207ac791 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -15,6 +15,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ShardKeyspaceSize defines a non-configurable keyspace size for shard keys. This needs to be a small value because +// shard key ranges are defined using label selectors, which do not support range queries. It must only ever be +// increased, never decreased, to ensure backward compatibility. +const ShardKeyspaceSize = 32 + const StartNodeID = "start-node" const EndNodeID = "end-node" diff --git a/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb b/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb index 45947eb7de..27701bb175 100755 Binary files a/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb and b/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb differ diff --git a/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml b/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml index 3398b20ca8..e26baa7968 100644 --- a/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml +++ b/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml @@ -36,6 +36,24 @@ tasks: value: testValue2 - key: testKey3 value: testValue3 + - key: testKey1 + value: testValue1 + - key: testKey2 + value: testValue2 + - key: testKey3 + value: testValue3 + - key: testKey1 + value: testValue1 + - key: testKey2 + value: testValue2 + - key: testKey3 + value: testValue3 + - key: testKey1 + value: testValue1 + - key: testKey2 + value: testValue2 + - key: testKey3 + value: testValue3 image: myflytecontainer:abc123 resources: {} id: diff --git a/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json 
b/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json index 8853939217..8cc5207906 100755 --- a/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json +++ b/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "myapp-workflows-cereal-mycereal" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json b/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json index 237db30930..0ee4c2d0bf 100755 --- a/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json +++ b/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "myapp-workflows-cereal-mycereal" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_1.json b/pkg/compiler/test/testdata/branch/k8s/success_1.json index 499c9a58e2..bde5655859 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_1.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_1.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "test-serialization-my-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json b/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json index 82a336bb46..41ba4aa4e2 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": 
"domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "core-control-flow-run-conditions-basic-boolean-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_2.json b/pkg/compiler/test/testdata/branch/k8s/success_2.json index c5122edb2a..c988bf1a7a 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_2.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_2.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "test-serialization-my-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_3.json b/pkg/compiler/test/testdata/branch/k8s/success_3.json index fb1f954e00..df79f8c497 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_3.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_3.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "test-serialization-my-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_4.json b/pkg/compiler/test/testdata/branch/k8s/success_4.json index 21eef4ec3d..4031561e4d 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_4.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_4.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "test-serialization-my-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_5.json b/pkg/compiler/test/testdata/branch/k8s/success_5.json index 2dae17f15a..7ac8d0c458 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_5.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_5.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + 
"domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "test-serialization-my-wf" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_6.json b/pkg/compiler/test/testdata/branch/k8s/success_6.json index 4090c61999..ae90069b0d 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_6.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_6.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "recipes-02-intermediate-run-conditions-multiplier" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json b/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json index bf3dadff4d..1001fe1f48 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "core-control-flow-run-conditions-so-nested" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json b/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json index c4ea2abb3e..520417759f 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json +++ b/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "core-control-flow-run-conditions-so-nested" } }, diff --git a/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json b/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json index 82ade925fd..40cca30e81 100755 --- a/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json +++ 
b/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json @@ -6,7 +6,10 @@ "namespace": "namespace", "creationTimestamp": null, "labels": { + "domain": "domain", "execution-id": "name", + "project": "hello", + "shard-key": "6", "workflow-name": "core-control-flow-run-conditions-consume-outputs" } }, diff --git a/pkg/compiler/transformers/k8s/workflow.go b/pkg/compiler/transformers/k8s/workflow.go index a6f92aa59c..95e3db718d 100644 --- a/pkg/compiler/transformers/k8s/workflow.go +++ b/pkg/compiler/transformers/k8s/workflow.go @@ -3,6 +3,7 @@ package k8s import ( "fmt" + "hash/fnv" "strings" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -13,8 +14,22 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ExecutionIDLabel = "execution-id" -const WorkflowNameLabel = "workflow-name" +const ( + // Labels are set on the FlyteWorkflow CRD to aid downstream processing + + // The FlyteWorkflow domain according to registration ownership + DomainLabel = "domain" + // The execution name when an execution ID is provided, otherwise the truncated project-domain-workflow name + ExecutionIDLabel = "execution-id" + // The FlyteWorkflow project according to registration ownership + ProjectLabel = "project" + // Shard keys are used during FlytePropeller sharding. This value is set to a hash of the execution-id label, modulo + // the shard keyspace size. The pseudo-random unique ID component means this value is deterministic for the same + // ExecutionID, but will vary across executions of the same workflow. 
+ ShardKeyLabel = "shard-key" + // The fully qualified FlyteWorkflow name + WorkflowNameLabel = "workflow-name" +) func requiresInputs(w *core.WorkflowTemplate) bool { if w == nil || w.GetInterface() == nil || w.GetInterface().GetInputs() == nil || @@ -126,22 +141,18 @@ func withSeparatorIfNotEmpty(value string) string { } func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifier) ( - name string, generateName string, label string, err error) { + name string, generateName string, label string, project string, domain string, err error) { if execID != nil { - return execID.Name, "", execID.Name, nil + return execID.Name, "", execID.Name, execID.Project, execID.Domain, nil } else if wfID != nil { - wid := fmt.Sprintf("%v%v%v", - withSeparatorIfNotEmpty(wfID.Project), - withSeparatorIfNotEmpty(wfID.Domain), - wfID.Name, - ) + wid := fmt.Sprintf("%v%v%v", withSeparatorIfNotEmpty(wfID.Project), withSeparatorIfNotEmpty(wfID.Domain), wfID.Name) // TODO: this is a hack until we figure out how to restrict generated names. K8s has a limitation of 63 chars wid = wid[:minInt(32, len(wid))] - return "", fmt.Sprintf("%v-", wid), wid, nil + return "", fmt.Sprintf("%v-", wid), wid, wfID.Project, wfID.Domain, nil } else { - return "", "", "", fmt.Errorf("expected param not set. wfID or execID must be non-nil values") + return "", "", "", "", "", fmt.Errorf("expected param not set. 
wfID or execID must be non-nil values") } } @@ -207,14 +218,24 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible}, } - obj.ObjectMeta.Name, obj.ObjectMeta.GenerateName, obj.ObjectMeta.Labels[ExecutionIDLabel], err = - generateName(wf.GetId(), executionID) - + name, generatedName, label, project, domain, err := generateName(wf.GetId(), executionID) if err != nil { errs.Collect(errors.NewWorkflowBuildError(err)) } + + obj.ObjectMeta.Name = name + obj.ObjectMeta.GenerateName = generatedName + obj.ObjectMeta.Labels[ExecutionIDLabel] = label + obj.ObjectMeta.Labels[ProjectLabel] = project + obj.ObjectMeta.Labels[DomainLabel] = domain obj.ObjectMeta.Labels[WorkflowNameLabel] = utils.SanitizeLabelValue(WorkflowNameFromID(primarySpec.ID)) + h := fnv.New32a() + h.Write([]byte(label)) + hash := h.Sum32() % v1alpha1.ShardKeyspaceSize + + obj.ObjectMeta.Labels[ShardKeyLabel] = fmt.Sprint(hash) + if obj.Nodes == nil || obj.Connections.Downstream == nil { // If we come here, we'd better have an error generated earlier. Otherwise, add one to make sure build fails. 
if !errs.HasErrors() { diff --git a/pkg/compiler/transformers/k8s/workflow_test.go b/pkg/compiler/transformers/k8s/workflow_test.go index b52594ff88..8b0fb41a8f 100644 --- a/pkg/compiler/transformers/k8s/workflow_test.go +++ b/pkg/compiler/transformers/k8s/workflow_test.go @@ -116,6 +116,7 @@ func TestBuildFlyteWorkflow(t *testing.T) { assert.True(t, *wf.WorkflowSpec.Nodes["n_1"].Interruptibe) assert.Nil(t, wf.WorkflowSpec.Nodes[common.StartNodeID].Interruptibe) assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel]) + assert.Equal(t, "4", wf.Labels[ShardKeyLabel]) assert.NoError(t, err) assert.NotNil(t, wf) errors.SetConfig(errors.Config{}) @@ -186,12 +187,12 @@ func TestBuildFlyteWorkflow_withInputs(t *testing.T) { func TestGenerateName(t *testing.T) { t.Run("Invalid params", func(t *testing.T) { - _, _, _, err := generateName(nil, nil) + _, _, _, _, _, err := generateName(nil, nil) assert.Error(t, err) }) t.Run("wfID full", func(t *testing.T) { - name, generateName, _, err := generateName(&core.Identifier{ + name, generateName, _, project, domain, err := generateName(&core.Identifier{ Name: "myworkflow", Project: "myproject", Domain: "development", @@ -200,20 +201,24 @@ func TestGenerateName(t *testing.T) { assert.NoError(t, err) assert.Empty(t, name) assert.Equal(t, "myproject-development-myworkflow-", generateName) + assert.Equal(t, "myproject", project) + assert.Equal(t, "development", domain) }) t.Run("wfID missing project domain", func(t *testing.T) { - name, generateName, _, err := generateName(&core.Identifier{ + name, generateName, _, project, domain, err := generateName(&core.Identifier{ Name: "myworkflow", }, nil) assert.NoError(t, err) assert.Empty(t, name) assert.Equal(t, "myworkflow-", generateName) + assert.Equal(t, "", project) + assert.Equal(t, "", domain) }) t.Run("wfID too long", func(t *testing.T) { - name, generateName, _, err := generateName(&core.Identifier{ + name, generateName, _, project, domain, err := generateName(&core.Identifier{ Name: 
"workflowsomethingsomethingsomething", Project: "myproject", Domain: "development", @@ -222,10 +227,12 @@ func TestGenerateName(t *testing.T) { assert.NoError(t, err) assert.Empty(t, name) assert.Equal(t, "myproject-development-workflowso-", generateName) + assert.Equal(t, "myproject", project) + assert.Equal(t, "development", domain) }) t.Run("execID full", func(t *testing.T) { - name, generateName, _, err := generateName(nil, &core.WorkflowExecutionIdentifier{ + name, generateName, _, project, domain, err := generateName(nil, &core.WorkflowExecutionIdentifier{ Name: "myexecution", Project: "myproject", Domain: "development", @@ -234,16 +241,20 @@ func TestGenerateName(t *testing.T) { assert.NoError(t, err) assert.Empty(t, generateName) assert.Equal(t, "myexecution", name) + assert.Equal(t, "myproject", project) + assert.Equal(t, "development", domain) }) t.Run("execID missing project domain", func(t *testing.T) { - name, generateName, _, err := generateName(nil, &core.WorkflowExecutionIdentifier{ + name, generateName, _, project, domain, err := generateName(nil, &core.WorkflowExecutionIdentifier{ Name: "myexecution", }) assert.NoError(t, err) assert.Empty(t, generateName) assert.Equal(t, "myexecution", name) + assert.Equal(t, "", project) + assert.Equal(t, "", domain) }) } diff --git a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index a65a902695..7bcd527c65 100644 --- a/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -498,7 +498,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t composedPBStore.OnWriteRawMatch( mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), - int64(1374), + int64(1429), storage.Options{}, mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))