diff --git a/backend/src/apiserver/server/api_converter.go b/backend/src/apiserver/server/api_converter.go index 919fea213e4..09b6819f605 100644 --- a/backend/src/apiserver/server/api_converter.go +++ b/backend/src/apiserver/server/api_converter.go @@ -20,7 +20,6 @@ import ( "sort" "strconv" - workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/golang/protobuf/ptypes/timestamp" "github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec" apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client" @@ -1620,15 +1619,15 @@ func toModelTask(t interface{}) (*model.Task, error) { children = append(children, c.GetPodName()) } } - case workflowapi.NodeStatus: + case util.NodeStatus: // TODO(gkcalat): parse input and output artifacts wfStatus := t nodeId = wfStatus.ID name = wfStatus.DisplayName - state = string(wfStatus.Phase) - startTime = wfStatus.StartedAt.Unix() - createTime = startTime - finishTime = wfStatus.FinishedAt.Unix() + state = wfStatus.State + startTime = wfStatus.StartTime + createTime = wfStatus.CreateTime + finishTime = wfStatus.FinishTime children = wfStatus.Children default: return nil, util.NewUnknownApiVersionError("Task", t) @@ -1672,20 +1671,21 @@ func toModelTasks(t interface{}) ([]*model.Task, error) { modelTasks = append(modelTasks, modelTask) } return modelTasks, nil - case *util.Workflow: + case util.ExecutionSpec: execSpec := t runId := execSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId] namespace := execSpec.ExecutionNamespace() - createdAt := execSpec.GetCreationTimestamp().Unix() + createdAt := execSpec.ExecutionObjectMeta().GetCreationTimestamp().Unix() // Get sorted node names to make the results repeatable - nodeNames := make([]string, 0, len(execSpec.Status.Nodes)) - for nodeName := range execSpec.Status.Nodes { + nodes := execSpec.ExecutionStatus().NodeStatuses() + nodeNames := make([]string, 0, len(nodes)) + for nodeName := range nodes { nodeNames = append(nodeNames, nodeName) } sort.Strings(nodeNames) modelTasks := make([]*model.Task, 0) for _, nodeName := range nodeNames { - node := execSpec.Status.Nodes[nodeName] + node := nodes[nodeName] modelTask, err := toModelTask(node) if err != nil { return nil, util.Wrap(err, "Failed to convert Argo workflow to tasks details") diff --git a/backend/src/apiserver/server/api_converter_test.go b/backend/src/apiserver/server/api_converter_test.go index 1719ace361c..be94a6956ad 100644 --- a/backend/src/apiserver/server/api_converter_test.go +++ b/backend/src/apiserver/server/api_converter_test.go @@ -17,10 +17,8 @@ package server import ( "strings" "testing" - "time" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" - workflowapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/golang/protobuf/ptypes/timestamp" "github.com/google/go-cmp/cmp" apiv1beta1 "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client" @@ -3013,14 +3011,14 @@ func Test_toModelTask(t *testing.T) { }, { "argo node status", - workflowapi.NodeStatus{ + util.NodeStatus{ ID: "1", DisplayName: "node_1", - Name: "wrong name", - Phase: workflowapi.NodePhase("Pending"), + State: "Pending", Children: []string{"node3", "node4"}, - StartedAt: v1.Time{Time: time.Unix(4, 0)}, - FinishedAt: v1.Time{Time: time.Unix(5, 0)}, + StartTime: 4, + CreateTime: 4, + FinishTime: 5, }, &model.Task{ PodName: "1", diff --git a/backend/src/apiserver/server/report_server.go b/backend/src/apiserver/server/report_server.go index 8ae30ec2efe..892e43e102f 100644 --- a/backend/src/apiserver/server/report_server.go +++ b/backend/src/apiserver/server/report_server.go @@ -32,11 +32,11 @@ type ReportServer struct { } // Extracts task details from an execution spec and reports them to storage. -func (s ReportServer) reportTasksFromExecution(wf *util.Workflow, runId string) ([]*model.Task, error) { - if len(wf.Status.Nodes) == 0 { +func (s ReportServer) reportTasksFromExecution(execSpec util.ExecutionSpec, runId string) ([]*model.Task, error) { + if !execSpec.ExecutionStatus().HasNodes() { return nil, nil } - tasks, err := toModelTasks(wf) + tasks, err := toModelTasks(execSpec) if err != nil { return nil, util.Wrap(err, "Failed to report tasks of an execution") } @@ -55,7 +55,7 @@ func (s *ReportServer) reportWorkflow(ctx context.Context, workflow string) (*em } runId := newExecSpec.ExecutionObjectMeta().Labels[util.LabelKeyWorkflowRunId] - _, err = s.reportTasksFromExecution(newExecSpec.(*util.Workflow), runId) + _, err = s.reportTasksFromExecution(newExecSpec, runId) if err != nil { return nil, util.Wrap(err, "Failed to report task details") } diff --git a/backend/src/common/util/execution_status.go b/backend/src/common/util/execution_status.go index d7fcd7c3c3d..7eff8a20645 100644 --- a/backend/src/common/util/execution_status.go +++ b/backend/src/common/util/execution_status.go @@ -20,6 +20,17 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// Data struct to represent Node status +type NodeStatus struct { + ID string + DisplayName string + State string + StartTime int64 + CreateTime int64 + FinishTime int64 + Children []string +} + type RetrieveArtifact func(request *api.ReadArtifactRequest, user string) (*api.ReadArtifactResponse, error) // Abstract interface to encapsulate the resources of the execution runtime specifically @@ -54,4 +65,9 @@ type ExecutionStatus interface { // does ExecutionStatus contain any finished node or not HasMetrics() bool + + // Any node status exists or not + HasNodes() bool + // Get node statuses, the NodeStatus data struct could be extended if needed + NodeStatuses() map[string]NodeStatus } diff --git a/backend/src/common/util/workflow.go b/backend/src/common/util/workflow.go index 3f5a9c37028..821f69df5de 100644 --- a/backend/src/common/util/workflow.go +++ b/backend/src/common/util/workflow.go @@ -751,6 +751,26 @@ func (w *Workflow) PatchTemplateOutputArtifacts() { } } +func (w *Workflow) NodeStatuses() map[string]NodeStatus { + rev := make(map[string]NodeStatus, len(w.Status.Nodes)) + for id, node := range w.Status.Nodes { + rev[id] = NodeStatus{ + ID: node.ID, + DisplayName: node.DisplayName, + State: string(node.Phase), + StartTime: node.StartedAt.Unix(), + CreateTime: node.StartedAt.Unix(), + FinishTime: node.FinishedAt.Unix(), + Children: node.Children, + } + } + return rev +} + +func (w *Workflow) HasNodes() bool { + return len(w.Status.Nodes) > 0 +} + // implementation of ExecutionClientInterface type WorkflowClient struct { client *argoclient.Clientset