Skip to content

Commit

Permalink
Adding shard, project, and domain labels to crd (flyteorg#357)
Browse files Browse the repository at this point in the history
* added shard, project, and domain labels to crd

Signed-off-by: Daniel Rammer <[email protected]>

* fixed tests

Signed-off-by: Daniel Rammer <[email protected]>

* removed test debugging log output

Signed-off-by: Daniel Rammer <[email protected]>

* setting project / domain labels based on workflow identifer (in case of dynamic workflow)

Signed-off-by: Daniel Rammer <[email protected]>

* fixed dynamic workflow test composedPBStore write size

Signed-off-by: Daniel Rammer <[email protected]>

* fixing random pr comments

Signed-off-by: Daniel Rammer <[email protected]>

* fixed dynamic workflow test composedPBStore write size ... again

Signed-off-by: Daniel Rammer <[email protected]>

* moved ShardKeyspaceSize definition to v1alpha1 package

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Nov 1, 2021
1 parent 6a9c1d4 commit 990dc8e
Show file tree
Hide file tree
Showing 18 changed files with 112 additions and 21 deletions.
5 changes: 5 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Defines a non-configurable keyspace size for shard keys. This needs to be a small value because we use label
// selectors to define shard key ranges which do not support range queries. It should only be modified increasingly
// to ensure backward compatibility.
const ShardKeyspaceSize = 32

const StartNodeID = "start-node"
const EndNodeID = "end-node"

Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ tasks:
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
image: myflytecontainer:abc123
resources: {}
id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "myapp-workflows-cereal-mycereal"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "myapp-workflows-cereal-mycereal"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_1.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "test-serialization-my-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_10_simple.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "core-control-flow-run-conditions-basic-boolean-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_2.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "test-serialization-my-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "test-serialization-my-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_4.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "test-serialization-my-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_5.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "test-serialization-my-wf"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_6.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "recipes-02-intermediate-run-conditions-multiplier"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_7_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "core-control-flow-run-conditions-so-nested"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_8_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "core-control-flow-run-conditions-so-nested"
}
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/compiler/test/testdata/branch/k8s/success_9_nested.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"namespace": "namespace",
"creationTimestamp": null,
"labels": {
"domain": "domain",
"execution-id": "name",
"project": "hello",
"shard-key": "6",
"workflow-name": "core-control-flow-run-conditions-consume-outputs"
}
},
Expand Down
49 changes: 35 additions & 14 deletions pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package k8s

import (
"fmt"
"hash/fnv"
"strings"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -13,8 +14,22 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const ExecutionIDLabel = "execution-id"
const WorkflowNameLabel = "workflow-name"
const (
// Labels are set on the FlyteWorkflow CRD to aid downstream processing

// The FlyteWorkflow domain according to registration ownership
DomainLabel = "domain"
// A concatenation of project, domain, workflow name, and a unique ID
ExecutionIDLabel = "execution-id"
// The FlyteWorkflow project according to registration ownership
ProjectLabel = "project"
// Shard keys are used during FlytePropeller sharding, this value is set to a hash of the FlyteWorkflow ExecutionID.
// The pseudo-random unique ID component means this value is deterministic for the same ExecutionID, but will vary
// across executions of the same workflow.
ShardKeyLabel = "shard-key"
// The fully qualified FlyteWorkflow name
WorkflowNameLabel = "workflow-name"
)

func requiresInputs(w *core.WorkflowTemplate) bool {
if w == nil || w.GetInterface() == nil || w.GetInterface().GetInputs() == nil ||
Expand Down Expand Up @@ -126,22 +141,18 @@ func withSeparatorIfNotEmpty(value string) string {
}

func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifier) (
name string, generateName string, label string, err error) {
name string, generateName string, label string, project string, domain string, err error) {

if execID != nil {
return execID.Name, "", execID.Name, nil
return execID.Name, "", execID.Name, execID.Project, execID.Domain, nil
} else if wfID != nil {
wid := fmt.Sprintf("%v%v%v",
withSeparatorIfNotEmpty(wfID.Project),
withSeparatorIfNotEmpty(wfID.Domain),
wfID.Name,
)
wid := fmt.Sprintf("%v%v%v", withSeparatorIfNotEmpty(wfID.Project), withSeparatorIfNotEmpty(wfID.Domain), wfID.Name)

// TODO: this is a hack until we figure out how to restrict generated names. K8s has a limitation of 63 chars
wid = wid[:minInt(32, len(wid))]
return "", fmt.Sprintf("%v-", wid), wid, nil
return "", fmt.Sprintf("%v-", wid), wid, wfID.Project, wfID.Domain, nil
} else {
return "", "", "", fmt.Errorf("expected param not set. wfID or execID must be non-nil values")
return "", "", "", "", "", fmt.Errorf("expected param not set. wfID or execID must be non-nil values")
}
}

Expand Down Expand Up @@ -207,14 +218,24 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
}

