diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go index 477bdcb160..1b47f16bf5 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go @@ -197,6 +197,9 @@ func TestCreateNodeEvent_Update(t *testing.T) { StartedAt: occurredAtProto, Phase: core.NodeExecution_RUNNING, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{}, + }, } expectedClosureBytes, _ := proto.Marshal(&expectedClosure) actualClosure := admin.NodeExecutionClosure{} @@ -541,6 +544,11 @@ func TestGetNodeExecution(t *testing.T) { repository := repositoryMocks.NewMockRepository() expectedClosure := admin.NodeExecutionClosure{ Phase: core.NodeExecution_SUCCEEDED, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CheckpointUri: "last checkpoint uri", + }, + }, } expectedMetadata := admin.NodeExecutionMetaData{ SpecNodeId: "spec_node_id", diff --git a/flyteadmin/pkg/repositories/transformers/node_execution.go b/flyteadmin/pkg/repositories/transformers/node_execution.go index c56a1c0280..2029f69f7c 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution.go @@ -211,16 +211,19 @@ func UpdateNodeExecutionModel( } // Update TaskNodeMetadata, which includes caching information today. - if request.Event.GetTaskNodeMetadata() != nil && request.Event.GetTaskNodeMetadata().CatalogKey != nil { - st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + if request.Event.GetTaskNodeMetadata() != nil { targetMetadata := &admin.NodeExecutionClosure_TaskNodeMetadata{ TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: request.Event.GetTaskNodeMetadata().GetCacheStatus(), - CatalogKey: request.Event.GetTaskNodeMetadata().GetCatalogKey(), + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, }, } + if request.Event.GetTaskNodeMetadata().CatalogKey != nil { + st := request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + targetMetadata.TaskNodeMetadata.CacheStatus = request.Event.GetTaskNodeMetadata().GetCacheStatus() + targetMetadata.TaskNodeMetadata.CatalogKey = request.Event.GetTaskNodeMetadata().GetCatalogKey() + nodeExecutionModel.CacheStatus = &st + } nodeExecutionClosure.TargetMetadata = targetMetadata - nodeExecutionModel.CacheStatus = &st } marshaledClosure, err := proto.Marshal(&nodeExecutionClosure) diff --git a/flyteadmin/pkg/repositories/transformers/node_execution_test.go b/flyteadmin/pkg/repositories/transformers/node_execution_test.go index de755fe2a1..a06d762c7d 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution_test.go @@ -328,6 +328,7 @@ func TestUpdateNodeExecutionModel(t *testing.T) { }, }, }, + CheckpointUri: "last checkpoint uri", }, }, }, @@ -351,8 +352,9 @@ func TestUpdateNodeExecutionModel(t *testing.T) { UpdatedAt: occurredAtProto, TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, - CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, + CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, }, }, }