Skip to content

Commit

Permalink
Modify log link merge to take name into account (flyteorg#212)
Browse files Browse the repository at this point in the history
* Modify log link merge to take name into account

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Handle log link with no name

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* lint

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored Jun 30, 2021
1 parent 5426c46 commit eacb9a9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 6 deletions.
26 changes: 20 additions & 6 deletions flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,40 @@ func CreateTaskExecutionModel(input CreateTaskExecutionModelInput) (*models.Task
return taskExecution, nil
}

// Returns the unique list of logs across an existing list and the latest list sent in a task execution event update.
// mergeLogs returns the unique list of logs across an existing list and the latest list sent in a task execution event
// update.
// It returns all the new logs receives + any existing log that hasn't been overwritten by a new log.
// An existing logLink is said to have been overwritten if a new logLink with the same Uri or the same Name has been
// received.
func mergeLogs(existing, latest []*core.TaskLog) []*core.TaskLog {
if len(latest) == 0 {
return existing
}

if len(existing) == 0 {
return latest
}
latestSet := make(map[string]*core.TaskLog, len(latest))

latestSetByURI := make(map[string]*core.TaskLog, len(latest))
latestSetByName := make(map[string]*core.TaskLog, len(latest))
for _, latestLog := range latest {
latestSet[latestLog.Uri] = latestLog
latestSetByURI[latestLog.Uri] = latestLog
if len(latestLog.Name) > 0 {
latestSetByName[latestLog.Name] = latestLog
}
}

// Copy over the latest logs since names will change for existing logs as a task transitions across phases.
logs := latest
for _, existingLog := range existing {
if _, ok := latestSet[existingLog.Uri]; !ok {
// We haven't seen this log before: add it to the output result list.
logs = append(logs, existingLog)
if _, ok := latestSetByURI[existingLog.Uri]; !ok {
if _, ok = latestSetByName[existingLog.Name]; !ok {
// We haven't seen this log before: add it to the output result list.
logs = append(logs, existingLog)
}
}
}

return logs
}

Expand Down
75 changes: 75 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,81 @@ func TestMergeLogs(t *testing.T) {
}

testCases := []testCase{
{
existing: []*core.TaskLog{
{
Uri: "uri_a",
},
{
Uri: "uri_b",
},
},
latest: []*core.TaskLog{
{
Uri: "uri_b",
},
{
Uri: "uri_c",
},
},
expected: []*core.TaskLog{
{
Uri: "uri_b",
},
{
Uri: "uri_c",
},
{
Uri: "uri_a",
},
},
name: "Merge logs with empty names",
},
{
existing: []*core.TaskLog{
{
Uri: "uri_a",
Name: "name_a",
},
{
Uri: "uri_b_old",
Name: "name_b",
},
{
Uri: "uri_c",
Name: "name_c",
},
},
latest: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
{
Uri: "uri_d",
Name: "name_d",
},
},
expected: []*core.TaskLog{
{
Uri: "uri_b",
Name: "name_b",
},
{
Uri: "uri_d",
Name: "name_d",
},
{
Uri: "uri_a",
Name: "name_a",
},
{
Uri: "uri_c",
Name: "name_c",
},
},
name: "Merge unique logs by name",
},
{
existing: []*core.TaskLog{
{
Expand Down

0 comments on commit eacb9a9

Please sign in to comment.