diff --git a/flyteadmin/pkg/repositories/transformers/task_execution.go b/flyteadmin/pkg/repositories/transformers/task_execution.go index e3eca3884f..f57f4b6b3d 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution.go @@ -5,24 +5,22 @@ import ( "sort" "strconv" - "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" - "github.com/flyteorg/flytestdlib/storage" - + jsonpatch "github.com/evanphx/json-patch" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + _struct "github.com/golang/protobuf/ptypes/struct" + "google.golang.org/grpc/codes" "google.golang.org/protobuf/encoding/protojson" - jsonpatch "github.com/evanphx/json-patch" "github.com/flyteorg/flyteadmin/pkg/common" "github.com/flyteorg/flyteadmin/pkg/errors" "github.com/flyteorg/flyteadmin/pkg/repositories/models" + "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/logger" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - _struct "github.com/golang/protobuf/ptypes/struct" - - "google.golang.org/grpc/codes" + "github.com/flyteorg/flytestdlib/storage" ) var empty _struct.Struct @@ -40,8 +38,13 @@ func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecution if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal occurredAt with error: %v", err) } - taskExecutionModel.StartedAt = &occurredAt - closure.StartedAt = request.Event.OccurredAt + //Updated the startedAt timestamp only if its not set. + // The task start event should already be updating this through addTaskStartedState + // This check makes sure any out of order + if taskExecutionModel.StartedAt == nil { + taskExecutionModel.StartedAt = &occurredAt + closure.StartedAt = request.Event.OccurredAt + } return nil } diff --git a/flyteadmin/pkg/repositories/transformers/task_execution_test.go b/flyteadmin/pkg/repositories/transformers/task_execution_test.go index da7af92a2c..e3155b12f7 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution_test.go @@ -19,12 +19,13 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + ptypesStruct "github.com/golang/protobuf/ptypes/struct" + "github.com/stretchr/testify/assert" + "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - ptypesStruct "github.com/golang/protobuf/ptypes/struct" - "github.com/stretchr/testify/assert" ) var taskEventOccurredAt = time.Now().UTC() @@ -73,23 +74,50 @@ func transformMapToStructPB(t *testing.T, thing map[string]string) *structpb.Str } func TestAddTaskStartedState(t *testing.T) { - var startedAt = time.Now().UTC() - var startedAtProto, _ = ptypes.TimestampProto(startedAt) - request := admin.TaskExecutionEventRequest{ - Event: &event.TaskExecutionEvent{ - Phase: core.TaskExecution_RUNNING, - OccurredAt: startedAtProto, - }, - } - taskExecutionModel := models.TaskExecution{} - closure := &admin.TaskExecutionClosure{} - err := addTaskStartedState(&request, &taskExecutionModel, closure) - assert.Nil(t, err) + t.Run("model with unset started At ", func(t *testing.T) { + var startedAt = time.Now().UTC() + var startedAtProto, _ = ptypes.TimestampProto(startedAt) + request := admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + Phase: core.TaskExecution_RUNNING, + OccurredAt: startedAtProto, + }, + } + taskExecutionModel := models.TaskExecution{} + closure := &admin.TaskExecutionClosure{} + err := addTaskStartedState(&request, &taskExecutionModel, closure) + assert.Nil(t, err) + + timestamp, err := ptypes.Timestamp(closure.StartedAt) + assert.Nil(t, err) + assert.Equal(t, startedAt, timestamp) + assert.Equal(t, &startedAt, taskExecutionModel.StartedAt) + }) + t.Run("model with set started At ", func(t *testing.T) { + var oldStartedAt = time.Now().UTC() + var newStartedAt = time.Now().UTC().Add(time.Minute * -10) + var startedAtProto, _ = ptypes.TimestampProto(newStartedAt) + request := admin.TaskExecutionEventRequest{ + Event: &event.TaskExecutionEvent{ + Phase: core.TaskExecution_RUNNING, + OccurredAt: startedAtProto, + }, + } + taskExecutionModel := models.TaskExecution{ + StartedAt: &oldStartedAt, + } + closure := &admin.TaskExecutionClosure{ + StartedAt: startedAtProto, + } + err := addTaskStartedState(&request, &taskExecutionModel, closure) + assert.Nil(t, err) + + timestamp, err := ptypes.Timestamp(closure.StartedAt) + assert.Nil(t, err) + assert.NotEqual(t, oldStartedAt, timestamp) + assert.Equal(t, &oldStartedAt, taskExecutionModel.StartedAt) + }) - timestamp, err := ptypes.Timestamp(closure.StartedAt) - assert.Nil(t, err) - assert.Equal(t, startedAt, timestamp) - assert.Equal(t, &startedAt, taskExecutionModel.StartedAt) } func TestAddTaskTerminalState_Error(t *testing.T) { @@ -106,6 +134,7 @@ func TestAddTaskTerminalState_Error(t *testing.T) { }, } startedAt := occurredAt.Add(-time.Minute) + startedAtProto, _ := ptypes.TimestampProto(startedAt) taskExecutionModel := models.TaskExecution{ StartedAt: &startedAt,