diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index bd4d424637..45b4db65ec 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -12,7 +12,7 @@ require ( github.com/googleapis/gnostic v0.4.1 // indirect github.com/hashicorp/golang-lru v0.5.4 github.com/lyft/flyteidl v0.17.6 - github.com/lyft/flytestdlib v0.3.2 + github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 github.com/mitchellh/mapstructure v1.1.2 github.com/pkg/errors v0.9.1 diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index 9910eb12f9..f1738e912b 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -302,6 +302,8 @@ github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNz github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= +github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= +github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store.go b/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store.go index aec235bb9b..971194d1d2 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store.go @@ -10,6 +10,8 @@ import ( "strings" "time" + "k8s.io/apimachinery/pkg/util/sets" + "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/config" "k8s.io/client-go/util/workqueue" @@ -117,69 +119,86 @@ func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updat } logger.Debugf(ctx, "Job [%v] has (%v) attempts.", *source.JobId, len(source.Attempts)) - + uniqueLogStreams := sets.String{} target.Attempts = make([]Attempt, 0, len(source.Attempts)) lastStatusReason := "" for _, attempt := range source.Attempts { - a := Attempt{} - if attempt.StartedAt != nil { - a.StartedAt = time.Unix(*attempt.StartedAt, 0) + var a Attempt + a, lastStatusReason = convertBatchAttemptToAttempt(attempt) + if len(a.LogStream) > 0 { + uniqueLogStreams.Insert(a.LogStream) } - if attempt.StoppedAt != nil { - a.StoppedAt = time.Unix(*attempt.StoppedAt, 0) - } + target.Attempts = append(target.Attempts, a) + } - if container := attempt.Container; container != nil { - if container.LogStreamName != nil { - a.LogStream = *container.LogStreamName - } + // Add the "current" log stream to log links if one exists. + attempt, exitReason := createAttemptFromJobDetail(ctx, source) + if !uniqueLogStreams.Has(attempt.LogStream) { + target.Attempts = append(target.Attempts, attempt) + } - if container.Reason != nil { - lastStatusReason = *container.Reason - } + if len(lastStatusReason) == 0 { + lastStatusReason = exitReason + } - if container.ExitCode != nil { - lastStatusReason += fmt.Sprintf(" exit(%v)", *container.ExitCode) - } - } + msg = append(msg, lastStatusReason) - target.Attempts = append(target.Attempts, a) + target.Status.Message = strings.Join(msg, " - ") + return updated +} + +func convertBatchAttemptToAttempt(attempt *batch.AttemptDetail) (a Attempt, exitReason string) { + if attempt.StartedAt != nil { + a.StartedAt = time.Unix(*attempt.StartedAt, 0) + } + + if attempt.StoppedAt != nil { + a.StoppedAt = time.Unix(*attempt.StoppedAt, 0) } - // If no job attempts are present, try to construct one from container status - if len(source.Attempts) == 0 { - a := Attempt{} - if source.StartedAt != nil { - a.StartedAt = time.Unix(*source.StartedAt, 0) + if container := attempt.Container; container != nil { + if container.LogStreamName != nil { + a.LogStream = *container.LogStreamName } - if source.StoppedAt != nil { - a.StoppedAt = time.Unix(*source.StoppedAt, 0) + if container.Reason != nil { + exitReason = *container.Reason } - if container := source.Container; container != nil { - if container.LogStreamName != nil { - logger.Debug(ctx, "Using log stream from container info.") - a.LogStream = *container.LogStreamName - } + if container.ExitCode != nil { + exitReason += fmt.Sprintf(" exit(%v)", *container.ExitCode) + } + } - if container.Reason != nil { - lastStatusReason = *container.Reason - } + return a, exitReason +} - if container.ExitCode != nil { - lastStatusReason += fmt.Sprintf(" exit(%v)", *container.ExitCode) - } - } +func createAttemptFromJobDetail(ctx context.Context, source *batch.JobDetail) (a Attempt, exitReason string) { + if source.StartedAt != nil { + a.StartedAt = time.Unix(*source.StartedAt, 0) + } - target.Attempts = append(target.Attempts, a) + if source.StoppedAt != nil { + a.StoppedAt = time.Unix(*source.StoppedAt, 0) } - msg = append(msg, lastStatusReason) + if container := source.Container; container != nil { + if container.LogStreamName != nil { + logger.Debug(ctx, "Using log stream from container info.") + a.LogStream = *container.LogStreamName + } - target.Status.Message = strings.Join(msg, " - ") - return updated + if container.Reason != nil { + exitReason = *container.Reason + } + + if container.ExitCode != nil { + exitReason += fmt.Sprintf(" exit(%v)", *container.ExitCode) + } + } + + return a, exitReason } func minInt(a, b int) int { diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store_test.go b/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store_test.go index bcd3009e01..6735460c5f 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store_test.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/jobs_store_test.go @@ -286,3 +286,62 @@ func Test_toRanges(t *testing.T) { }) } } + +func Test_updateJob(t *testing.T) { + ctx := context.Background() + withDefaults := func(source *batch.JobDetail) *batch.JobDetail { + source.JobId = refStr("job-1") + source.Status = refStr(batch.JobStatusRunning) + return source + } + + t.Run("Current attempt", func(t *testing.T) { + j := Job{} + updated := updateJob(ctx, withDefaults(&batch.JobDetail{ + Container: &batch.ContainerDetail{ + LogStreamName: refStr("stream://log2"), + }, + }), &j) + + assert.True(t, updated) + assert.Len(t, j.Attempts, 1) + }) + + t.Run("Current attempt, 1 already failed", func(t *testing.T) { + j := Job{} + updated := updateJob(ctx, withDefaults(&batch.JobDetail{ + Container: &batch.ContainerDetail{ + LogStreamName: refStr("stream://log2"), + }, + Attempts: []*batch.AttemptDetail{ + { + Container: &batch.AttemptContainerDetail{ + LogStreamName: refStr("stream://log1"), + }, + }, + }, + }), &j) + + assert.True(t, updated) + assert.Len(t, j.Attempts, 2) + }) + + t.Run("No current attempt, 1 already failed", func(t *testing.T) { + j := Job{} + updated := updateJob(ctx, withDefaults(&batch.JobDetail{ + Container: &batch.ContainerDetail{ + LogStreamName: refStr("stream://log1"), + }, + Attempts: []*batch.AttemptDetail{ + { + Container: &batch.AttemptContainerDetail{ + LogStreamName: refStr("stream://log1"), + }, + }, + }, + }), &j) + + assert.True(t, updated) + assert.Len(t, j.Attempts, 1) + }) +}