From 975fe4dcd5083a5ad252ebc6f59f12632dd4a04d Mon Sep 17 00:00:00 2001 From: Iaroslav Ciupin Date: Fri, 17 May 2024 18:37:29 +0300 Subject: [PATCH] Log error in read inputs/outputs (#282) ## Overview * Log the error and execution ID when reading inputs or outputs fails ## Test Plan * NA ## Rollout Plan (if applicable) *TODO: Describe any deployment or compatibility considerations for rolling out this change.* ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [ ] To be upstreamed ## Issue fixes CLD-917 *TODO: Link Linear issue(s) using [magic words](https://linear.app/docs/github#magic-words). `fixes` will move to merged status, while `ref` will only link the PR.* ## Checklist * [ ] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- flyteadmin/pkg/manager/impl/execution_manager.go | 12 ++++++++++-- .../pkg/manager/impl/node_execution_manager.go | 12 ++++++++++-- .../pkg/manager/impl/task_execution_manager.go | 13 +++++++++++-- flyteadmin/pkg/manager/impl/util/data.go | 1 - 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 01bf6c7d6cd..436f1b0fce4 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1770,7 +1770,11 @@ func (m *ExecutionManager) GetExecutionData( id.Project, id.Domain, executionModel.InputsURI.String(), - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read inputs during 
execution [%v]: %v", id, err) + } return err }) @@ -1786,7 +1790,11 @@ func (m *ExecutionManager) GetExecutionData( cluster, id.Project, id.Domain, - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read outputs during execution [%v]: %v", id, err) + } return err }) diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager.go b/flyteadmin/pkg/manager/impl/node_execution_manager.go index 4c6b52fe353..1747381256e 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager.go @@ -575,7 +575,11 @@ func (m *NodeExecutionManager) GetNodeExecutionData( id.Project, id.Domain, nodeExecution.InputUri, - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read inputs during execution [%v]: %v", id, err) + } return err }) @@ -591,7 +595,11 @@ func (m *NodeExecutionManager) GetNodeExecutionData( cluster, id.Project, id.Domain, - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read outputs during execution [%v]: %v", id, err) + } return err }) diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index 0e88ab1998c..4fab87a5b4e 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -303,6 +303,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData(ctx context.Context, if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { logger.Debugf(ctx, "Invalid identifier [%+v]: %v", request.Id, err) } + ctx = getTaskExecutionContext(ctx, request.Id) group, groupCtx := errgroup.WithContext(ctx) var taskExecution *admin.TaskExecution @@ -351,7 +352,11 @@ func (m *TaskExecutionManager) GetTaskExecutionData(ctx context.Context, id.Project, id.Domain, taskExecution.InputUri, - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read 
inputs during execution [%v]: %v", id, err) + } return err }) @@ -367,7 +372,11 @@ func (m *TaskExecutionManager) GetTaskExecutionData(ctx context.Context, cluster, id.Project, id.Domain, - objectStore) + objectStore, + ) + if err != nil { + logger.Errorf(ctx, "failed to read outputs during execution [%v]: %v", id, err) + } return err }) diff --git a/flyteadmin/pkg/manager/impl/util/data.go b/flyteadmin/pkg/manager/impl/util/data.go index 828b4c6ccf1..975e43e26d4 100644 --- a/flyteadmin/pkg/manager/impl/util/data.go +++ b/flyteadmin/pkg/manager/impl/util/data.go @@ -150,7 +150,6 @@ func GetOutputs(ctx context.Context, err = readFromDataPlane(ctx, objectStore, cluster, project, domain, closure.GetOutputUri(), fullOutputs) } } - return fullOutputs, &outputsURLBlob, err }