From 7ff897461f76edffbf7a0910beb232243b87ed18 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 6 Jan 2023 01:46:25 -0800 Subject: [PATCH 1/7] Correct aws batch job state Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/monitor.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index 4f09b911f..bc8457df0 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -145,6 +145,14 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, job // Based on the summary produced above, deduce the overall phase of the task. phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary) + if job.Status.Phase.IsSuccess() && phase == arrayCore.PhaseCheckingSubTaskExecutions { + // In some cases, batch job succeed, but all subtasks failed. + // The reason for this is that Flytekit catches the exception and writes error.pb to the bucket. + // Flyte gets status of the subtask from error.pb in the bucket, + // while AWS batch gets it from the return code of subtask. 
+ phase = arrayCore.PhasePermanentFailure + } + if phase != arrayCore.PhaseCheckingSubTaskExecutions { metrics.SubTasksSucceeded.Add(ctx, float64(newArrayStatus.Summary[core.PhaseSuccess])) totalFailed := newArrayStatus.Summary[core.PhasePermanentFailure] + newArrayStatus.Summary[core.PhaseRetryableFailure] From e8df357526ee3a635ab5acf53b172310fe6c17dc Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 01:36:51 -0800 Subject: [PATCH 2/7] Add env FLYTE_FAIL_ON_ERROR to aws batch job Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/awsbatch/transformer.go b/go/tasks/plugins/array/awsbatch/transformer.go index f978d92ef..559d46626 100644 --- a/go/tasks/plugins/array/awsbatch/transformer.go +++ b/go/tasks/plugins/array/awsbatch/transformer.go @@ -118,7 +118,7 @@ func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu envVars = append(envVars, &batch.KeyValuePair{Name: refStr(ArrayJobIndex), Value: refStr("FAKE_JOB_ARRAY_INDEX")}, &batch.KeyValuePair{Name: refStr("FAKE_JOB_ARRAY_INDEX"), Value: refStr("0")}) } - + envVars = append(envVars, &batch.KeyValuePair{Name: refStr("FLYTE_FAIL_ON_ERROR"), Value: refStr("True")}) batchInput.ArrayProperties = arrayProps batchInput.ContainerOverrides.Environment = envVars From 40d531eb5d946051c0ac6dc891cc8d7d89ec9b3d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 02:14:14 -0800 Subject: [PATCH 3/7] Add tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer.go | 3 ++- go/tasks/plugins/array/awsbatch/transformer_test.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/awsbatch/transformer.go b/go/tasks/plugins/array/awsbatch/transformer.go index 559d46626..d72c5302b 100644 --- a/go/tasks/plugins/array/awsbatch/transformer.go +++ b/go/tasks/plugins/array/awsbatch/transformer.go @@ -25,6 +25,7 @@ import ( const ( 
ArrayJobIndex = "BATCH_JOB_ARRAY_INDEX_VAR_NAME" arrayJobIDFormatter = "%v:%v" + failOnError = "FLYTE_FAIL_ON_ERROR" ) const assignResources = true @@ -118,7 +119,7 @@ func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu envVars = append(envVars, &batch.KeyValuePair{Name: refStr(ArrayJobIndex), Value: refStr("FAKE_JOB_ARRAY_INDEX")}, &batch.KeyValuePair{Name: refStr("FAKE_JOB_ARRAY_INDEX"), Value: refStr("0")}) } - envVars = append(envVars, &batch.KeyValuePair{Name: refStr("FLYTE_FAIL_ON_ERROR"), Value: refStr("True")}) + envVars = append(envVars, &batch.KeyValuePair{Name: refStr(failOnError), Value: refStr("True")}) batchInput.ArrayProperties = arrayProps batchInput.ContainerOverrides.Environment = envVars diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index 29fc8022c..02a70f952 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -131,6 +131,7 @@ func TestArrayJobToBatchInput(t *testing.T) { Command: []*string{ref("cmd"), ref("/inputs/prefix")}, Environment: []*batch.KeyValuePair{ {Name: refStr("BATCH_JOB_ARRAY_INDEX_VAR_NAME"), Value: refStr("AWS_BATCH_JOB_ARRAY_INDEX")}, + {Name: ref(failOnError), Value: refStr("True")}, }, Memory: refInt(1074), Vcpus: refInt(1), From defa17510d7c0351f3f53a33fc47d237f5024d60 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 02:32:00 -0800 Subject: [PATCH 4/7] update tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer.go | 3 +-- go/tasks/plugins/array/awsbatch/transformer_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/transformer.go b/go/tasks/plugins/array/awsbatch/transformer.go index d72c5302b..a2dddf04e 100644 --- a/go/tasks/plugins/array/awsbatch/transformer.go +++ b/go/tasks/plugins/array/awsbatch/transformer.go @@ -119,7 +119,6 @@ func 
UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu envVars = append(envVars, &batch.KeyValuePair{Name: refStr(ArrayJobIndex), Value: refStr("FAKE_JOB_ARRAY_INDEX")}, &batch.KeyValuePair{Name: refStr("FAKE_JOB_ARRAY_INDEX"), Value: refStr("0")}) } - envVars = append(envVars, &batch.KeyValuePair{Name: refStr(failOnError), Value: refStr("True")}) batchInput.ArrayProperties = arrayProps batchInput.ContainerOverrides.Environment = envVars @@ -145,7 +144,7 @@ func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, c Value: val, }) } - + finalEnvVars = append(finalEnvVars, v1.EnvVar{Name: failOnError, Value: "true"}) return finalEnvVars } diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index 02a70f952..beb3cf57b 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -130,8 +130,8 @@ func TestArrayJobToBatchInput(t *testing.T) { ContainerOverrides: &batch.ContainerOverrides{ Command: []*string{ref("cmd"), ref("/inputs/prefix")}, Environment: []*batch.KeyValuePair{ + {Name: ref(failOnError), Value: refStr("true")}, {Name: refStr("BATCH_JOB_ARRAY_INDEX_VAR_NAME"), Value: refStr("AWS_BATCH_JOB_ARRAY_INDEX")}, - {Name: ref(failOnError), Value: refStr("True")}, }, Memory: refInt(1074), Vcpus: refInt(1), From e40aa1478d7923f15136c4c6980ac65b26406e18 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 11:17:49 -0800 Subject: [PATCH 5/7] nit Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/monitor.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index bc8457df0..4f09b911f 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -145,14 +145,6 @@ func CheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionContext, job 
// Based on the summary produced above, deduce the overall phase of the task. phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary) - if job.Status.Phase.IsSuccess() && phase == arrayCore.PhaseCheckingSubTaskExecutions { - // In some cases, batch job succeed, but all subtasks failed. - // The reason for this is that Flytekit catches the exception and writes error.pb to the bucket. - // Flyte gets status of the subtask from error.pb in the bucket, - // while AWS batch gets it from the return code of subtask. - phase = arrayCore.PhasePermanentFailure - } - if phase != arrayCore.PhaseCheckingSubTaskExecutions { metrics.SubTasksSucceeded.Add(ctx, float64(newArrayStatus.Summary[core.PhaseSuccess])) totalFailed := newArrayStatus.Summary[core.PhasePermanentFailure] + newArrayStatus.Summary[core.PhaseRetryableFailure] From 9679fb385ff75f43f9db9850b3054f06b250a500 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 11:31:58 -0800 Subject: [PATCH 6/7] update test Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/tasks/plugins/array/awsbatch/transformer_test.go b/go/tasks/plugins/array/awsbatch/transformer_test.go index beb3cf57b..473fd3bad 100644 --- a/go/tasks/plugins/array/awsbatch/transformer_test.go +++ b/go/tasks/plugins/array/awsbatch/transformer_test.go @@ -238,5 +238,9 @@ func Test_getEnvVarsForTask(t *testing.T) { Name: "MyKey", Value: "MyVal", }, + { + Name: "FLYTE_FAIL_ON_ERROR", + Value: "true", + }, }, envVars) } From c2e0080e8a9d72d4649f2ca4bcfa3a9e0c25b95f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 11 Jan 2023 14:04:42 -0800 Subject: [PATCH 7/7] nit Signed-off-by: Kevin Su --- go/tasks/plugins/array/awsbatch/transformer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/go/tasks/plugins/array/awsbatch/transformer.go b/go/tasks/plugins/array/awsbatch/transformer.go index a2dddf04e..6b6da84fb 100644 --- a/go/tasks/plugins/array/awsbatch/transformer.go +++ b/go/tasks/plugins/array/awsbatch/transformer.go @@ -136,7 +136,7 @@ func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, c for key, value := range defaultEnvVars { m[key] = value } - + m[failOnError] = "true" finalEnvVars := make([]v1.EnvVar, 0, len(m)) for key, val := range m { finalEnvVars = append(finalEnvVars, v1.EnvVar{ @@ -144,7 +144,6 @@ func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, c Value: val, }) } - finalEnvVars = append(finalEnvVars, v1.EnvVar{Name: failOnError, Value: "true"}) return finalEnvVars }