
Introducing FlytePropeller TURBO Mode #minor #219

Merged · 9 commits · Feb 4, 2021
6 changes: 4 additions & 2 deletions cmd/kubectl-flyte/cmd/get.go
@@ -92,7 +92,8 @@ func (g *GetOpts) iterateOverWorkflows(f func(*v1alpha1.FlyteWorkflow) error, ba
         return err
     }
     for _, w := range wList.Items {
-        if err := f(&w); err != nil {
+        _w := w
+        if err := f(&_w); err != nil {
             return err
         }
         counter++
@@ -127,7 +128,8 @@ func (g *GetOpts) iterateOverQuotas(f func(quota *v12.ResourceQuota) error, batc
         return err
     }
     for _, r := range rq.Items {
-        if err := f(&r); err != nil {
+        _r := r
+        if err := f(&_r); err != nil {
             return err
         }
         counter++
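Reviewer note: the `_w := w` and `_r := r` copies sidestep Go's loop-variable aliasing (and the implicit memory aliasing warning that linters such as gosec raise for `&w` inside a range loop). Below is a minimal, self-contained sketch of the behaviour; it is not code from this repository and assumes a pre-Go-1.22 toolchain, where the range variable is a single reused location.

```go
package main

import "fmt"

func main() {
	items := []string{"a", "b", "c"}

	var bad []*string
	for _, item := range items {
		bad = append(bad, &item) // pre-Go 1.22: every pointer aliases the one loop variable
	}

	var good []*string
	for _, item := range items {
		item := item // explicit copy, the same idea as _w := w above
		good = append(good, &item)
	}

	fmt.Println(*bad[0], *bad[1], *bad[2])    // typically: c c c (on pre-1.22 toolchains)
	fmt.Println(*good[0], *good[1], *good[2]) // a b c
}
```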
3 changes: 1 addition & 2 deletions config.yaml
@@ -32,7 +32,6 @@ tasks:
       - container
       - sidecar
       - K8S-ARRAY
-      - qubole-hive-executor
       # Uncomment to enable sagemaker plugin
       # - sagemaker_training
       # - sagemaker_hyperparameter_tuning
@@ -95,7 +94,7 @@ event:
     rate: 500
     capacity: 1000
 admin:
-  endpoint: localhost:30081
+  endpoint: localhost:80
   insecure: true
 catalog-cache:
   type: noop
2 changes: 1 addition & 1 deletion pkg/compiler/errors/compiler_errors.go
@@ -83,7 +83,7 @@ const (
 func NewBranchNodeNotSpecified(branchNodeID string) *CompileError {
     return newError(
         BranchNodeIDNotFound,
-        fmt.Sprintf("BranchNode not assigned"),
+        "BranchNode not assigned",
         branchNodeID,
     )
 }
14 changes: 8 additions & 6 deletions pkg/compiler/validators/interface.go
@@ -70,13 +70,15 @@ func ValidateUnderlyingInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs e

     // Compute exposed inputs as the union of all required inputs and any input overwritten by the node.
     exposedInputs := map[string]*core.Variable{}
-    for name, p := range inputs.Parameters {
-        if p.GetRequired() {
-            exposedInputs[name] = p.Var
-        } else if containsBindingByVariableName(node.GetInputs(), name) {
-            exposedInputs[name] = p.Var
+    if inputs != nil && inputs.Parameters != nil {
+        for name, p := range inputs.Parameters {
+            if p.GetRequired() {
+                exposedInputs[name] = p.Var
+            } else if containsBindingByVariableName(node.GetInputs(), name) {
+                exposedInputs[name] = p.Var
+            }
+            // else, the param has a default value and is not being overwritten by the node
         }
-        // else, the param has a default value and is not being overwritten by the node
     }

     iface = &core.TypedInterface{
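Reviewer note: with the added guard, a node whose underlying interface carries no input parameters no longer dereferences a nil `inputs`. A minimal sketch of the same pattern follows; the types are illustrative stand-ins, not the generated flyteidl ones.

```go
package main

import "fmt"

type Variable struct{ Description string }

type Parameter struct {
	Var      *Variable
	Required bool
}

type ParameterMap struct {
	Parameters map[string]*Parameter
}

// collectRequired mirrors the guarded loop: it tolerates a nil ParameterMap,
// which would otherwise panic on the field access in the range clause.
func collectRequired(inputs *ParameterMap) map[string]*Variable {
	exposed := map[string]*Variable{}
	if inputs != nil && inputs.Parameters != nil {
		for name, p := range inputs.Parameters {
			if p.Required {
				exposed[name] = p.Var
			}
		}
	}
	return exposed
}

func main() {
	fmt.Println(len(collectRequired(nil))) // 0, no panic
	fmt.Println(len(collectRequired(&ParameterMap{Parameters: map[string]*Parameter{
		"x": {Var: &Variable{Description: "required input"}, Required: true},
	}}))) // 1
}
```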
2 changes: 2 additions & 0 deletions pkg/controller/config/config.go
@@ -47,6 +47,7 @@ var (
             MaxNodeRetriesOnSystemFailures: 3,
             InterruptibleFailureThreshold:  1,
         },
+        MaxStreakLength: 5, // Turbo mode is enabled by default
     }
 )

@@ -74,6 +75,7 @@ type Config struct {
     MaxDatasetSizeBytes int64            `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
     KubeConfig          KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
     NodeConfig          NodeConfig       `json:"node-config,omitempty" pflag:",config for a workflow node"`
+    MaxStreakLength     int              `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
 }

 type KubeClientConfig struct {
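Reviewer note: `MaxStreakLength` now defaults to 5, and the handler change below clamps non-positive values to 1, so setting `max-streak-length` to 0 (or 1) effectively disables turbo mode. A minimal sketch of how the value is interpreted; the helper is illustrative, not code from this PR.

```go
package main

import "fmt"

// effectiveStreakLength mirrors the clamp in the new Handle loop: values <= 0
// fall back to a single round per Handle call (turbo mode off), values > 1
// allow that many consecutive fast-follow rounds on the same workflow.
func effectiveStreakLength(maxStreakLength int) int {
	if maxStreakLength <= 0 {
		return 1
	}
	return maxStreakLength
}

func main() {
	fmt.Println(effectiveStreakLength(0)) // 1 -> turbo mode disabled
	fmt.Println(effectiveStreakLength(1)) // 1 -> one round per Handle call
	fmt.Println(effectiveStreakLength(5)) // 5 -> default, up to 5 rounds
}
```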
91 changes: 62 additions & 29 deletions pkg/controller/handler.go
@@ -32,6 +32,8 @@ type propellerMetrics struct {
     PanicObserved    labeled.Counter
     RoundSkipped     prometheus.Counter
     WorkflowNotFound prometheus.Counter
+    StreakLength     labeled.Counter
+    RoundTime        labeled.StopWatch
 }

 func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
@@ -45,6 +47,8 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
         PanicObserved:    labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric),
         RoundSkipped:     roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"),
         WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"),
+        StreakLength:     labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric),
+        RoundTime:        labeled.NewStopWatch("round_time", "Total time taken by one round traversing, copying and storing a workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric),
     }
 }
@@ -180,40 +184,69 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
return nil
}
}
streak := 0
defer p.metrics.StreakLength.Add(ctx, float64(streak))

mutatedWf, err := p.TryMutateWorkflow(ctx, w)
if err != nil {
// NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations
// We only want to increase failed attempts and discard any other partial changes to the CRD.
mutatedWf = RecordSystemError(w, err)
p.metrics.SystemError.Inc(ctx)
} else if mutatedWf == nil {
return nil
} else {
if !w.GetExecutionStatus().IsTerminated() {
// No updates in the status we detected, we will skip writing to KubeAPI
if mutatedWf.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
return nil
maxLength := p.cfg.MaxStreakLength
if maxLength <= 0 {
maxLength = 1
}

for streak = 0; streak < maxLength; streak++ {
t := p.metrics.RoundTime.Start(ctx)
mutatedWf, err := p.TryMutateWorkflow(ctx, w)
if err != nil {
// NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations
// We only want to increase failed attempts and discard any other partial changes to the CRD.
mutatedWf = RecordSystemError(w, err)
p.metrics.SystemError.Inc(ctx)
} else if mutatedWf == nil {
logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!")
return nil
} else {
if !w.GetExecutionStatus().IsTerminated() {
// No updates in the status we detected, we will skip writing to KubeAPI
if mutatedWf.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
t.Stop()
return nil
}
}
if mutatedWf.GetExecutionStatus().IsTerminated() {
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
}
}
if mutatedWf.GetExecutionStatus().IsTerminated() {
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
// TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
t.Stop()
return updateErr
}
if err != nil {
t.Stop()
// An error was encountered during the round. Let us return, so that we can back-off gracefully
return err
}
if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion {
// Workflow is terminated (no need to continue) or no status was changed, we can wait
logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v",
mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion)
t.Stop()
return nil
}
logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak)
w = newWf
t.Stop()
}
// TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
_, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
return updateErr
}
return err
logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength)
return nil
}

func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {
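Reviewer note: stripped of the Flyte-specific types, the new Handle logic is a bounded evaluate-and-persist loop. It keeps re-evaluating the same workflow while each round both succeeds and changes the stored state, up to MaxStreakLength rounds, instead of waiting for the workqueue to redeliver the item. The sketch below restates that control flow with generic names; it is illustrative, not code from this PR.

```go
package main

import "fmt"

type roundResult struct {
	changed bool // the persisted workflow differs from the previous round
	done    bool // the workflow reached a terminal state
}

// handle mirrors the shape of the new Propeller.Handle: evaluate stands in for
// TryMutateWorkflow plus the wfStore.Update call.
func handle(maxStreak int, evaluate func(round int) (roundResult, error)) error {
	if maxStreak <= 0 {
		maxStreak = 1
	}
	for streak := 0; streak < maxStreak; streak++ {
		res, err := evaluate(streak)
		if err != nil {
			return err // back off and let the workqueue retry later
		}
		if res.done || !res.changed {
			return nil // terminal or quiescent: wait for the next event
		}
		// State advanced: run another round immediately instead of waiting
		// for the informer/workqueue to redeliver this workflow.
	}
	return nil
}

func main() {
	rounds := 0
	_ = handle(5, func(round int) (roundResult, error) {
		rounds++
		return roundResult{changed: round < 2}, nil
	})
	fmt.Println("rounds executed:", rounds) // 3
}
```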