Skip to content

Commit

Permalink
Introducing FlytePropeller TURBO Mode #minor (flyteorg#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumare3 authored Feb 4, 2021
1 parent 62b2f16 commit d2b5fd5
Show file tree
Hide file tree
Showing 11 changed files with 297 additions and 45 deletions.
6 changes: 4 additions & 2 deletions flytepropeller/cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func (g *GetOpts) iterateOverWorkflows(f func(*v1alpha1.FlyteWorkflow) error, ba
return err
}
for _, w := range wList.Items {
if err := f(&w); err != nil {
_w := w
if err := f(&_w); err != nil {
return err
}
counter++
Expand Down Expand Up @@ -127,7 +128,8 @@ func (g *GetOpts) iterateOverQuotas(f func(quota *v12.ResourceQuota) error, batc
return err
}
for _, r := range rq.Items {
if err := f(&r); err != nil {
_r := r
if err := f(&_r); err != nil {
return err
}
counter++
Expand Down
3 changes: 1 addition & 2 deletions flytepropeller/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ tasks:
- container
- sidecar
- K8S-ARRAY
- qubole-hive-executor
# Uncomment to enable sagemaker plugin
# - sagemaker_training
# - sagemaker_hyperparameter_tuning
Expand Down Expand Up @@ -95,7 +94,7 @@ event:
rate: 500
capacity: 1000
admin:
endpoint: localhost:30081
endpoint: localhost:80
insecure: true
catalog-cache:
type: noop
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/compiler/errors/compiler_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const (
func NewBranchNodeNotSpecified(branchNodeID string) *CompileError {
return newError(
BranchNodeIDNotFound,
fmt.Sprintf("BranchNode not assigned"),
"BranchNode not assigned",
branchNodeID,
)
}
Expand Down
14 changes: 8 additions & 6 deletions flytepropeller/pkg/compiler/validators/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func ValidateUnderlyingInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs e

// Compute exposed inputs as the union of all required inputs and any input overwritten by the node.
exposedInputs := map[string]*core.Variable{}
for name, p := range inputs.Parameters {
if p.GetRequired() {
exposedInputs[name] = p.Var
} else if containsBindingByVariableName(node.GetInputs(), name) {
exposedInputs[name] = p.Var
if inputs != nil && inputs.Parameters != nil {
for name, p := range inputs.Parameters {
if p.GetRequired() {
exposedInputs[name] = p.Var
} else if containsBindingByVariableName(node.GetInputs(), name) {
exposedInputs[name] = p.Var
}
// else, the param has a default value and is not being overwritten by the node
}
// else, the param has a default value and is not being overwritten by the node
}

iface = &core.TypedInterface{
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
MaxNodeRetriesOnSystemFailures: 3,
InterruptibleFailureThreshold: 1,
},
MaxStreakLength: 5, // Turbo mode is enabled by default
}
)

Expand Down Expand Up @@ -74,6 +75,7 @@ type Config struct {
MaxDatasetSizeBytes int64 `json:"max-output-size-bytes" pflag:",Maximum size of outputs per task"`
KubeConfig KubeClientConfig `json:"kube-client-config" pflag:",Configuration to control the Kubernetes client"`
NodeConfig NodeConfig `json:"node-config,omitempty" pflag:",config for a workflow node"`
MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
}

type KubeClientConfig struct {
Expand Down
91 changes: 62 additions & 29 deletions flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type propellerMetrics struct {
PanicObserved labeled.Counter
RoundSkipped prometheus.Counter
WorkflowNotFound prometheus.Counter
StreakLength labeled.Counter
RoundTime labeled.StopWatch
}

func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
Expand All @@ -45,6 +47,8 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
PanicObserved: labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric),
RoundSkipped: roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"),
WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"),
StreakLength: labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric),
RoundTime: labeled.NewStopWatch("round_time", "Total time taken by one round traversing, copying and storing a workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric),
}
}

Expand Down Expand Up @@ -180,40 +184,69 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
return nil
}
}
streak := 0
defer p.metrics.StreakLength.Add(ctx, float64(streak))

mutatedWf, err := p.TryMutateWorkflow(ctx, w)
if err != nil {
// NOTE We are overriding the deepcopy here, as we are essentially ignoring all mutations
// We only want to increase failed attempts and discard any other partial changes to the CRD.
mutatedWf = RecordSystemError(w, err)
p.metrics.SystemError.Inc(ctx)
} else if mutatedWf == nil {
return nil
} else {
if !w.GetExecutionStatus().IsTerminated() {
// No updates in the status we detected, we will skip writing to KubeAPI
if mutatedWf.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
return nil
maxLength := p.cfg.MaxStreakLength
if maxLength <= 0 {
maxLength = 1
}

for streak = 0; streak < maxLength; streak++ {
t := p.metrics.RoundTime.Start(ctx)
mutatedWf, err := p.TryMutateWorkflow(ctx, w)
if err != nil {
// NOTE We are overriding the deepcopy here, as we are essentially ignoring all mutations
// We only want to increase failed attempts and discard any other partial changes to the CRD.
mutatedWf = RecordSystemError(w, err)
p.metrics.SystemError.Inc(ctx)
} else if mutatedWf == nil {
logger.Errorf(ctx, "Should not happen! Mutation resulted in a nil workflow!")
return nil
} else {
if !w.GetExecutionStatus().IsTerminated() {
// No updates in the status we detected, we will skip writing to KubeAPI
if mutatedWf.Status.Equals(&w.Status) {
logger.Info(ctx, "WF hasn't been updated in this round.")
t.Stop()
return nil
}
}
if mutatedWf.GetExecutionStatus().IsTerminated() {
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
}
}
if mutatedWf.GetExecutionStatus().IsTerminated() {
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
// TODO we will need to call UpdateStatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use Update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
newWf, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
t.Stop()
return updateErr
}
if err != nil {
t.Stop()
// An error was encountered during the round. Let us return, so that we can back off gracefully
return err
}
if mutatedWf.GetExecutionStatus().IsTerminated() || newWf.ResourceVersion == mutatedWf.ResourceVersion {
// Workflow is terminated (no need to continue) or no status was changed, we can wait
logger.Infof(ctx, "Will not fast follow, Reason: Wf terminated? %v, Version matched? %v",
mutatedWf.GetExecutionStatus().IsTerminated(), newWf.ResourceVersion == mutatedWf.ResourceVersion)
t.Stop()
return nil
}
logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak)
w = newWf
t.Stop()
}
// TODO we will need to call UpdateStatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use Update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
_, updateErr := p.wfStore.Update(ctx, mutatedWf, workflowstore.PriorityClassCritical)
if updateErr != nil {
return updateErr
}
return err
logger.Infof(ctx, "Streak ended at [%d]/Max: [%d]", streak, maxLength)
return nil
}

func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller {
Expand Down
Loading

0 comments on commit d2b5fd5

Please sign in to comment.