Skip to content

Commit

Permalink
Fixed interruptible override applying fallback values incorrectly (fl…
Browse files Browse the repository at this point in the history
…yteorg#451)

Setting different values in multiple parts of the configs applied resulted in the least specific spec being applied
Added extra tests verifying mainly interruptible override fallbacks including application configuration

Signed-off-by: Nick Müller <[email protected]>
  • Loading branch information
Nick Müller authored Jun 30, 2022
1 parent e92694f commit a96084f
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 6 deletions.
7 changes: 1 addition & 6 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,7 @@ func mergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec
workflowExecConfig.Annotations = spec.GetAnnotations()
}

// Override interruptible flag if workflow execution config does not have a value set or the spec sets a different
// value that defined as the workflow default. This allows for workflows to have their interruptible setting
// explicitly turned on and off for a single execution.
if (workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil) ||
(workflowExecConfig.GetInterruptible() != nil && spec.GetInterruptible() != nil &&
workflowExecConfig.GetInterruptible().GetValue() != spec.GetInterruptible().GetValue()) {
if workflowExecConfig.GetInterruptible() == nil && spec.GetInterruptible() != nil {
workflowExecConfig.Interruptible = spec.GetInterruptible()
}
return workflowExecConfig
Expand Down
180 changes: 180 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3974,13 +3974,15 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
requestOutputLocationPrefix := "requestOutputLocationPrefix"
requestK8sServiceAccount := "requestK8sServiceAccount"
requestMaxParallelism := int32(10)
requestInterruptible := false

launchPlanLabels := map[string]string{"launchPlanLabelKey": "launchPlanLabelValue"}
launchPlanAnnotations := map[string]string{"launchPlanAnnotationKey": "launchPlanAnnotationValue"}
launchPlanOutputLocationPrefix := "launchPlanOutputLocationPrefix"
launchPlanK8sServiceAccount := "launchPlanK8sServiceAccount"
launchPlanAssumableIamRole := "launchPlanAssumableIamRole"
launchPlanMaxParallelism := int32(50)
launchPlanInterruptible := true

applicationConfig := runtime.NewConfigurationProvider()

Expand All @@ -3993,6 +3995,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
rmOutputLocationPrefix := "rmOutputLocationPrefix"
rmK8sServiceAccount := "rmK8sServiceAccount"
rmMaxParallelism := int32(80)
rmInterruptible := false

resourceManager := managerMocks.MockResourceManager{}
executionManager := ExecutionManager{
Expand All @@ -4011,6 +4014,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: rmMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: rmInterruptible},
Labels: &admin.Labels{Values: rmLabels},
Annotations: &admin.Annotations{Values: rmAnnotations},
RawOutputDataConfig: &admin.RawOutputDataConfig{
Expand Down Expand Up @@ -4043,12 +4047,14 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
},
},
MaxParallelism: requestMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: requestInterruptible},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
assert.NoError(t, err)
assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Equal(t, requestInterruptible, execConfig.Interruptible.Value)
assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, requestLabels, execConfig.GetLabels().Values)
assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values)
Expand Down Expand Up @@ -4077,11 +4083,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
},
},
MaxParallelism: launchPlanMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value)
assert.True(t, proto.Equal(launchPlan.Spec.SecurityContext, execConfig.SecurityContext))
assert.True(t, proto.Equal(launchPlan.Spec.Annotations, execConfig.Annotations))
assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
Expand Down Expand Up @@ -4111,11 +4119,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
},
},
MaxParallelism: launchPlanMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value)
assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values)
Expand All @@ -4139,11 +4149,13 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
},
},
MaxParallelism: launchPlanMaxParallelism,
Interruptible: &wrappers.BoolValue{Value: launchPlanInterruptible},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, launchPlanInterruptible, execConfig.Interruptible.Value)
assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values)
Expand Down Expand Up @@ -4173,6 +4185,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, launchPlanMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value)
assert.Equal(t, launchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Equal(t, launchPlanOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, launchPlanLabels, execConfig.GetLabels().Values)
Expand All @@ -4190,6 +4203,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, rmInterruptible, execConfig.Interruptible.Value)
assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Equal(t, rmOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, rmLabels, execConfig.GetLabels().Values)
Expand Down Expand Up @@ -4230,6 +4244,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, rmMaxParallelism, execConfig.MaxParallelism)
assert.Nil(t, execConfig.GetInterruptible())
assert.Equal(t, rmK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
Expand Down Expand Up @@ -4262,6 +4277,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.Nil(t, execConfig.GetInterruptible())
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
Expand Down Expand Up @@ -4298,6 +4314,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.Nil(t, execConfig.GetInterruptible())
assert.Equal(t, deprecatedLaunchPlanK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
Expand All @@ -4317,6 +4334,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: 300,
Interruptible: &wrappers.BoolValue{Value: true},
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
K8SServiceAccount: "workflowDefault",
Expand All @@ -4342,6 +4360,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, int32(300), execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, "workflowDefault", execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
Expand All @@ -4367,11 +4386,172 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.Equal(t, fmt.Errorf("failed to fetch the resources"), err)
assert.Nil(t, execConfig.GetInterruptible())
assert.Nil(t, execConfig.GetSecurityContext())
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("application configuration", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{},
},
},
}, nil
}

executionManager.config.ApplicationConfiguration().GetTopLevelConfig().Interruptible = true

t.Run("request with interruptible override disabled", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
Interruptible: &wrappers.BoolValue{Value: false},
},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.False(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("request with interruptible override enabled", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
Interruptible: &wrappers.BoolValue{Value: true},
},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("request with no interruptible override specified", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("launch plan with interruptible override disabled", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}

launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
Interruptible: &wrappers.BoolValue{Value: false},
},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.False(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("launch plan with interruptible override enabled", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}

launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
Interruptible: &wrappers.BoolValue{Value: true},
},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("launch plan with no interruptible override specified", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}

launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("request and launch plan with different interruptible overrides", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
Interruptible: &wrappers.BoolValue{Value: true},
},
}

launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
Interruptible: &wrappers.BoolValue{Value: false},
},
}

execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, defaultMaxParallelism, execConfig.MaxParallelism)
assert.True(t, execConfig.Interruptible.Value)
assert.Equal(t, defaultK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
})
}

func TestGetExecutionConfig(t *testing.T) {
Expand Down

0 comments on commit a96084f

Please sign in to comment.