From a162853df4d079025dc500a88337bf9d6e739257 Mon Sep 17 00:00:00 2001 From: Anand Swaminathan Date: Wed, 3 Jun 2020 15:22:59 -0700 Subject: [PATCH] Add AWS Batch metrics for queued (#94) * Add AWS Batch metrics for queued --- flyteplugins/go.sum | 2 + .../array/awsbatch/executor_metrics.go | 11 +++-- .../tasks/plugins/array/awsbatch/monitor.go | 8 ++++ .../plugins/array/awsbatch/monitor_test.go | 43 +++++++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index deaab3b17c..e915e2c932 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -310,6 +310,8 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.29/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U= +github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go index 7966f13df0..cea784347b 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go @@ -10,6 +10,7 @@ type ExecutorMetrics struct { SubTasksSubmitted labeled.Counter SubTasksSucceeded labeled.Counter SubTasksFailed labeled.Counter + SubTasksQueued labeled.Counter BatchJobTerminated labeled.Counter } @@ -18,10 +19,12 @@ func getAwsBatchExecutorMetrics(scope promutils.Scope) ExecutorMetrics { Scope: scope, SubTasksSubmitted: labeled.NewCounter("sub_task_submitted", "Sub tasks submitted", scope), - SubTasksSucceeded: labeled.NewCounter("batch_task_success", - "Batch tasks successful", scope), - SubTasksFailed: labeled.NewCounter("batch_task_failure", - "Batch tasks failure", scope), + SubTasksSucceeded: labeled.NewCounter("sub_task_success", + "Batch sub tasks succeeded", scope), + SubTasksQueued: labeled.NewCounter("sub_task_queued", + "Batch sub tasks queued", scope), + SubTasksFailed: labeled.NewCounter("sub_task_failure", + "Batch sub tasks failed", scope), BatchJobTerminated: labeled.NewCounter("batch_job_terminated", "Batch job terminated", scope), } diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go index 3231ab7345..bb01db2662 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go @@ -63,9 +63,13 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())), } + queued := 0 for childIdx, subJob := range job.SubJobs { actualPhase := subJob.Status.Phase originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache()) + if subJob.Status.Phase == core.PhaseQueued { + queued++ + } if subJob.Status.Phase.IsFailure() { if len(subJob.Status.Message) > 0 { // If the service reported an error but there is no error.pb written, write one with the @@ -110,6 +114,10 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata newArrayStatus.Summary.Inc(actualPhase) } + if queued > 0 { + metrics.SubTasksQueued.Add(ctx, float64(queued)) + } + parentState = parentState.SetArrayStatus(newArrayStatus) // Based on the summary produced above, deduce the overall phase of the task. phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary) diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go index b3b9e5ba04..6ccbc22cb1 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go @@ -114,6 +114,49 @@ func TestCheckSubTasksState(t *testing.T) { assert.Equal(t, arrayCore.PhaseWriteToDiscovery.String(), p.String()) }) + t.Run("queued", func(t *testing.T) { + mBatchClient := batchMocks.NewMockAwsBatchClient() + batchClient := NewCustomBatchClient(mBatchClient, "", "", + utils.NewRateLimiter("", 10, 20), + utils.NewRateLimiter("", 10, 20)) + + jobStore := newJobsStore(t, batchClient) + _, err := jobStore.GetOrCreate(tID.GetGeneratedName(), &Job{ + ID: "job-id", + Status: JobStatus{ + Phase: core.PhaseRunning, + }, + SubJobs: []*Job{ + {Status: JobStatus{Phase: core.PhaseQueued}}, + }, + }) + + assert.NoError(t, err) + + inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{ + State: &arrayCore.State{ + CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, + ExecutionArraySize: 1, + OriginalArraySize: 1, + OriginalMinSuccesses: 1, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: arrayCore.NewPhasesCompactArray(1), + }, + IndexesToCache: bitarray.NewBitSet(1), + }, + ExternalJobID: refStr("job-id"), + JobDefinitionArn: "", + }, getAwsBatchExecutorMetrics(promutils.NewTestScope())) + + assert.NoError(t, err) + p, _ := newState.GetPhase() + assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String()) + + }) + t.Run("Still running", func(t *testing.T) { mBatchClient := batchMocks.NewMockAwsBatchClient() batchClient := NewCustomBatchClient(mBatchClient, "", "",