diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 1b47f16bf..104a2c9ac 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -132,6 +132,9 @@ func TestCreateNodeEvent(t *testing.T) { StartedAt: occurredAtProto, CreatedAt: occurredAtProto, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{}, + }, } closureBytes, _ := proto.Marshal(&expectedClosure) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetCreateCallback( diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index a06d762c7..e07bff85b 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -188,33 +188,49 @@ func TestAddTerminalState_Error(t *testing.T) { func TestCreateNodeExecutionModel(t *testing.T) { parentTaskExecID := uint(8) - nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{ - Request: &admin.NodeExecutionEventRequest{ - Event: &event.NodeExecutionEvent{ - Id: &core.NodeExecutionIdentifier{ - NodeId: "node id", - ExecutionId: &core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, - }, - Phase: core.NodeExecution_RUNNING, - InputUri: "input uri", - OutputResult: &event.NodeExecutionEvent_OutputUri{ - OutputUri: "output uri", + request := &admin.NodeExecutionEventRequest{ + Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", }, - OccurredAt: occurredAtProto, - ParentTaskMetadata: &event.ParentTaskExecutionMetadata{ - Id: &core.TaskExecutionIdentifier{ - RetryAttempt: 1, + }, + Phase: core.NodeExecution_RUNNING, + InputUri: "input uri", + OutputResult: &event.NodeExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occurredAtProto, + TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{ + TaskNodeMetadata: &event.TaskNodeMetadata{ + CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, + CatalogKey: &core.CatalogMetadata{ + DatasetId: &core.Identifier{ + ResourceType: core.ResourceType_DATASET, + Name: "x", + Project: "proj", + Domain: "domain", + }, }, + CheckpointUri: "last checkpoint uri", + }, + }, + ParentTaskMetadata: &event.ParentTaskExecutionMetadata{ + Id: &core.TaskExecutionIdentifier{ + RetryAttempt: 1, }, - IsParent: true, - IsDynamic: true, - EventVersion: 2, }, + IsParent: true, + IsDynamic: true, + EventVersion: 2, }, + } + + nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{ + Request: request, ParentTaskExecutionID: &parentTaskExecID, }) assert.Nil(t, err) @@ -224,6 +240,13 @@ func TestCreateNodeExecutionModel(t *testing.T) { StartedAt: occurredAtProto, CreatedAt: occurredAtProto, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, + CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, + }, + }, } var closureBytes, _ = proto.Marshal(closure) var nodeExecutionMetadata, _ = proto.Marshal(&admin.NodeExecutionMetaData{ @@ -234,6 +257,7 @@ func TestCreateNodeExecutionModel(t *testing.T) { EventVersion: 2, } internalDataBytes, _ := proto.Marshal(internalData) + cacheStatus := request.Event.GetTaskNodeMetadata().CacheStatus.String() assert.Equal(t, &models.NodeExecution{ NodeExecutionKey: models.NodeExecutionKey{ NodeID: "node id", @@ -251,6 +275,7 @@ func TestCreateNodeExecutionModel(t *testing.T) { NodeExecutionUpdatedAt: &occurredAt, NodeExecutionMetadata: nodeExecutionMetadata, ParentTaskExecutionID: &parentTaskExecID, + CacheStatus: &cacheStatus, InternalData: internalDataBytes, }, nodeExecutionModel) }