Skip to content

Commit

Permalink
Tracking reasons time-series (flyteorg#540)
Browse files Browse the repository at this point in the history
* tracking reasons time-series

Signed-off-by: Daniel Rammer <[email protected]>

* bumped flyteidl dep and added comment

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 22, 2023
1 parent c398f7c commit 2f51278
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.3.9
github.com/flyteorg/flyteidl v1.3.13
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flytepropeller v1.1.70
github.com/flyteorg/flytestdlib v1.0.15
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk=
github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.13 h1:jOjiHl6jmSCOGC094QaRdSjjhThhzYPm0jHSxwAZ6UM=
github.com/flyteorg/flyteidl v1.3.13/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s=
github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w=
Expand Down
21 changes: 21 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,15 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
EventVersion: input.Request.Event.EventVersion,
}

if len(input.Request.Event.Reason) > 0 {
closure.Reasons = []*admin.Reason{
&admin.Reason{
OccurredAt: input.Request.Event.OccurredAt,
Message: input.Request.Event.Reason,
},
}
}

eventPhase := input.Request.Event.Phase

// Different tasks may report different phases as their first event.
Expand Down Expand Up @@ -362,6 +371,18 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
taskExecutionClosure.UpdatedAt = request.Event.OccurredAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
if len(request.Event.Reason) > 0 {
if taskExecutionClosure.Reason != request.Event.Reason {
// by tracking a time-series of reasons we increase the size of the TaskExecutionClosure in scenarios where
// a task reports a large number of unique reasons. if this size increase becomes problematic we this logic
// will need to be revisited.
taskExecutionClosure.Reasons = append(
taskExecutionClosure.Reasons,
&admin.Reason{
OccurredAt: request.Event.OccurredAt,
Message: request.Event.Reason,
})
}

taskExecutionClosure.Reason = request.Event.Reason
}
if existingTaskPhase != core.TaskExecution_RUNNING.String() && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() {
Expand Down
27 changes: 26 additions & 1 deletion flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,13 @@ func TestCreateTaskExecutionModelQueued(t *testing.T) {
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Reason: "Task was scheduled",
TaskType: "sidecar",
Reasons: []*admin.Reason{
&admin.Reason{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
},
TaskType: "sidecar",
}

expectedClosureBytes, err := proto.Marshal(expectedClosure)
Expand Down Expand Up @@ -338,6 +344,8 @@ func TestCreateTaskExecutionModelRunning(t *testing.T) {
CustomInfo: &customInfo,
}

t.Logf("expected %+v %+v\n", expectedClosure.Reason, expectedClosure.Reasons)

expectedClosureBytes, err := proto.Marshal(expectedClosure)
assert.Nil(t, err)

Expand Down Expand Up @@ -386,6 +394,13 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
CustomInfo: transformMapToStructPB(t, map[string]string{
"key1": "value1",
}),
Reason: "Task was scheduled",
Reasons: []*admin.Reason{
&admin.Reason{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
Expand Down Expand Up @@ -481,6 +496,16 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {
"key1": "value1 updated",
}),
Reason: "task failed",
Reasons: []*admin.Reason{
&admin.Reason{
OccurredAt: taskEventOccurredAtProto,
Message: "Task was scheduled",
},
&admin.Reason{
OccurredAt: occuredAtProto,
Message: "task failed",
},
},
}

expectedClosureBytes, err := proto.Marshal(expectedClosure)
Expand Down

0 comments on commit 2f51278

Please sign in to comment.