Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
adding cache information on first node event
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Oct 10, 2022
1 parent d1ebaae commit 6a8ffa8
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,30 @@ func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInp
return nil, err
}
}

if common.IsNodeExecutionTerminal(input.Request.Event.Phase) {
err := addTerminalState(ctx, input.Request, nodeExecution, &closure, input.InlineEventDataPolicy, input.StorageClient)
if err != nil {
return nil, err
}
}

// Update TaskNodeMetadata, which includes caching information today.
if input.Request.Event.GetTaskNodeMetadata() != nil {
targetMetadata := &admin.NodeExecutionClosure_TaskNodeMetadata{
TaskNodeMetadata: &admin.TaskNodeMetadata{
CheckpointUri: input.Request.Event.GetTaskNodeMetadata().CheckpointUri,
},
}
if input.Request.Event.GetTaskNodeMetadata().CatalogKey != nil {
st := input.Request.Event.GetTaskNodeMetadata().GetCacheStatus().String()
targetMetadata.TaskNodeMetadata.CacheStatus = input.Request.Event.GetTaskNodeMetadata().GetCacheStatus()
targetMetadata.TaskNodeMetadata.CatalogKey = input.Request.Event.GetTaskNodeMetadata().GetCatalogKey()
nodeExecution.CacheStatus = &st
}
closure.TargetMetadata = targetMetadata
}

marshaledClosure, err := proto.Marshal(&closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(
Expand Down

0 comments on commit 6a8ffa8

Please sign in to comment.