Skip to content

Commit

Permalink
Refactor to remove Workflow from Node executor (flyteorg#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare authored Apr 16, 2020
1 parent 20f3587 commit 8a11edf
Show file tree
Hide file tree
Showing 63 changed files with 4,264 additions and 1,509 deletions.
2 changes: 2 additions & 0 deletions flytepropeller/cmd/kubectl-flyte/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

gotree "github.com/DiSiqueira/GoTree"
"github.com/lyft/flytestdlib/storage"
"github.com/spf13/cobra"
v12 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -64,6 +65,7 @@ func (g *GetOpts) getWorkflow(ctx context.Context, name string) error {
}
wp := printers.WorkflowPrinter{}
tree := gotree.New("Workflow")
w.DataReferenceConstructor = storage.URLPathConstructor{}
if err := wp.Print(ctx, tree, w); err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

gotree "github.com/DiSiqueira/GoTree"
"github.com/fatih/color"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task"
"github.com/lyft/flytepropeller/pkg/utils"
)
Expand Down Expand Up @@ -113,8 +113,7 @@ func (p NodePrinter) traverseNode(ctx context.Context, tree gotree.Tree, w v1alp
if node.GetWorkflowNode().GetSubWorkflowRef() != nil {
s := w.FindSubWorkflow(*node.GetWorkflowNode().GetSubWorkflowRef())
wp := WorkflowPrinter{}
cw := executors.NewSubContextualWorkflow(w, s, nodeStatus)
return wp.Print(ctx, tree, cw)
return wp.PrintSubWorkflow(ctx, tree, w, s, nodeStatus)
}
case v1alpha1.NodeKindTask:
sub := tree.Add(strings.Join(p.NodeInfo(w.GetName(), node, nodeStatus), " | "))
Expand Down
25 changes: 24 additions & 1 deletion flytepropeller/cmd/kubectl-flyte/cmd/printers/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

gotree "github.com/DiSiqueira/GoTree"
"github.com/fatih/color"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/visualize"
)
Expand All @@ -25,7 +26,7 @@ func ColorizeWorkflowPhase(p v1alpha1.WorkflowPhase) string {
return color.CyanString("%s", p.String())
}

