Skip to content

Commit

Permalink
Update startedAt timestamp only if not set (flyteorg#567)
Browse files Browse the repository at this point in the history
* Update startedAt timestamp only if not set

Signed-off-by: pmahindrakar-oss <[email protected]>

* sort imports

Signed-off-by: pmahindrakar-oss <[email protected]>

---------

Signed-off-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
pmahindrakar-oss authored May 22, 2023
1 parent 7649683 commit ca0ebf1
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
25 changes: 14 additions & 11 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,22 @@ import (
"sort"
"strconv"

"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/storage"

jsonpatch "github.com/evanphx/json-patch"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
_struct "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"

jsonpatch "github.com/evanphx/json-patch"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
_struct "github.com/golang/protobuf/ptypes/struct"

"google.golang.org/grpc/codes"
"github.com/flyteorg/flytestdlib/storage"
)

var empty _struct.Struct
Expand All @@ -40,8 +38,13 @@ func addTaskStartedState(request *admin.TaskExecutionEventRequest, taskExecution
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal occurredAt with error: %v", err)
}
taskExecutionModel.StartedAt = &occurredAt
closure.StartedAt = request.Event.OccurredAt
//Updated the startedAt timestamp only if its not set.
// The task start event should already be updating this through addTaskStartedState
// This check makes sure any out of order
if taskExecutionModel.StartedAt == nil {
taskExecutionModel.StartedAt = &occurredAt
closure.StartedAt = request.Event.OccurredAt
}
return nil
}

Expand Down
65 changes: 47 additions & 18 deletions flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"

ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
ptypesStruct "github.com/golang/protobuf/ptypes/struct"
"github.com/stretchr/testify/assert"
)

var taskEventOccurredAt = time.Now().UTC()
Expand Down Expand Up @@ -73,23 +74,50 @@ func transformMapToStructPB(t *testing.T, thing map[string]string) *structpb.Str
}

func TestAddTaskStartedState(t *testing.T) {
var startedAt = time.Now().UTC()
var startedAtProto, _ = ptypes.TimestampProto(startedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{}
closure := &admin.TaskExecutionClosure{}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)
t.Run("model with unset started At ", func(t *testing.T) {
var startedAt = time.Now().UTC()
var startedAtProto, _ = ptypes.TimestampProto(startedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{}
closure := &admin.TaskExecutionClosure{}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.Equal(t, startedAt, timestamp)
assert.Equal(t, &startedAt, taskExecutionModel.StartedAt)
})
t.Run("model with set started At ", func(t *testing.T) {
var oldStartedAt = time.Now().UTC()
var newStartedAt = time.Now().UTC().Add(time.Minute * -10)
var startedAtProto, _ = ptypes.TimestampProto(newStartedAt)
request := admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
Phase: core.TaskExecution_RUNNING,
OccurredAt: startedAtProto,
},
}
taskExecutionModel := models.TaskExecution{
StartedAt: &oldStartedAt,
}
closure := &admin.TaskExecutionClosure{
StartedAt: startedAtProto,
}
err := addTaskStartedState(&request, &taskExecutionModel, closure)
assert.Nil(t, err)

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.NotEqual(t, oldStartedAt, timestamp)
assert.Equal(t, &oldStartedAt, taskExecutionModel.StartedAt)
})

timestamp, err := ptypes.Timestamp(closure.StartedAt)
assert.Nil(t, err)
assert.Equal(t, startedAt, timestamp)
assert.Equal(t, &startedAt, taskExecutionModel.StartedAt)
}

func TestAddTaskTerminalState_Error(t *testing.T) {
Expand All @@ -106,6 +134,7 @@ func TestAddTaskTerminalState_Error(t *testing.T) {
},
}
startedAt := occurredAt.Add(-time.Minute)

startedAtProto, _ := ptypes.TimestampProto(startedAt)
taskExecutionModel := models.TaskExecution{
StartedAt: &startedAt,
Expand Down

0 comments on commit ca0ebf1

Please sign in to comment.