Skip to content

Commit

Permalink
feat(backend): avoid downcast and use ExecutionSpec if possible (#9232)
Browse files Browse the repository at this point in the history
Limit the direct Workflow data struct access to common.util
and use ExecutionSpec and ExecutionStatus in other places.
Create a data struct to represent the node status information
in ExecutionStatus and add an API to access this information.
  • Loading branch information
yhwang authored May 4, 2023
1 parent 068231b commit 6c72d95
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 22 deletions.
22 changes: 11 additions & 11 deletions backend/src/apiserver/server/api_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sort"
"strconv"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
Expand Down Expand Up @@ -1620,15 +1619,15 @@ func toModelTask(t interface{}) (*model.Task, error) {
children = append(children, c.GetPodName())
}
}
case workflowapi.NodeStatus:
case util.NodeStatus:
// TODO(gkcalat): parse input and output artifacts
wfStatus := t
nodeId = wfStatus.ID
name = wfStatus.DisplayName
state = string(wfStatus.Phase)
startTime = wfStatus.StartedAt.Unix()
createTime = startTime
finishTime = wfStatus.FinishedAt.Unix()
state = wfStatus.State
startTime = wfStatus.StartTime
createTime = wfStatus.CreateTime
finishTime = wfStatus.FinishTime
children = wfStatus.Children
default:
return nil, util.NewUnknownApiVersionError("Task", t)
Expand Down Expand Up @@ -1672,20 +1671,21 @@ func toModelTasks(t interface{}) ([]*model.Task, error) {
modelTasks = append(modelTasks, modelTask)
}
return modelTasks, nil
case *util.Workflow:
case util.ExecutionSpec:
execSpec := t
runId := execSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId]
namespace := execSpec.ExecutionNamespace()
createdAt := execSpec.GetCreationTimestamp().Unix()
createdAt := execSpec.ExecutionObjectMeta().GetCreationTimestamp().Unix()
// Get sorted node names to make the results repeatable
nodeNames := make([]string, 0, len(execSpec.Status.Nodes))
for nodeName := range execSpec.Status.Nodes {
nodes := execSpec.ExecutionStatus().NodeStatuses()
nodeNames := make([]string, 0, len(nodes))
for nodeName := range nodes {
nodeNames = append(nodeNames, nodeName)
}
sort.Strings(nodeNames)
modelTasks := make([]*model.Task, 0)
for _, nodeName := range nodeNames {
node := execSpec.Status.Nodes[nodeName]
node := nodes[nodeName]
modelTask, err := toModelTask(node)
if err != nil {
return nil, util.Wrap(err, "Failed to convert Argo workflow to tasks details")
Expand Down
12 changes: 5 additions & 7 deletions backend/src/apiserver/server/api_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package server
import (
"strings"
"testing"
"time"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
Expand Down Expand Up @@ -3013,14 +3011,14 @@ func Test_toModelTask(t *testing.T) {
},
{
"argo node status",
workflowapi.NodeStatus{
util.NodeStatus{
ID: "1",
DisplayName: "node_1",
Name: "wrong name",
Phase: workflowapi.NodePhase("Pending"),
State: "Pending",
Children: []string{"node3", "node4"},
StartedAt: v1.Time{Time: time.Unix(4, 0)},
FinishedAt: v1.Time{Time: time.Unix(5, 0)},
StartTime: 4,
CreateTime: 4,
FinishTime: 5,
},
&model.Task{
PodName: "1",
Expand Down
8 changes: 4 additions & 4 deletions backend/src/apiserver/server/report_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type ReportServer struct {
}

// Extracts task details from an execution spec and reports them to storage.
func (s ReportServer) reportTasksFromExecution(wf *util.Workflow, runId string) ([]*model.Task, error) {
if len(wf.Status.Nodes) == 0 {
func (s ReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec, runId string) ([]*model.Task, error) {
if !execSpec.ExecutionStatus().HasNodes() {
return nil, nil
}
tasks, err := toModelTasks(wf)
tasks, err := toModelTasks(execSpec)
if err != nil {
return nil, util.Wrap(err, "Failed to report tasks of an execution")
}
Expand All @@ -55,7 +55,7 @@ func (s *ReportServer) reportWorkflow(ctx context.Context, workflow string) (*em
}

runId := newExecSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId]
_, err = s.reportTasksFromExecution(newExecSpec.(*util.Workflow), runId)
_, err = s.reportTasksFromExecution(newExecSpec, runId)
if err != nil {
return nil, util.Wrap(err, "Failed to report task details")
}
Expand Down
16 changes: 16 additions & 0 deletions backend/src/common/util/execution_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Data struct to represent Node status
type NodeStatus struct {
ID string
DisplayName string
State string
StartTime int64
CreateTime int64
FinishTime int64
Children []string
}

type RetrieveArtifact func(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error)

// Abstract interface to encapsulate the resources of the execution runtime specifically
Expand Down Expand Up @@ -54,4 +65,9 @@ type ExecutionStatus interface {

// does ExecutionStatus contain any finished node or not
HasMetrics() bool

// Any node status exists or not
HasNodes() bool
// Get node statuses, the NodeStatus data struct could be extended if needed
NodeStatuses() map[string]NodeStatus
}
20 changes: 20 additions & 0 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,26 @@ func (w *Workflow) PatchTemplateOutputArtifacts() {
}
}

func (w *Workflow) NodeStatuses() map[string]NodeStatus {
rev := make(map[string]NodeStatus, len(w.Status.Nodes))
for id, node := range w.Status.Nodes {
rev[id] = NodeStatus{
ID: node.ID,
DisplayName: node.DisplayName,
State: string(node.Phase),
StartTime: node.StartedAt.Unix(),
CreateTime: node.StartedAt.Unix(),
FinishTime: node.FinishedAt.Unix(),
Children: node.Children,
}
}
return rev
}

func (w *Workflow) HasNodes() bool {
return len(w.Status.Nodes) > 0
}

// implementation of ExecutionClientInterface
type WorkflowClient struct {
client *argoclient.Clientset
Expand Down

0 comments on commit 6c72d95

Please sign in to comment.