Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Limit parallelism of task nodes (#232)
Browse files Browse the repository at this point in the history
* wip: Limit parallelism of task nodes

Signed-off-by: Ketan Umare <[email protected]>

* Unit test fixes

Signed-off-by: Ketan Umare <[email protected]>

* can be deleted

* Setting parallelism to 10 to test

* Updated logic to use task handler to mark parallelism

Signed-off-by: Ketan Umare <[email protected]>

* Test for incr parallelism

Signed-off-by: Ketan Umare <[email protected]>

* linter fix

Signed-off-by: Ketan Umare <[email protected]>

* fixed lint error

Signed-off-by: Ketan Umare <[email protected]>

* updated go.sum

Signed-off-by: Ketan Umare <[email protected]>

* Update pkg/controller/nodes/executor.go

Co-authored-by: Haytham Abuelfutuh <[email protected]>

Co-authored-by: Haytham Abuelfutuh <[email protected]>
Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 and EngHabu committed May 25, 2021
1 parent 24938cd commit 3b9f57c
Show file tree
Hide file tree
Showing 18 changed files with 414 additions and 59 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ Flyte Propeller
===============
[![Current Release](https://img.shields.io/github/release/flyteorg/flytepropeller.svg)](https://github.com/flyteorg/flytepropeller/releases/latest)
![Master](https://github.com/flyteorg/flytepropeller/workflows/Master/badge.svg)
[![GoDoc](https://godoc.org/github.com/lyft/flytepropeller?status.svg)](https://pkg.go.dev/mod/github.com/lyft/flytepropeller)
[![GoDoc](https://godoc.org/github.com/flyteorg/flytepropeller?status.svg)](https://pkg.go.dev/mod/github.com/flyteorg/flytepropeller)
[![License](https://img.shields.io/badge/LICENSE-Apache2.0-ff69b4.svg)](http://www.apache.org/licenses/LICENSE-2.0.html)
[![CodeCoverage](https://img.shields.io/codecov/c/github/flyteorg/flytepropeller.svg)](https://codecov.io/gh/flyteorg/flytepropeller)
[![Go Report Card](https://goreportcard.com/badge/github.com/lyft/flytepropeller)](https://goreportcard.com/report/github.com/lyft/flytepropeller)
![Commit activity](https://img.shields.io/github/commit-activity/w/lyft/flytepropeller.svg?style=plastic)
![Commit since last release](https://img.shields.io/github/commits-since/lyft/flytepropeller/latest.svg?style=plastic)
[![Go Report Card](https://goreportcard.com/badge/github.com/flyteorg/flytepropeller)](https://goreportcard.com/report/github.com/flyteorg/flytepropeller)
![Commit activity](https://img.shields.io/github/commit-activity/w/flyteorg/flytepropeller.svg?style=plastic)
![Commit since last release](https://img.shields.io/github/commits-since/flyteorg/flytepropeller/latest.svg?style=plastic)

Kubernetes operator to executes Flyte graphs natively on kubernetes

Expand Down Expand Up @@ -89,15 +89,15 @@ To delete a specific workflow
$ kubectl-flyte delete --namespace flytekit-development flytekit-development-ff806e973581f4508bf1
```

To delete all completed workflows - they have to be either success/failed with a special isCompleted label set on them. The Label is set `here <https://github.com/lyft/flytepropeller/blob/master/pkg/controller/controller.go#L247>`
To delete all completed workflows - they have to be either success/failed with a special isCompleted label set on them. The Label is set `here <https://github.com/flyteorg/flytepropeller/blob/master/pkg/controller/controller.go#L247>`

```
$ kubectl-flyte delete --namespace flytekit-development --all-completed
```

Running propeller locally
-------------------------
use the config.yaml in root found `here <https://github.com/lyft/flytepropeller/blob/master/config.yaml>`. Cd into this folder and then run
use the config.yaml in root found `here <https://github.com/flyteorg/flytepropeller/blob/master/config.yaml>`. Cd into this folder and then run

```
$ flytepropeller --logtostderr
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
Expand Down Expand Up @@ -231,7 +230,6 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.15 h1:sXrlwTRaRjQsXYMNrY/S930SKdKtu4XnpNFEu8I4tn4=
github.com/flyteorg/flyteidl v0.18.15/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteidl v0.18.20 h1:OGOb2FOHWL363Qp8uzbJeFbQBKYPT30+afv+8BnBlGs=
github.com/flyteorg/flyteidl v0.18.20/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
Expand Down Expand Up @@ -1231,7 +1229,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,16 @@ type RawOutputDataConfig struct {
func (in *RawOutputDataConfig) DeepCopyInto(out *RawOutputDataConfig) {
*out = *in
}

// This contains workflow-execution specifications and overrides.
type ExecutionConfig struct {
// Maps individual task types to their alternate (non-default) plugin handlers by name.
TaskPluginImpls map[string]TaskPluginOverride
// Can be used to control the number of parallel nodes to run within the workflow. This is useful to achieve fairness.
MaxParallelism uint32
}

type TaskPluginOverride struct {
PluginIDs []string
MissingPluginBehavior admin.PluginOverride_MissingPluginBehavior
}
12 changes: 0 additions & 12 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"k8s.io/apimachinery/pkg/types"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/golang/protobuf/jsonpb"
"github.com/pkg/errors"
Expand Down Expand Up @@ -306,14 +305,3 @@ type FlyteWorkflowList struct {
metav1.ListMeta `json:"metadata"`
Items []FlyteWorkflow `json:"items"`
}

// This contains workflow-execution specifications and overrides.
type ExecutionConfig struct {
// Maps individual task types to their alternate (non-default) plugin handlers by name.
TaskPluginImpls map[string]TaskPluginOverride
}

type TaskPluginOverride struct {
PluginIDs []string
MissingPluginBehavior admin.PluginOverride_MissingPluginBehavior
}
1 change: 1 addition & 0 deletions pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 33 additions & 4 deletions pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ type ImmutableParentInfo interface {
CurrentAttempt() uint32
}

type ControlFlow interface {
CurrentParallelism() uint32
IncrementParallelism() uint32
}

type ExecutionContext interface {
ImmutableExecutionContext
TaskDetailsGetter
SubWorkflowGetter
ParentInfoGetter
ControlFlow
}

type execContext struct {
ControlFlow
ImmutableExecutionContext
TaskDetailsGetter
SubWorkflowGetter
Expand All @@ -59,24 +66,40 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 {
return p.currentAttempts
}

type controlFlow struct {
// We could use atomic.Uint32, but this is not required for current Propeller. As every round is run in a single
// thread and using atomic will introduce memory barriers
v uint32
}

func (c *controlFlow) CurrentParallelism() uint32 {
return c.v
}

func (c *controlFlow) IncrementParallelism() uint32 {
c.v = c.v + 1
return c.v
}

func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
return NewExecutionContext(prevExecContext, taskGetter, prevExecContext, prevExecContext.GetParentInfo())
return NewExecutionContext(prevExecContext, taskGetter, prevExecContext, prevExecContext.GetParentInfo(), prevExecContext)
}

func NewExecutionContextWithWorkflowGetter(prevExecContext ExecutionContext, getter SubWorkflowGetter) ExecutionContext {
return NewExecutionContext(prevExecContext, prevExecContext, getter, prevExecContext.GetParentInfo())
return NewExecutionContext(prevExecContext, prevExecContext, getter, prevExecContext.GetParentInfo(), prevExecContext)
}

func NewExecutionContextWithParentInfo(prevExecContext ExecutionContext, parentInfo ImmutableParentInfo) ExecutionContext {
return NewExecutionContext(prevExecContext, prevExecContext, prevExecContext, parentInfo)
return NewExecutionContext(prevExecContext, prevExecContext, prevExecContext, parentInfo, prevExecContext)
}

func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter TaskDetailsGetter, workflowGetter SubWorkflowGetter, parentInfo ImmutableParentInfo) ExecutionContext {
func NewExecutionContext(immExecContext ImmutableExecutionContext, tasksGetter TaskDetailsGetter, workflowGetter SubWorkflowGetter, parentInfo ImmutableParentInfo, flow ControlFlow) ExecutionContext {
return execContext{
ImmutableExecutionContext: immExecContext,
TaskDetailsGetter: tasksGetter,
SubWorkflowGetter: workflowGetter,
parentInfo: parentInfo,
ControlFlow: flow,
}
}

Expand All @@ -86,3 +109,9 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo
uniqueID: uniqueID,
}
}

func InitializeControlFlow() ControlFlow {
return &controlFlow{
v: 0,
}
}
2 changes: 1 addition & 1 deletion pkg/controller/executors/execution_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestExecutionContext(t *testing.T) {
subWfGetter := subWfGetter{}
immutableParentInfo := immutableParentInfo{}

ec := NewExecutionContext(eCtx, taskGetter, subWfGetter, immutableParentInfo)
ec := NewExecutionContext(eCtx, taskGetter, subWfGetter, immutableParentInfo, InitializeControlFlow())
assert.NotNil(t, ec)
typed := ec.(execContext)
assert.Equal(t, typed.ImmutableExecutionContext, eCtx)
Expand Down
74 changes: 74 additions & 0 deletions pkg/controller/executors/mocks/control_flow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
return dynamicWorkflowContext{
isDynamic: true,
subWorkflow: dynamicWf,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo),
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
}, nil
}
Expand Down
Loading

0 comments on commit 3b9f57c

Please sign in to comment.