Skip to content

Commit

Permalink
Add missing sub-task log links (flyteorg#62)
Browse files Browse the repository at this point in the history
* Add missing sub-task log links

* PR Comments

* Update deps

* Only update stdlib

* Unit tests
  • Loading branch information
EngHabu authored Mar 20, 2020
1 parent f17240c commit bdee697
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 43 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.6
github.com/lyft/flytestdlib v0.3.2
github.com/lyft/flytestdlib v0.3.3
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNz
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
103 changes: 61 additions & 42 deletions flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/config"

"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -117,69 +119,86 @@ func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updat
}

logger.Debugf(ctx, "Job [%v] has (%v) attempts.", *source.JobId, len(source.Attempts))

uniqueLogStreams := sets.String{}
target.Attempts = make([]Attempt, 0, len(source.Attempts))
lastStatusReason := ""
for _, attempt := range source.Attempts {
a := Attempt{}
if attempt.StartedAt != nil {
a.StartedAt = time.Unix(*attempt.StartedAt, 0)
var a Attempt
a, lastStatusReason = convertBatchAttemptToAttempt(attempt)
if len(a.LogStream) > 0 {
uniqueLogStreams.Insert(a.LogStream)
}

if attempt.StoppedAt != nil {
a.StoppedAt = time.Unix(*attempt.StoppedAt, 0)
}
target.Attempts = append(target.Attempts, a)
}

if container := attempt.Container; container != nil {
if container.LogStreamName != nil {
a.LogStream = *container.LogStreamName
}
// Add the "current" log stream to log links if one exists.
attempt, exitReason := createAttemptFromJobDetail(ctx, source)
if !uniqueLogStreams.Has(attempt.LogStream) {
target.Attempts = append(target.Attempts, attempt)
}

if container.Reason != nil {
lastStatusReason = *container.Reason
}
if len(lastStatusReason) == 0 {
lastStatusReason = exitReason
}

if container.ExitCode != nil {
lastStatusReason += fmt.Sprintf(" exit(%v)", *container.ExitCode)
}
}
msg = append(msg, lastStatusReason)

target.Attempts = append(target.Attempts, a)
target.Status.Message = strings.Join(msg, " - ")
return updated
}

