Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend): avoid downcast and use ExecutionSpec if possible #9232

Merged
merged 1 commit into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions backend/src/apiserver/server/api_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sort"
"strconv"

workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
Expand Down Expand Up @@ -1620,15 +1619,15 @@ func toModelTask(t interface{}) (*model.Task, error) {
children = append(children, c.GetPodName())
}
}
case workflowapi.NodeStatus:
case util.NodeStatus:
// TODO(gkcalat): parse input and output artifacts
wfStatus := t
nodeId = wfStatus.ID
name = wfStatus.DisplayName
state = string(wfStatus.Phase)
startTime = wfStatus.StartedAt.Unix()
createTime = startTime
finishTime = wfStatus.FinishedAt.Unix()
state = wfStatus.State
startTime = wfStatus.StartTime
createTime = wfStatus.CreateTime
finishTime = wfStatus.FinishTime
children = wfStatus.Children
default:
return nil, util.NewUnknownApiVersionError("Task", t)
Expand Down Expand Up @@ -1672,20 +1671,21 @@ func toModelTasks(t interface{}) ([]*model.Task, error) {
modelTasks = append(modelTasks, modelTask)
}
return modelTasks, nil
case *util.Workflow:
case util.ExecutionSpec:
execSpec := t
runId := execSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId]
namespace := execSpec.ExecutionNamespace()
createdAt := execSpec.GetCreationTimestamp().Unix()
createdAt := execSpec.ExecutionObjectMeta().GetCreationTimestamp().Unix()
// Get sorted node names to make the results repeatable
nodeNames := make([]string, 0, len(execSpec.Status.Nodes))
for nodeName := range execSpec.Status.Nodes {
nodes := execSpec.ExecutionStatus().NodeStatuses()
nodeNames := make([]string, 0, len(nodes))
for nodeName := range nodes {
nodeNames = append(nodeNames, nodeName)
}
sort.Strings(nodeNames)
modelTasks := make([]*model.Task, 0)
for _, nodeName := range nodeNames {
node := execSpec.Status.Nodes[nodeName]
node := nodes[nodeName]
modelTask, err := toModelTask(node)
if err != nil {
return nil, util.Wrap(err, "Failed to convert Argo workflow to tasks details")
Expand Down
12 changes: 5 additions & 7 deletions backend/src/apiserver/server/api_converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package server
import (
"strings"
"testing"
"time"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/go-cmp/cmp"
apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
Expand Down Expand Up @@ -3013,14 +3011,14 @@ func Test_toModelTask(t *testing.T) {
},
{
"argo node status",
workflowapi.NodeStatus{
util.NodeStatus{
ID: "1",
DisplayName: "node_1",
Name: "wrong name",
Phase: workflowapi.NodePhase("Pending"),
State: "Pending",
Children: []string{"node3", "node4"},
StartedAt: v1.Time{Time: time.Unix(4, 0)},
FinishedAt: v1.Time{Time: time.Unix(5, 0)},
StartTime: 4,
CreateTime: 4,
FinishTime: 5,
},
&model.Task{
PodName: "1",
Expand Down
8 changes: 4 additions & 4 deletions backend/src/apiserver/server/report_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type ReportServer struct {
}

// Extracts task details from an execution spec and reports them to storage.
func (s ReportServer) reportTasksFromExecution(wf *util.Workflow, runId string) ([]*model.Task, error) {
if len(wf.Status.Nodes) == 0 {
func (s ReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec, runId string) ([]*model.Task, error) {
if !execSpec.ExecutionStatus().HasNodes() {
return nil, nil
}
tasks, err := toModelTasks(wf)
tasks, err := toModelTasks(execSpec)
if err != nil {
return nil, util.Wrap(err, "Failed to report tasks of an execution")
}
Expand All @@ -55,7 +55,7 @@ func (s *ReportServer) reportWorkflow(ctx context.Context, workflow string) (*em
}

runId := newExecSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId]
_, err = s.reportTasksFromExecution(newExecSpec.(*util.Workflow), runId)
_, err = s.reportTasksFromExecution(newExecSpec, runId)
if err != nil {
return nil, util.Wrap(err, "Failed to report task details")
}
Expand Down
16 changes: 16 additions & 0 deletions backend/src/common/util/execution_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Data struct to represent Node status
type NodeStatus struct {
ID string
DisplayName string
State string
StartTime int64
CreateTime int64
FinishTime int64
Children []string
}

type RetrieveArtifact func(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error)

// Abstract interface to encapsulate the resources of the execution runtime specifically
Expand Down Expand Up @@ -54,4 +65,9 @@ type ExecutionStatus interface {

// does ExecutionStatus contain any finished node or not
HasMetrics() bool

// Any node status exists or not
HasNodes() bool
// Get node statuses, the NodeStatus data struct could be extended if needed
NodeStatuses() map[string]NodeStatus
}
20 changes: 20 additions & 0 deletions backend/src/common/util/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,26 @@ func (w *Workflow) PatchTemplateOutputArtifacts() {
}
}

func (w *Workflow) NodeStatuses() map[string]NodeStatus {
rev := make(map[string]NodeStatus, len(w.Status.Nodes))
for id, node := range w.Status.Nodes {
rev[id] = NodeStatus{
ID: node.ID,
DisplayName: node.DisplayName,
State: string(node.Phase),
StartTime: node.StartedAt.Unix(),
CreateTime: node.StartedAt.Unix(),
FinishTime: node.FinishedAt.Unix(),
Children: node.Children,
}
}
return rev
}

func (w *Workflow) HasNodes() bool {
return len(w.Status.Nodes) > 0
}

// implementation of ExecutionClientInterface
type WorkflowClient struct {
client *argoclient.Clientset
Expand Down