obj.ObjectMeta.Name, obj.ObjectMeta.GenerateName, obj.ObjectMeta.Labels[ExecutionIDLabel], err =
generateName(wf.GetId(), executionID)

name, generatedName, label, project, domain, err := generateName(wf.GetId(), executionID)
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
}

obj.ObjectMeta.Name = name
obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
obj.ObjectMeta.Labels[DomainLabel] = domain
obj.ObjectMeta.Labels[WorkflowNameLabel] = utils.SanitizeLabelValue(WorkflowNameFromID(primarySpec.ID))

h := fnv.New32a()
h.Write([]byte(label))
hash := h.Sum32() % v1alpha1.ShardKeyspaceSize

obj.ObjectMeta.Labels[ShardKeyLabel] = fmt.Sprint(hash)

if obj.Nodes == nil || obj.Connections.Downstream == nil {
// If we come here, we'd better have an error generated earlier. Otherwise, add one to make sure build fails.
if !errs.HasErrors() {
Expand Down
23 changes: 17 additions & 6 deletions pkg/compiler/transformers/k8s/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestBuildFlyteWorkflow(t *testing.T) {
assert.True(t, *wf.WorkflowSpec.Nodes["n_1"].Interruptibe)
assert.Nil(t, wf.WorkflowSpec.Nodes[common.StartNodeID].Interruptibe)
assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel])
assert.Equal(t, "4", wf.Labels[ShardKeyLabel])
assert.NoError(t, err)
assert.NotNil(t, wf)
errors.SetConfig(errors.Config{})
Expand Down Expand Up @@ -186,12 +187,12 @@ func TestBuildFlyteWorkflow_withInputs(t *testing.T) {

func TestGenerateName(t *testing.T) {
t.Run("Invalid params", func(t *testing.T) {
_, _, _, err := generateName(nil, nil)
_, _, _, _, _, err := generateName(nil, nil)
assert.Error(t, err)
})

t.Run("wfID full", func(t *testing.T) {
name, generateName, _, err := generateName(&core.Identifier{
name, generateName, _, project, domain, err := generateName(&core.Identifier{
Name: "myworkflow",
Project: "myproject",
Domain: "development",
Expand All @@ -200,20 +201,24 @@ func TestGenerateName(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, name)
assert.Equal(t, "myproject-development-myworkflow-", generateName)
assert.Equal(t, "myproject", project)
assert.Equal(t, "development", domain)
})

t.Run("wfID missing project domain", func(t *testing.T) {
name, generateName, _, err := generateName(&core.Identifier{
name, generateName, _, project, domain, err := generateName(&core.Identifier{
Name: "myworkflow",
}, nil)

assert.NoError(t, err)
assert.Empty(t, name)
assert.Equal(t, "myworkflow-", generateName)
assert.Equal(t, "", project)
assert.Equal(t, "", domain)
})

t.Run("wfID too long", func(t *testing.T) {
name, generateName, _, err := generateName(&core.Identifier{
name, generateName, _, project, domain, err := generateName(&core.Identifier{
Name: "workflowsomethingsomethingsomething",
Project: "myproject",
Domain: "development",
Expand All @@ -222,10 +227,12 @@ func TestGenerateName(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, name)
assert.Equal(t, "myproject-development-workflowso-", generateName)
assert.Equal(t, "myproject", project)
assert.Equal(t, "development", domain)
})

t.Run("execID full", func(t *testing.T) {
name, generateName, _, err := generateName(nil, &core.WorkflowExecutionIdentifier{
name, generateName, _, project, domain, err := generateName(nil, &core.WorkflowExecutionIdentifier{
Name: "myexecution",
Project: "myproject",
Domain: "development",
Expand All @@ -234,16 +241,20 @@ func TestGenerateName(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, generateName)
assert.Equal(t, "myexecution", name)
assert.Equal(t, "myproject", project)
assert.Equal(t, "development", domain)
})

t.Run("execID missing project domain", func(t *testing.T) {
name, generateName, _, err := generateName(nil, &core.WorkflowExecutionIdentifier{
name, generateName, _, project, domain, err := generateName(nil, &core.WorkflowExecutionIdentifier{
Name: "myexecution",
})

assert.NoError(t, err)
assert.Empty(t, generateName)
assert.Equal(t, "myexecution", name)
assert.Equal(t, "", project)
assert.Equal(t, "", domain)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1374),
int64(1429),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down

0 comments on commit 990dc8e

Please sign in to comment.