From a96084fcb455d868e70bd932f4398ae365f770de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Thu, 30 Jun 2022 08:02:30 +0200 Subject: [PATCH] Fixed interruptible override applying fallback values incorrectly (#451) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Setting different values in multiple parts of the configs applied resulted in the least specific spec being applied Added extra tests verifying mainly interruptible override fallbacks including application configuration Signed-off-by: Nick Müller --- pkg/manager/impl/execution_manager.go | 7 +- pkg/manager/impl/execution_manager_test.go | 180 +++++++++++++++++++++ 2 files changed, 181 insertions(+), 6 deletions(-) diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 287abbf78d..ccdc6d0038 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -481,12 +481,7 @@ func mergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec workflowExecConfig.Annotations = spec.GetAnnotations() } - // Override interruptible flag if workflow execution config does not have a value set or the spec sets a different - // value that defined as the workflow default. This allows for workflows to have their interruptible setting - // explicitly turned on and off for a single execution. - if (workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil) || - (workflowExecConfig.GetInterruptible() != nil && spec.GetInterruptible() != nil && - workflowExecConfig.GetInterruptible().GetValue() != spec.GetInterruptible().GetValue()) { + if workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil { workflowExecConfig.Interruptible = spec.GetInterruptible() } return workflowExecConfig diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 21098b663b..f7a981b17b 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -3974,6 +3974,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { requestOutputLocationPrefix := "requestOutputLocationPrefix" requestK8sServiceAccount := "requestK8sServiceAccount" requestMaxParallelism := int32(10) + requestInterruptible := false launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"} launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"} @@ -3981,6 +3982,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlanK8sServiceAccount := "launchPlanK8sServiceAccount" launchPlanAssumableIamRole := "launchPlanAssumableIamRole" launchPlanMaxParallelism := int32(50) + launchPlanInterruptible := true applicationConfig := runtime.NewConfigurationProvider() @@ -3993,6 +3995,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { rmOutputLocationPrefix := "rmOutputLocationPrefix" rmK8sServiceAccount := "rmK8sServiceAccount" rmMaxParallelism := int32(80) + rmInterruptible := false resourceManager := managerMocks.MockResourceManager{} executionManager := ExecutionManager{ @@ -4011,6 +4014,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: rmMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: rmInterruptible}, Labels: &admin.Labels{Values: rmLabels}, Annotations: &admin.Annotations{Values: rmAnnotations}, RawOutputDataConfig: &admin.RawOutputDataConfig{ @@ -4043,12 +4047,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: requestMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: requestInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) assert.NoError(t, err) assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, requestInterruptible, execConfig.Interruptible.Value) assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, requestLabels, execConfig.GetLabels().Values) assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values) @@ -4077,11 +4083,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.True(t, proto.Equal(launchPlan.Spec.SecurityContext, execConfig.SecurityContext)) assert.True(t, proto.Equal(launchPlan.Spec.Annotations, execConfig.Annotations)) assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) @@ -4111,11 +4119,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4139,11 +4149,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4173,6 +4185,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4190,6 +4203,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value) assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, rmLabels, execConfig.GetLabels().Values) @@ -4230,6 +4244,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4262,6 +4277,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4298,6 +4314,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, deprecatedLaunchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4317,6 +4334,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: 300, + Interruptible: &wrappers.BoolValue{Value: true}, SecurityContext: &core.SecurityContext{ RunAs: &core.Identity{ K8SServiceAccount: "workflowDefault", @@ -4342,6 +4360,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, int32(300), execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) assert.Equal(t, "workflowDefault", execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4367,11 +4386,172 @@ func TestGetExecutionConfigOverrides(t *testing.T) { } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.Equal(t, fmt.Errorf("failed to fetch the resources"), err) + assert.Nil(t, execConfig.GetInterruptible()) assert.Nil(t, execConfig.GetSecurityContext()) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) }) + t.Run("application configuration", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{}, + }, + }, + }, nil + } + + executionManager.config.ApplicationConfiguration().GetTopLevelConfig().Interruptible = true + + t.Run("request with interruptible override disabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.False(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request with interruptible override enabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request with no interruptible override specified", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with interruptible override disabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.False(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with interruptible override enabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with no interruptible override specified", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request and launch plan with different interruptible overrides", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + }) } func TestGetExecutionConfig(t *testing.T) {