Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fixing pod plugin event reporting timestamps #307

Merged
merged 9 commits into from
Mar 23, 2023
7 changes: 5 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ type ExternalResource struct {
type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
// Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current
// time at the time of publishing the event.
// This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte
// checked the task status.
OccurredAt *time.Time
// This value represents the time the status was reported at. If not provided, will be defaulted to the current time
// when Flyte published the event.
ReportedAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// A collection of information about external resources launched by this task
Expand Down
19 changes: 16 additions & 3 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,13 +656,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {
var lastTransitionTime metav1.Time
containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...)
for _, containerStatus := range containerStatuses {
if r := containerStatus.LastTerminationState.Running; r != nil {
if r := containerStatus.State.Running; r != nil {
if r.StartedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
}
} else if r := containerStatus.LastTerminationState.Terminated; r != nil {
} else if r := containerStatus.State.Terminated; r != nil {
if r.FinishedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
lastTransitionTime = r.FinishedAt
}
}
}
Expand All @@ -673,3 +673,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {

return lastTransitionTime
}

func GetReportedAt(pod *v1.Pod) metav1.Time {
var reportedAt metav1.Time
for _, condition := range pod.Status.Conditions {
if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse {
if condition.LastTransitionTime.Unix() > reportedAt.Unix() {
reportedAt = condition.LastTransitionTime
}
}
}

return reportedAt
}
6 changes: 6 additions & 0 deletions go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,14 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
pod := r.(*v1.Pod)

transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time
reportedAt := flytek8s.GetReportedAt(pod).Time
if reportedAt.IsZero() {
reportedAt = transitionOccurredAt
}

info := pluginsCore.TaskInfo{
OccurredAt: &transitionOccurredAt,
ReportedAt: &reportedAt,
}

if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown {
Expand Down