Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge 049e37c into cca9e17
Browse files Browse the repository at this point in the history
  • Loading branch information
hamersaw authored Mar 29, 2023
2 parents cca9e17 + 049e37c commit dca1a2d
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func TestCreateNodeEvent(t *testing.T) {
StartedAt: occurredAtProto,
CreatedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{},
},
}
closureBytes, _ := proto.Marshal(&expectedClosure)
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetCreateCallback(
Expand Down
18 changes: 18 additions & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,30 @@ func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInp
return nil, err
}
}

if common.IsNodeExecutionTerminal(input.Request.Event.Phase) {
err := addTerminalState(ctx, input.Request, nodeExecution, &closure, input.InlineEventDataPolicy, input.StorageClient)
if err != nil {
return nil, err
}
}

// Update TaskNodeMetadata, which includes caching information today.
if input.Request.Event.GetTaskNodeMetadata() != nil {
targetMetadata := &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{
CheckpointUri: input.Request.Event.GetTaskNodeMetadata().CheckpointUri,
},
}
if input.Request.Event.GetTaskNodeMetadata().CatalogKey != nil {
st := input.Request.Event.GetTaskNodeMetadata().GetCacheStatus().String()
targetMetadata.TaskNodeMetadata.CacheStatus = input.Request.Event.GetTaskNodeMetadata().GetCacheStatus()
targetMetadata.TaskNodeMetadata.CatalogKey = input.Request.Event.GetTaskNodeMetadata().GetCatalogKey()
nodeExecution.CacheStatus = &st
}
closure.TargetMetadata = targetMetadata
}

marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(
Expand Down
73 changes: 49 additions & 24 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,35 +199,51 @@ func TestAddTerminalState_Error(t *testing.T) {

func TestCreateNodeExecutionModel(t *testing.T) {
parentTaskExecID := uint(8)
nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{
Request: &admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
},
Phase: core.NodeExecution_RUNNING,
InputValue: &event.NodeExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.NodeExecutionEvent_OutputUri{
OutputUri: "output uri",
request := &admin.NodeExecutionEventRequest{
Event: &event.NodeExecutionEvent{
Id: &core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
},
OccurredAt: occurredAtProto,
ParentTaskMetadata: &event.ParentTaskExecutionMetadata{
Id: &core.TaskExecutionIdentifier{
RetryAttempt: 1,
},
Phase: core.NodeExecution_RUNNING,
InputValue: &event.NodeExecutionEvent_InputUri{
InputUri: testInputURI,
},
OutputResult: &event.NodeExecutionEvent_OutputUri{
OutputUri: "output uri",
},
OccurredAt: occurredAtProto,
TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{
TaskNodeMetadata: &event.TaskNodeMetadata{
CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED,
CatalogKey: &core.CatalogMetadata{
DatasetId: &core.Identifier{
ResourceType: core.ResourceType_DATASET,
Name: "x",
Project: "proj",
Domain: "domain",
},
},
CheckpointUri: "last checkpoint uri",
},
IsParent: true,
IsDynamic: true,
EventVersion: 2,
},
ParentTaskMetadata: &event.ParentTaskExecutionMetadata{
Id: &core.TaskExecutionIdentifier{
RetryAttempt: 1,
},
},
IsParent: true,
IsDynamic: true,
EventVersion: 2,
},
}

nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{
Request: request,
ParentTaskExecutionID: &parentTaskExecID,
})
assert.Nil(t, err)
Expand All @@ -237,6 +253,13 @@ func TestCreateNodeExecutionModel(t *testing.T) {
StartedAt: occurredAtProto,
CreatedAt: occurredAtProto,
UpdatedAt: occurredAtProto,
TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{
CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus,
CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey,
CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri,
},
},
}
var closureBytes, _ = proto.Marshal(closure)
var nodeExecutionMetadata, _ = proto.Marshal(&admin.NodeExecutionMetaData{
Expand All @@ -247,6 +270,7 @@ func TestCreateNodeExecutionModel(t *testing.T) {
EventVersion: 2,
}
internalDataBytes, _ := proto.Marshal(internalData)
cacheStatus := request.Event.GetTaskNodeMetadata().CacheStatus.String()
assert.Equal(t, &models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
Expand All @@ -264,6 +288,7 @@ func TestCreateNodeExecutionModel(t *testing.T) {
NodeExecutionUpdatedAt: &occurredAt,
NodeExecutionMetadata: nodeExecutionMetadata,
ParentTaskExecutionID: &parentTaskExecID,
CacheStatus: &cacheStatus,
InternalData: internalDataBytes,
}, nodeExecutionModel)
}
Expand Down

0 comments on commit dca1a2d

Please sign in to comment.