Skip to content

Commit

Permalink
Add AWS Batch metrics for queued sub tasks (flyteorg#94)
Browse files Browse the repository at this point in the history
* Add AWS Batch metrics for queued sub tasks
  • Loading branch information
anandswaminathan authored Jun 3, 2020
1 parent b898563 commit a162853
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes=
github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.29/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U=
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
11 changes: 7 additions & 4 deletions flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type ExecutorMetrics struct {
SubTasksSubmitted labeled.Counter
SubTasksSucceeded labeled.Counter
SubTasksFailed labeled.Counter
SubTasksQueued labeled.Counter
BatchJobTerminated labeled.Counter
}

Expand All @@ -18,10 +19,12 @@ func getAwsBatchExecutorMetrics(scope promutils.Scope) ExecutorMetrics {
Scope: scope,
SubTasksSubmitted: labeled.NewCounter("sub_task_submitted",
"Sub tasks submitted", scope),
SubTasksSucceeded: labeled.NewCounter("batch_task_success",
"Batch tasks successful", scope),
SubTasksFailed: labeled.NewCounter("batch_task_failure",
"Batch tasks failure", scope),
SubTasksSucceeded: labeled.NewCounter("sub_task_success",
"Batch sub tasks succeeded", scope),
SubTasksQueued: labeled.NewCounter("sub_task_queued",
"Batch sub tasks queued", scope),
SubTasksFailed: labeled.NewCounter("sub_task_failure",
"Batch sub tasks failed", scope),
BatchJobTerminated: labeled.NewCounter("batch_job_terminated",
"Batch job terminated", scope),
}
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata
Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())),
}

queued := 0
for childIdx, subJob := range job.SubJobs {
actualPhase := subJob.Status.Phase
originalIdx := arrayCore.CalculateOriginalIndex(childIdx, currentState.GetIndexesToCache())
if subJob.Status.Phase == core.PhaseQueued {
queued++
}
if subJob.Status.Phase.IsFailure() {
if len(subJob.Status.Message) > 0 {
// If the service reported an error but there is no error.pb written, write one with the
Expand Down Expand Up @@ -110,6 +114,10 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata
newArrayStatus.Summary.Inc(actualPhase)
}

if queued > 0 {
metrics.SubTasksQueued.Add(ctx, float64(queued))
}

parentState = parentState.SetArrayStatus(newArrayStatus)
// Based on the summary produced above, deduce the overall phase of the task.
phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary)
Expand Down
43 changes: 43 additions & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,49 @@ func TestCheckSubTasksState(t *testing.T) {
assert.Equal(t, arrayCore.PhaseWriteToDiscovery.String(), p.String())
})

// Subtest "queued": the stored parent job is in PhaseRunning and its single sub job is
// in PhaseQueued. The subtest asserts that CheckSubTasksState returns no error and that
// the returned state reports PhaseCheckingSubTaskExecutions.
// NOTE(review): the value of the SubTasksQueued counter itself is not asserted here.
t.Run("queued", func(t *testing.T) {
mBatchClient := batchMocks.NewMockAwsBatchClient()
batchClient := NewCustomBatchClient(mBatchClient, "", "",
utils.NewRateLimiter("", 10, 20),
utils.NewRateLimiter("", 10, 20))

// Seed the job store with a running parent job that has exactly one queued sub job.
jobStore := newJobsStore(t, batchClient)
_, err := jobStore.GetOrCreate(tID.GetGeneratedName(), &Job{
ID: "job-id",
Status: JobStatus{
Phase: core.PhaseRunning,
},
SubJobs: []*Job{
{Status: JobStatus{Phase: core.PhaseQueued}},
},
})

assert.NoError(t, err)

// Data store configured with storage.TypeMemory, passed to CheckSubTasksState below.
inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

// Input state describes a one-element array (execution size, original size and
// minimum successes are all 1) whose ExternalJobID matches the job ID seeded above.
// The metrics are built from a separate test scope.
newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
State: &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
ExecutionArraySize: 1,
OriginalArraySize: 1,
OriginalMinSuccesses: 1,
ArrayStatus: arraystatus.ArrayStatus{
Detailed: arrayCore.NewPhasesCompactArray(1),
},
IndexesToCache: bitarray.NewBitSet(1),
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))

assert.NoError(t, err)
// Only the phase is checked; the second value returned by GetPhase is discarded.
p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())

})

t.Run("Still running", func(t *testing.T) {
mBatchClient := batchMocks.NewMockAwsBatchClient()
batchClient := NewCustomBatchClient(mBatchClient, "", "",
Expand Down

0 comments on commit a162853

Please sign in to comment.