Skip to content

Commit

Permalink
Statemachine & Status fixes (Discretization, status size... etc.) (fl…
Browse files Browse the repository at this point in the history
…yteorg#49)

This PR fixes a few issues uncovered during the investigation of the statemachine inconsistency issues last week. Specifically:

- [X] Ensure each node can progress at most once per round (IsDirty flag)
- [X] Remove ParentTaskID and DataDir from the NodeStatus field (they were causing the workflow's etcd object size to bloat)
- [X] Add the parent's RetryAttempt to the generated hierarchical name of dynamic sub-nodes to ensure retries do not reuse an existing sub-node status.

Details: https://docs.google.com/document/d/1ISaxIZeYLcBaeapEmeTqb-g0x04pJbf5t3i30qMfk6U/edit?usp=sharing
  • Loading branch information
EngHabu authored Jan 2, 2020
1 parent 71cdc77 commit a79c03f
Show file tree
Hide file tree
Showing 70 changed files with 1,110 additions and 385 deletions.
9 changes: 6 additions & 3 deletions cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -33,9 +34,11 @@ func NewGetCommand(opts *RootOptions) *cobra.Command {
Short: "Gets a single workflow or lists all workflows currently in execution",
Long: `use labels to filter`,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

if len(args) > 0 {
name := args[0]
return getOpts.getWorkflow(name)
return getOpts.getWorkflow(ctx, name)
}
return getOpts.listWorkflows()
},
Expand All @@ -49,7 +52,7 @@ func NewGetCommand(opts *RootOptions) *cobra.Command {
return getCmd
}

func (g *GetOpts) getWorkflow(name string) error {
func (g *GetOpts) getWorkflow(ctx context.Context, name string) error {
parts := strings.Split(name, "/")
if len(parts) > 1 {
g.ConfigOverrides.Context.Namespace = parts[0]
Expand All @@ -61,7 +64,7 @@ func (g *GetOpts) getWorkflow(name string) error {
}
wp := printers.WorkflowPrinter{}
tree := gotree.New("Workflow")
if err := wp.Print(tree, w); err != nil {
if err := wp.Print(ctx, tree, w); err != nil {
return err
}
fmt.Print(tree.Print())
Expand Down
13 changes: 7 additions & 6 deletions cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package printers

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (p NodePrinter) BranchNodeInfo(node v1alpha1.ExecutableNode, nodeStatus v1a

}

func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) error {
func (p NodePrinter) traverseNode(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus) error {
switch node.GetKind() {
case v1alpha1.NodeKindBranch:
subTree := tree.Add(strings.Join(p.BranchNodeInfo(node, nodeStatus), " | "))
Expand All @@ -89,7 +90,7 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
if !ok {
return fmt.Errorf("failed to find branch node %s", *nodeID)
}
if err := p.traverseNode(subTree, w, ifNode, nodeStatus.GetNodeExecutionStatus(*nodeID)); err != nil {
if err := p.traverseNode(ctx, subTree, w, ifNode, nodeStatus.GetNodeExecutionStatus(ctx, *nodeID)); err != nil {
return err
}
}
Expand All @@ -113,7 +114,7 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
s := w.FindSubWorkflow(*node.GetWorkflowNode().GetSubWorkflowRef())
wp := WorkflowPrinter{}
cw := executors.NewSubContextualWorkflow(w, s, nodeStatus)
return wp.Print(tree, cw)
return wp.Print(ctx, tree, cw)
}
case v1alpha1.NodeKindTask:
sub := tree.Add(strings.Join(p.NodeInfo(w.GetName(), node, nodeStatus), " | "))
Expand All @@ -126,10 +127,10 @@ func (p NodePrinter) traverseNode(tree gotree.Tree, w v1alpha1.ExecutableWorkflo
return nil
}

func (p NodePrinter) PrintList(tree gotree.Tree, w v1alpha1.ExecutableWorkflow, nodes []v1alpha1.ExecutableNode) error {
func (p NodePrinter) PrintList(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, nodes []v1alpha1.ExecutableNode) error {
for _, n := range nodes {
s := w.GetNodeExecutionStatus(n.GetID())
if err := p.traverseNode(tree, w, n, s); err != nil {
s := w.GetNodeExecutionStatus(ctx, n.GetID())
if err := p.traverseNode(ctx, tree, w, n, s); err != nil {
return err
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/kubectl-flyte/cmd/printers/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package printers

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -37,7 +38,7 @@ func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
// WorkflowPrinter is a stateless (field-less) printer whose methods add a
// textual view of an ExecutableWorkflow to a gotree.Tree.
type WorkflowPrinter struct {
}

func (p WorkflowPrinter) Print(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
func (p WorkflowPrinter) Print(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
sortedNodes, err := visualize.TopologicalSort(w)
if err != nil {
return err
Expand All @@ -49,7 +50,7 @@ func (p WorkflowPrinter) Print(tree gotree.Tree, w v1alpha1.ExecutableWorkflow)
tree.AddTree(newTree)
}
np := NodePrinter{}
return np.PrintList(newTree, w, sortedNodes)
return np.PrintList(ctx, newTree, w, sortedNodes)
}

func (p WorkflowPrinter) PrintShort(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
Expand Down
16 changes: 13 additions & 3 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type ExecutableBranchNodeStatus interface {
}

type MutableBranchNodeStatus interface {
Mutable
ExecutableBranchNodeStatus

SetBranchNodeError()
Expand All @@ -178,6 +179,7 @@ type ExecutableDynamicNodeStatus interface {
}

type MutableDynamicNodeStatus interface {
Mutable
ExecutableDynamicNodeStatus

SetDynamicNodePhase(phase DynamicNodePhase)
Expand All @@ -198,11 +200,17 @@ type ExecutableWorkflowNodeStatus interface {
}

// MutableWorkflowNodeStatus is the writable view of a workflow node's status.
// It embeds Mutable (dirty tracking) and the read-only
// ExecutableWorkflowNodeStatus, and adds a setter for the workflow node phase.
type MutableWorkflowNodeStatus interface {
Mutable
ExecutableWorkflowNodeStatus
// SetWorkflowNodePhase records the given phase on the workflow node status.
SetWorkflowNodePhase(phase WorkflowNodePhase)
}

// Mutable is embedded by the mutable status interfaces in this file so that
// each of them exposes a dirty flag.
type Mutable interface {
// IsDirty reports the dirty flag of the status object.
// NOTE(review): presumably true once the status has been modified in the
// current round; confirm against the concrete implementations.
IsDirty() bool
}

type MutableNodeStatus interface {
Mutable
// Mutation API's
SetDataDir(DataReference)
SetParentNodeID(n *NodeID)
Expand All @@ -225,6 +233,7 @@ type MutableNodeStatus interface {
GetDynamicNodeStatus() MutableDynamicNodeStatus
ClearDynamicNodeStatus()
ClearLastAttemptStartedAt()
ClearSubNodeStatus()
}

// Interface for a Node p. This provides a mutable API.
Expand All @@ -247,14 +256,14 @@ type ExecutableNodeStatus interface {
GetTaskNodeStatus() ExecutableTaskNodeStatus

IsCached() bool
IsDirty() bool
}

// ExecutableSubWorkflowNodeStatus is the read-only view of a sub-workflow
// node's status.
type ExecutableSubWorkflowNodeStatus interface {
// GetPhase returns the WorkflowPhase stored on the sub-workflow node status.
GetPhase() WorkflowPhase
}

// MutableSubWorkflowNodeStatus is the writable view of a sub-workflow node's
// status. It embeds Mutable (dirty tracking) and the read-only
// ExecutableSubWorkflowNodeStatus, and adds a phase setter.
type MutableSubWorkflowNodeStatus interface {
Mutable
ExecutableSubWorkflowNodeStatus
// SetPhase records the given WorkflowPhase on the sub-workflow node status.
SetPhase(phase WorkflowPhase)
}
Expand All @@ -268,6 +277,7 @@ type ExecutableTaskNodeStatus interface {
}

type MutableTaskNodeStatus interface {
Mutable
ExecutableTaskNodeStatus
SetPhase(phase int)
SetPhaseVersion(version uint32)
Expand Down Expand Up @@ -320,7 +330,7 @@ type ExecutableWorkflowStatus interface {
SetOutputReference(reference DataReference)
IncFailedAttempts()
SetMessage(msg string)
ConstructNodeDataDir(ctx context.Context, constructor storage.ReferenceConstructor, name NodeID) (storage.DataReference, error)
ConstructNodeDataDir(ctx context.Context, name NodeID) (storage.DataReference, error)
}

type BaseWorkflow interface {
Expand Down Expand Up @@ -381,7 +391,7 @@ type ExecutableWorkflow interface {
}

type NodeStatusGetter interface {
GetNodeExecutionStatus(id NodeID) ExecutableNodeStatus
GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus
}

// NodeStatusMap is an alias for a map from NodeID to that node's
// ExecutableNodeStatus.
type NodeStatusMap = map[NodeID]ExecutableNodeStatus
Expand Down
16 changes: 9 additions & 7 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/BaseWorkflowWithStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 14 additions & 7 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/ExecutableWorkflowStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a79c03f

Please sign in to comment.