diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index 287abbf78d..ccdc6d0038 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -481,12 +481,7 @@ func mergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec workflowExecConfig.Annotations = spec.GetAnnotations() } - // Override interruptible flag if workflow execution config does not have a value set or the spec sets a different - // value that defined as the workflow default. This allows for workflows to have their interruptible setting - // explicitly turned on and off for a single execution. - if (workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil) || - (workflowExecConfig.GetInterruptible() != nil && spec.GetInterruptible() != nil && - workflowExecConfig.GetInterruptible().GetValue() != spec.GetInterruptible().GetValue()) { + if workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil { workflowExecConfig.Interruptible = spec.GetInterruptible() } return workflowExecConfig diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 21098b663b..f7a981b17b 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -3974,6 +3974,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { requestOutputLocationPrefix := "requestOutputLocationPrefix" requestK8sServiceAccount := "requestK8sServiceAccount" requestMaxParallelism := int32(10) + requestInterruptible := false launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"} launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"} @@ -3981,6 +3982,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { launchPlanK8sServiceAccount := "launchPlanK8sServiceAccount" launchPlanAssumableIamRole := "launchPlanAssumableIamRole" launchPlanMaxParallelism := int32(50) + launchPlanInterruptible := true applicationConfig := runtime.NewConfigurationProvider() @@ -3993,6 +3995,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { rmOutputLocationPrefix := "rmOutputLocationPrefix" rmK8sServiceAccount := "rmK8sServiceAccount" rmMaxParallelism := int32(80) + rmInterruptible := false resourceManager := managerMocks.MockResourceManager{} executionManager := ExecutionManager{ @@ -4011,6 +4014,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: rmMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: rmInterruptible}, Labels: &admin.Labels{Values: rmLabels}, Annotations: &admin.Annotations{Values: rmAnnotations}, RawOutputDataConfig: &admin.RawOutputDataConfig{ @@ -4043,12 +4047,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: requestMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: requestInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) assert.NoError(t, err) assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Equal(t, requestInterruptible, execConfig.Interruptible.Value) assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, requestLabels, execConfig.GetLabels().Values) assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values) @@ -4077,11 +4083,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.True(t, proto.Equal(launchPlan.Spec.SecurityContext, execConfig.SecurityContext)) assert.True(t, proto.Equal(launchPlan.Spec.Annotations, execConfig.Annotations)) assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) @@ -4111,11 +4119,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4139,11 +4149,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) { }, }, MaxParallelism: launchPlanMaxParallelism, + Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible}, }, } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4173,6 +4185,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value) assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values) @@ -4190,6 +4203,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value) assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix) assert.Equal(t, rmLabels, execConfig.GetLabels().Values) @@ -4230,6 +4244,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4262,6 +4277,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4298,6 +4314,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.Nil(t, execConfig.GetInterruptible()) assert.Equal(t, deprecatedLaunchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4317,6 +4334,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ MaxParallelism: 300, + Interruptible: &wrappers.BoolValue{Value: true}, SecurityContext: &core.SecurityContext{ RunAs: &core.Identity{ K8SServiceAccount: "workflowDefault", @@ -4342,6 +4360,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.NoError(t, err) assert.Equal(t, int32(300), execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) assert.Equal(t, "workflowDefault", execConfig.SecurityContext.RunAs.K8SServiceAccount) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) @@ -4367,11 +4386,172 @@ func TestGetExecutionConfigOverrides(t *testing.T) { } execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) assert.Equal(t, fmt.Errorf("failed to fetch the resources"), err) + assert.Nil(t, execConfig.GetInterruptible()) assert.Nil(t, execConfig.GetSecurityContext()) assert.Nil(t, execConfig.GetRawOutputDataConfig()) assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) }) + t.Run("application configuration", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{}, + }, + }, + }, nil + } + + executionManager.config.ApplicationConfiguration().GetTopLevelConfig().Interruptible = true + + t.Run("request with interruptible override disabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.False(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request with interruptible override enabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request with no interruptible override specified", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with interruptible override disabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.False(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with interruptible override enabled", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("launch plan with no interruptible override specified", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{}, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + t.Run("request and launch plan with different interruptible overrides", func(t *testing.T) { + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + Interruptible: &wrappers.BoolValue{Value: true}, + }, + } + + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + Interruptible: &wrappers.BoolValue{Value: false}, + }, + } + + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism) + assert.True(t, execConfig.Interruptible.Value) + assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) + }) } func TestGetExecutionConfig(t *testing.T) {