// convertBatchAttemptToAttempt maps one AWS Batch attempt record onto the
// plugin's Attempt type.
//
// It copies StartedAt/StoppedAt when they are non-nil, takes the log stream
// name from the attempt's container, and builds exitReason from the
// container's Reason followed by " exit(<code>)" when an exit code is present.
// A nil container leaves LogStream and exitReason empty.
//
// NOTE(review): time.Unix(x, 0) interprets x as seconds; the AWS Batch API
// documents these timestamps as milliseconds — confirm the unit.
// NOTE(review): this span comes from a rendered diff; the statements that
// reference `source`, `ctx` and `lastStatusReason` are not parameters or
// locals of this function and appear to be removed-side lines of the old
// inline implementation.
func convertBatchAttemptToAttempt(attempt *batch.AttemptDetail) (a Attempt, exitReason string) {
if attempt.StartedAt != nil {
a.StartedAt = time.Unix(*attempt.StartedAt, 0)
}

if attempt.StoppedAt != nil {
a.StoppedAt = time.Unix(*attempt.StoppedAt, 0)
}

// If no job attempts are present, try to construct one from container status
if len(source.Attempts) == 0 {
a := Attempt{}
if source.StartedAt != nil {
a.StartedAt = time.Unix(*source.StartedAt, 0)
if container := attempt.Container; container != nil {
if container.LogStreamName != nil {
a.LogStream = *container.LogStreamName
}

if source.StoppedAt != nil {
a.StoppedAt = time.Unix(*source.StoppedAt, 0)
if container.Reason != nil {
exitReason = *container.Reason
}

if container := source.Container; container != nil {
if container.LogStreamName != nil {
logger.Debug(ctx, "Using log stream from container info.")
a.LogStream = *container.LogStreamName
}
if container.ExitCode != nil {
exitReason += fmt.Sprintf(" exit(%v)", *container.ExitCode)
}
}

if container.Reason != nil {
lastStatusReason = *container.Reason
}
return a, exitReason
}

if container.ExitCode != nil {
lastStatusReason += fmt.Sprintf(" exit(%v)", *container.ExitCode)
}
}
// createAttemptFromJobDetail builds an Attempt from the job-level fields of a
// batch.JobDetail rather than from one of its recorded attempts.
//
// It copies the job's StartedAt/StoppedAt when non-nil, takes the log stream
// name from the job's container (emitting a debug log when it does so), and
// builds exitReason from the container's Reason followed by " exit(<code>)"
// when an exit code is present. A nil container leaves LogStream and
// exitReason empty.
//
// NOTE(review): time.Unix(x, 0) interprets x as seconds; the AWS Batch API
// documents these timestamps as milliseconds — confirm the unit.
// NOTE(review): this span comes from a rendered diff; the statements that
// reference `target`, `msg`, `lastStatusReason` and `updated` are not in scope
// here and appear to be removed-side lines of the previous updateJob body.
func createAttemptFromJobDetail(ctx context.Context, source *batch.JobDetail) (a Attempt, exitReason string) {
if source.StartedAt != nil {
a.StartedAt = time.Unix(*source.StartedAt, 0)
}

target.Attempts = append(target.Attempts, a)
if source.StoppedAt != nil {
a.StoppedAt = time.Unix(*source.StoppedAt, 0)
}

msg = append(msg, lastStatusReason)
if container := source.Container; container != nil {
if container.LogStreamName != nil {
logger.Debug(ctx, "Using log stream from container info.")
a.LogStream = *container.LogStreamName
}

target.Status.Message = strings.Join(msg, " - ")
return updated
if container.Reason != nil {
exitReason = *container.Reason
}

if container.ExitCode != nil {
exitReason += fmt.Sprintf(" exit(%v)", *container.ExitCode)
}
}

return a, exitReason
}

func minInt(a, b int) int {
Expand Down
59 changes: 59 additions & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,62 @@ func Test_toRanges(t *testing.T) {
})
}
}

// Test_updateJob checks how many attempts updateJob records on a Job for
// different combinations of the job-level ("current") container log stream
// and the log streams of already-recorded attempts.
func Test_updateJob(t *testing.T) {
	ctx := context.Background()

	scenarios := []struct {
		name         string
		detail       *batch.JobDetail
		wantAttempts int
	}{
		{
			// Only the job-level container carries a log stream.
			name: "Current attempt",
			detail: &batch.JobDetail{
				Container: &batch.ContainerDetail{
					LogStreamName: refStr("stream://log2"),
				},
			},
			wantAttempts: 1,
		},
		{
			// The job-level stream differs from the recorded attempt's stream.
			name: "Current attempt, 1 already failed",
			detail: &batch.JobDetail{
				Container: &batch.ContainerDetail{
					LogStreamName: refStr("stream://log2"),
				},
				Attempts: []*batch.AttemptDetail{
					{
						Container: &batch.AttemptContainerDetail{
							LogStreamName: refStr("stream://log1"),
						},
					},
				},
			},
			wantAttempts: 2,
		},
		{
			// The job-level stream matches the recorded attempt's stream.
			name: "No current attempt, 1 already failed",
			detail: &batch.JobDetail{
				Container: &batch.ContainerDetail{
					LogStreamName: refStr("stream://log1"),
				},
				Attempts: []*batch.AttemptDetail{
					{
						Container: &batch.AttemptContainerDetail{
							LogStreamName: refStr("stream://log1"),
						},
					},
				},
			},
			wantAttempts: 1,
		},
	}

	for _, sc := range scenarios {
		sc := sc
		t.Run(sc.name, func(t *testing.T) {
			// Every scenario shares the same job id and running status.
			sc.detail.JobId = refStr("job-1")
			sc.detail.Status = refStr(batch.JobStatusRunning)

			job := Job{}
			changed := updateJob(ctx, sc.detail, &job)

			assert.True(t, changed)
			assert.Len(t, job.Attempts, sc.wantAttempts)
		})
	}
}

0 comments on commit bdee697

Please sign in to comment.