func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
func CalculateWorkflowRuntime(s v1alpha1.ExecutionTimeInfo) string {
if s.GetStartedAt() != nil {
if s.GetStoppedAt() != nil {
return s.GetStoppedAt().Sub(s.GetStartedAt().Time).String()
Expand All @@ -35,6 +36,12 @@ func CalculateWorkflowRuntime(s v1alpha1.ExecutableWorkflowStatus) string {
return "na"
}

type ContextualWorkflow struct {
v1alpha1.MetaExtended
v1alpha1.ExecutableSubWorkflow
v1alpha1.NodeStatusGetter
}

type WorkflowPrinter struct {
}

Expand All @@ -53,6 +60,22 @@ func (p WorkflowPrinter) Print(ctx context.Context, tree gotree.Tree, w v1alpha1
return np.PrintList(ctx, newTree, w, sortedNodes)
}

func (p WorkflowPrinter) PrintSubWorkflow(ctx context.Context, tree gotree.Tree, w v1alpha1.ExecutableWorkflow, swf v1alpha1.ExecutableSubWorkflow, ns v1alpha1.ExecutableNodeStatus) error {
sortedNodes, err := visualize.TopologicalSort(swf)
if err != nil {
return err
}
newTree := gotree.New(fmt.Sprintf("SubWorkflow [%s] (%s %s %s)",
swf.GetID(), CalculateWorkflowRuntime(ns),
ColorizeNodePhase(ns.GetPhase()), ns.GetMessage()))
if tree != nil {
tree.AddTree(newTree)
}
np := NodePrinter{}

return np.PrintList(ctx, newTree, &ContextualWorkflow{MetaExtended: w, ExecutableSubWorkflow: swf, NodeStatusGetter: ns}, sortedNodes)
}

func (p WorkflowPrinter) PrintShort(tree gotree.Tree, w v1alpha1.ExecutableWorkflow) error {
if tree == nil {
return fmt.Errorf("bad state in printer")
Expand Down
52 changes: 25 additions & 27 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package v1alpha1

import (
"context"

"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -238,16 +237,20 @@ type MutableNodeStatus interface {
ClearSubNodeStatus()
}

type ExecutionTimeInfo interface {
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
}

// Interface for a Node p. This provides a mutable API.
type ExecutableNodeStatus interface {
NodeStatusGetter
MutableNodeStatus
NodeStatusVisitor
ExecutionTimeInfo
GetPhase() NodePhase
GetQueuedAt() *metav1.Time
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
GetLastAttemptStartedAt() *metav1.Time
GetParentNodeID() *NodeID
GetParentTaskID() *core.TaskExecutionIdentifier
Expand Down Expand Up @@ -324,11 +327,9 @@ type ExecutableNode interface {
// Interface for the Workflow p. This is the mutable portion for a Workflow
type ExecutableWorkflowStatus interface {
NodeStatusGetter
ExecutionTimeInfo
UpdatePhase(p WorkflowPhase, msg string)
GetPhase() WorkflowPhase
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
IsTerminated() bool
GetMessage() string
SetDataDir(DataReference)
Expand All @@ -340,13 +341,18 @@ type ExecutableWorkflowStatus interface {
ConstructNodeDataDir(ctx context.Context, name NodeID) (storage.DataReference, error)
}

type NodeGetter interface {
GetNode(nodeID NodeID) (ExecutableNode, bool)
}

type BaseWorkflow interface {
NodeGetter
StartNode() ExecutableNode
GetID() WorkflowID
// From returns all nodes that can be reached directly
// from the node with the given unique name.
FromNode(name NodeID) ([]NodeID, error)
GetNode(nodeID NodeID) (ExecutableNode, bool)
ToNode(name NodeID) ([]NodeID, error)
}

type BaseWorkflowWithStatus interface {
Expand All @@ -365,9 +371,9 @@ type ExecutableSubWorkflow interface {
GetOutputs() *OutputVarMap
}

// WorkflowMeta provides an interface to retrieve labels, annotations and other concepts that are declared only once
// Meta provides an interface to retrieve labels, annotations and other concepts that are declared only once
// for the top level workflow
type WorkflowMeta interface {
type Meta interface {
GetExecutionID() ExecutionID
GetK8sWorkflowID() types.NamespacedName
GetOwnerReference() metav1.OwnerReference
Expand All @@ -384,17 +390,21 @@ type TaskDetailsGetter interface {
GetTask(id TaskID) (ExecutableTask, error)
}

type WorkflowMetaExtended interface {
WorkflowMeta
TaskDetailsGetter
type SubWorkflowGetter interface {
FindSubWorkflow(subID WorkflowID) ExecutableSubWorkflow
}

type MetaExtended interface {
Meta
TaskDetailsGetter
SubWorkflowGetter
GetExecutionStatus() ExecutableWorkflowStatus
}

// A Top level Workflow is a combination of WorkflowMeta and an ExecutableSubWorkflow
// A Top level Workflow is a combination of Meta and an ExecutableSubWorkflow
type ExecutableWorkflow interface {
ExecutableSubWorkflow
WorkflowMetaExtended
MetaExtended
NodeStatusGetter
}

Expand All @@ -420,15 +430,3 @@ func GetOutputsFile(outputDir DataReference) DataReference {
func GetInputsFile(inputDir DataReference) DataReference {
return inputDir + "/inputs.pb"
}

func GetOutputErrorFile(inputDir DataReference) DataReference {
return inputDir + "/error.pb"
}

func GetFutureFile() string {
return "futures.pb"
}

func GetCompiledFutureFile() string {
return "futures_compiled.pb"
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8a11edf

Please sign in to comment.