From 76d69569744f409b3e608919449f9a53415dd14a Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Tue, 22 Oct 2019 10:56:02 -0700 Subject: [PATCH] Merge logs on task execution event updates (#18) --- .../transformers/task_execution.go | 25 +++- .../transformers/task_execution_test.go | 121 ++++++++++++++++-- 2 files changed, 137 insertions(+), 9 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/task_execution.go b/flyteadmin/pkg/repositories/transformers/task_execution.go index c5a5a2ca70..c41f7e7ad7 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution.go @@ -129,6 +129,29 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task return taskExecution, nil } +// Returns the unique list of logs across an existing list and the latest list sent in a task execution event update. +func mergeLogs(existing, latest []*core.TaskLog) []*core.TaskLog { + if len(latest) == 0 { + return existing + } + if len(existing) == 0 { + return latest + } + existingSet := make(map[string]*core.TaskLog, len(existing)) + for _, existingLog := range existing { + existingSet[existingLog.Uri] = existingLog + } + // Copy over the existing logs from the input list so we preserve the original order. + logs := existing + for _, latestLog := range latest { + if _, ok := existingSet[latestLog.Uri]; !ok { + // We haven't seen this log before: add it to the output result list. + logs = append(logs, latestLog) + } + } + return logs +} + func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution) error { var taskExecutionClosure admin.TaskExecutionClosure err := proto.Unmarshal(taskExecutionModel.Closure, &taskExecutionClosure) @@ -141,7 +164,7 @@ func UpdateTaskExecutionModel(request *admin.TaskExecutionEventRequest, taskExec taskExecutionModel.PhaseVersion = request.Event.PhaseVersion taskExecutionClosure.Phase = request.Event.Phase taskExecutionClosure.UpdatedAt = request.Event.OccurredAt - taskExecutionClosure.Logs = request.Event.Logs + taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs) if (existingTaskPhase == core.TaskExecution_QUEUED.String() || existingTaskPhase == core.TaskExecution_UNDEFINED.String()) && taskExecutionModel.Phase == core.TaskExecution_RUNNING.String() { err = addTaskStartedState(request, taskExecutionModel, &taskExecutionClosure) if err != nil { diff --git a/flyteadmin/pkg/repositories/transformers/task_execution_test.go b/flyteadmin/pkg/repositories/transformers/task_execution_test.go index 3795322a2c..f175a9e243 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution_test.go @@ -1,6 +1,7 @@ package transformers import ( + "fmt" "testing" "time" @@ -260,6 +261,14 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { StartedAt: taskEventOccurredAtProto, CreatedAt: taskEventOccurredAtProto, UpdatedAt: taskEventOccurredAtProto, + Logs: []*core.TaskLog{ + { + Uri: "uri_a", + }, + { + Uri: "uri_b", + }, + }, } closureBytes, err := proto.Marshal(existingClosure) @@ -312,12 +321,10 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { OccurredAt: occuredAtProto, Logs: []*core.TaskLog{ { - Name: "some_log", - Uri: "some_uri", + Uri: "uri_b", }, { - Name: "some_log2", - Uri: "some_uri2", + Uri: "uri_c", }, }, CustomInfo: &customInfo, @@ -338,12 +345,13 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) { }, Logs: []*core.TaskLog{ { - Name: "some_log", - Uri: "some_uri", + Uri: "uri_a", }, { - Name: "some_log2", - Uri: "some_uri2", + Uri: "uri_b", + }, + { + Uri: "uri_c", }, }, CustomInfo: &customInfo, @@ -501,3 +509,100 @@ func TestFromTaskExecutionModels(t *testing.T) { Closure: taskClosure, }, taskExecutions[0])) } + +func TestMergeLogs(t *testing.T) { + type testCase struct { + existing []*core.TaskLog + latest []*core.TaskLog + expected []*core.TaskLog + name string + } + + testCases := []testCase{ + { + existing: []*core.TaskLog{ + { + Uri: "uri_a", + Name: "name_a", + }, + { + Uri: "uri_b", + Name: "name_b", + }, + { + Uri: "uri_c", + Name: "name_c", + }, + }, + latest: []*core.TaskLog{ + { + Uri: "uri_b", + Name: "name_b", + }, + { + Uri: "uri_d", + Name: "name_d", + }, + }, + expected: []*core.TaskLog{ + { + Uri: "uri_a", + Name: "name_a", + }, + { + Uri: "uri_b", + Name: "name_b", + }, + { + Uri: "uri_c", + Name: "name_c", + }, + { + Uri: "uri_d", + Name: "name_d", + }, + }, + name: "Merge unique logs", + }, + { + latest: []*core.TaskLog{ + { + Uri: "uri_b", + Name: "name_b", + }, + }, + expected: []*core.TaskLog{ + { + Uri: "uri_b", + Name: "name_b", + }, + }, + name: "Empty existing logs", + }, + { + existing: []*core.TaskLog{ + { + Uri: "uri_b", + Name: "name_b", + }, + }, + expected: []*core.TaskLog{ + { + Uri: "uri_b", + Name: "name_b", + }, + }, + name: "Empty latest logs", + }, + { + name: "Nothing to do", + }, + } + for _, mergeTestCase := range testCases { + actual := mergeLogs(mergeTestCase.existing, mergeTestCase.latest) + assert.Equal(t, len(mergeTestCase.expected), len(actual), fmt.Sprintf("%s failed", mergeTestCase.name)) + for idx, expectedLog := range mergeTestCase.expected { + assert.True(t, proto.Equal(expectedLog, actual[idx]), fmt.Sprintf("%s failed", mergeTestCase.name)) + } + } +}