Skip to content

Commit

Permalink
Supporting interruptible for map tasks (flyteorg#415)
Browse files Browse the repository at this point in the history
* added GetInterruptibleFailureThreshold function to nodeExecMetadata

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>

* Update to released flyteplugins

Signed-off-by: Haytham Abuelfutuh <[email protected]>

Co-authored-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
hamersaw and EngHabu authored Apr 11, 2022
1 parent ff9b4e8 commit f9e165b
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 13 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.24.17
github.com/flyteorg/flyteplugins v0.10.19
github.com/flyteorg/flyteidl v0.24.19
github.com/flyteorg/flyteplugins v0.10.23
github.com/flyteorg/flytestdlib v0.4.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
9 changes: 4 additions & 5 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.24.7/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.17 h1:Xx70bJbuQGyvS8uAyU4AN74rot6KnzJ9r/L9gcCdEsU=
github.com/flyteorg/flyteidl v0.24.17/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.19 h1:9fY3aYXfjVR8jyb4omdWu9RW2FwcmAnld9PHnR0BLW8=
github.com/flyteorg/flyteplugins v0.10.19/go.mod h1:C2va2hfD7mBi24dXRhBi0GIKG4dzFhSR27GsCCFDzss=
github.com/flyteorg/flyteidl v0.24.19 h1:9PR0UVe2atWLot0X6dgyiXTMKbut28LJYl4HrcMHl7E=
github.com/flyteorg/flyteidl v0.24.19/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.23 h1:vRTcw+B9bjiCyVsdV6rDuTX4E9JMOy8ZEf9M71fKkeg=
github.com/flyteorg/flyteplugins v0.10.23/go.mod h1:12hTsHaGNKU9BVpTGcxtiL+Zrf5sfDXiDDsPvEO40CQ=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.13 h1:TzgqhECRGfOHYH1A7rUwcKEEH2rTtPxGy+oYcif7iBw=
github.com/flyteorg/flytestdlib v0.4.13/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type NodeExecutionMetadata interface {
GetK8sServiceAccount() string
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
GetInterruptibleFailureThreshold() uint32
}

type NodeExecutionContext interface {
Expand Down
17 changes: 12 additions & 5 deletions flytepropeller/pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ const NodeInterruptibleLabel = "interruptible"

type nodeExecMetadata struct {
v1alpha1.Meta
nodeExecID *core.NodeExecutionIdentifier
interrutptible bool
nodeLabels map[string]string
nodeExecID *core.NodeExecutionIdentifier
interrutptible bool
interruptibleFailureThreshold uint32
nodeLabels map[string]string
}

func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier {
Expand All @@ -46,6 +47,10 @@ func (e nodeExecMetadata) IsInterruptible() bool {
return e.interrutptible
}

// GetInterruptibleFailureThreshold returns the interruptible failure threshold
// held by this node execution metadata.
// NOTE(review): presumably the number of failures after which a node is no
// longer treated as interruptible — confirm against the consumer of this value.
func (e nodeExecMetadata) GetInterruptibleFailureThreshold() uint32 {
return e.interruptibleFailureThreshold
}

// GetLabels returns the label map held by this node execution metadata.
// The stored map is returned directly rather than a copy.
func (e nodeExecMetadata) GetLabels() map[string]string {
return e.nodeLabels
}
Expand Down Expand Up @@ -136,7 +141,7 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 {
}

func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup,
node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool,
node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold uint32,
maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager,
enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext {

Expand All @@ -146,7 +151,8 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext
NodeId: node.GetID(),
ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier,
},
interrutptible: interruptible,
interrutptible: interruptible,
interruptibleFailureThreshold: interruptibleFailureThreshold,
}

// Copy the wf labels before adding node specific labels.
Expand Down Expand Up @@ -235,6 +241,7 @@ func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNod
),
),
interruptible,
c.interruptibleFailureThreshold,
c.maxDatasetSizeBytes,
&taskEventRecorder{TaskEventRecorder: c.taskRecorder},
tr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func Test_NodeContext(t *testing.T) {
s, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
p := parentInfo{}
execContext := executors.NewExecutionContext(w1, nil, nil, p, nil)
nCtx := newNodeExecContext(context.TODO(), s, execContext, w1, n, nil, nil, false, 0, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"}))
nCtx := newNodeExecContext(context.TODO(), s, execContext, w1, n, nil, nil, false, 0, 2, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"}))
assert.Equal(t, "id", nCtx.NodeExecutionMetadata().GetLabels()["node-id"])
assert.Equal(t, "false", nCtx.NodeExecutionMetadata().GetLabels()["interruptible"])
assert.Equal(t, "task-name", nCtx.NodeExecutionMetadata().GetLabels()["task-name"])
Expand Down

0 comments on commit f9e165b

Please sign in to comment.