Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

The status of the AWS batch job should become failed once the retry limit is exceeded #291

Merged
merged 15 commits into from
Dec 1, 2022
12 changes: 11 additions & 1 deletion go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,19 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState, e.metrics)

case arrayCore.PhaseCheckingSubTaskExecutions:
// Check that the taskTemplate is valid
var taskTemplate *idlCore.TaskTemplate
taskTemplate, err = tCtx.TaskReader().Read(ctx)
if err != nil {
return core.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read task template")
} else if taskTemplate == nil {
return core.UnknownTransition, errors.Errorf(errors.BadTaskSpecification, "Required value not set, taskTemplate is nil")
}
retry := toRetryStrategy(ctx, toBackoffLimit(taskTemplate.Metadata), pluginConfig.MinRetries, pluginConfig.MaxRetries)
hamersaw marked this conversation as resolved.
Show resolved Hide resolved

pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(),
tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(),
e.jobStore, tCtx.DataStore(), pluginConfig, pluginState, e.metrics)
e.jobStore, tCtx.DataStore(), pluginConfig, pluginState, e.metrics, *retry.Attempts)

case arrayCore.PhaseAssembleFinalOutput:
pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, version, pluginState.State)
Expand Down
6 changes: 5 additions & 1 deletion go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func createSubJobList(count int) []*Job {
}

func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix, baseOutputSandbox storage.DataReference, jobStore *JobStore,
dataStore *storage.DataStore, cfg *config.Config, currentState *State, metrics ExecutorMetrics) (newState *State, err error) {
dataStore *storage.DataStore, cfg *config.Config, currentState *State, metrics ExecutorMetrics, retryLimit int64) (newState *State, err error) {
newState = currentState
parentState := currentState.State
jobName := taskMeta.GetTaskExecutionID().GetGeneratedName()
Expand Down Expand Up @@ -108,6 +108,10 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata
} else {
msg.Collect(childIdx, "Job failed")
}

if subJob.Status.Phase == core.PhaseRetryableFailure && retryLimit == int64(len(subJob.Attempts)) {
actualPhase = core.PhasePermanentFailure
}
} else if subJob.Status.Phase.IsSuccess() {
actualPhase, err = array.CheckTaskOutput(ctx, dataStore, outputPrefix, baseOutputSandbox, childIdx, originalIdx)
if err != nil {
Expand Down
54 changes: 50 additions & 4 deletions go/tasks/plugins/array/awsbatch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()), 3)

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()), 3)

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()), 3)

assert.NoError(t, err)
p, _ := newState.GetPhase()
Expand Down Expand Up @@ -201,11 +201,57 @@ func TestCheckSubTasksState(t *testing.T) {
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()))
}, getAwsBatchExecutorMetrics(promutils.NewTestScope()), 3)

assert.NoError(t, err)
p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
})

t.Run("retry limit exceeded", func(t *testing.T) {
	mBatchClient := batchMocks.NewMockAwsBatchClient()
	batchClient := NewCustomBatchClient(mBatchClient, "", "",
		utils.NewRateLimiter("", 10, 20),
		utils.NewRateLimiter("", 10, 20))

	jobStore := newJobsStore(t, batchClient)
	// One sub job has a retryable failure with one recorded attempt;
	// the other succeeded. With retryLimit=1 (below) the failing sub job
	// has exhausted its budget and must be promoted to a permanent failure.
	_, err := jobStore.GetOrCreate(tID.GetGeneratedName(), &Job{
		ID: "job-id",
		Status: JobStatus{
			Phase: core.PhaseRunning,
		},
		SubJobs: []*Job{
			{Status: JobStatus{Phase: core.PhaseRetryableFailure}, Attempts: []Attempt{{LogStream: "failed"}}},
			{Status: JobStatus{Phase: core.PhaseSuccess}},
		},
	})

	assert.NoError(t, err)

	inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)

	retryAttemptsArray, err := bitarray.NewCompactArray(2, bitarray.Item(1))
	assert.NoError(t, err)

	// Seed the state in PhaseCheckingSubTaskExecutions so the assertion
	// below actually proves the phase TRANSITION. Previously the state
	// started in PhaseWriteToDiscoveryThenFail — the expected phase —
	// so the test could pass even if CheckSubTasksState did nothing.
	newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
		State: &arrayCore.State{
			CurrentPhase:         arrayCore.PhaseCheckingSubTaskExecutions,
			ExecutionArraySize:   2,
			OriginalArraySize:    2,
			OriginalMinSuccesses: 2,
			ArrayStatus: arraystatus.ArrayStatus{
				Detailed: arrayCore.NewPhasesCompactArray(2),
			},
			IndexesToCache: bitarray.NewBitSet(2),
			RetryAttempts:  retryAttemptsArray,
		},
		ExternalJobID:    refStr("job-id"),
		JobDefinitionArn: "",
	}, getAwsBatchExecutorMetrics(promutils.NewTestScope()), 1)

	assert.NoError(t, err)
	p, _ := newState.GetPhase()
	// Retryable failure at the attempt limit becomes a permanent failure,
	// which drives the array phase to PhaseWriteToDiscoveryThenFail.
	assert.Equal(t, arrayCore.PhaseWriteToDiscoveryThenFail, p)
})
}
16 changes: 16 additions & 0 deletions go/tasks/plugins/array/core/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,22 @@ func TestSummaryToPhase(t *testing.T) {
core.PhaseSuccess: 10,
},
},
{
"FailedToRetry",
PhaseWriteToDiscoveryThenFail,
map[core.Phase]int64{
core.PhaseSuccess: 5,
core.PhasePermanentFailure: 5,
},
},
{
"Retrying",
PhaseCheckingSubTaskExecutions,
map[core.Phase]int64{
core.PhaseSuccess: 5,
core.PhaseRetryableFailure: 5,
},
},
}

for _, tt := range tests {
Expand Down