diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 18c54e5afb..97dbe34fb3 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -16,7 +16,7 @@ require ( github.com/coreos/go-oidc v2.2.1+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.18.54 + github.com/flyteorg/flyteidl v0.19.2 github.com/flyteorg/flyteplugins v0.5.49 github.com/flyteorg/flytepropeller v0.10.16 github.com/flyteorg/flytestdlib v0.3.22 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 7355c0f212..793ab8cbd0 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -307,8 +307,8 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/flyteorg/flyteidl v0.18.48/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.18.50 h1:L1fMj6QEXoKin+cPQn9sfwJ1x14tlChdz1mG1WaaIW4= github.com/flyteorg/flyteidl v0.18.50/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.18.54 h1:OWgA9LUqH7rTLd8DywFzfm+LkTtLzrPztP5JaO/4hpM= -github.com/flyteorg/flyteidl v0.18.54/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.19.2 h1:jXuRrLJEzSo33N9pw7bMEd6mRYSL7LCz/vnazz5XcOg= +github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.5.49 h1:jqmNrsTQ2+m+vYKqDVNO3CYy9q3XYTms3XzOr3roAT0= github.com/flyteorg/flyteplugins v0.5.49/go.mod h1:567WIA0Rr6QjmXsqvGsU+Cyb57Ia6qzddxIw//RPwYk= github.com/flyteorg/flytepropeller v0.10.16 h1:WSVh0X0F9xSw1BxGKbHqu0oha37YaBmO3bOS7GjR3Qo= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 350da2083a..0f0655cf86 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -404,7 +404,22 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request return parentNodeExecutionID, sourceExecutionID, nil } -func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest) (*admin.WorkflowExecutionConfig, error) { +// Produces execution-time attributes for workflow execution. +// Defaults to overridable execution values set in the execution create request, then looks at the launch plan values +// (if any) before defaulting to values set in the matchable resource db. +func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admin.ExecutionCreateRequest, + launchPlan *admin.LaunchPlan) (*admin.WorkflowExecutionConfig, error) { + if request.Spec.MaxParallelism > 0 { + return &admin.WorkflowExecutionConfig{ + MaxParallelism: request.Spec.MaxParallelism, + }, nil + } + if launchPlan != nil && launchPlan.Spec.MaxParallelism > 0 { + return &admin.WorkflowExecutionConfig{ + MaxParallelism: launchPlan.Spec.MaxParallelism, + }, nil + } + resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ Project: request.Project, Domain: request.Domain, @@ -512,7 +527,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err) return nil, nil, err } - executionConfig, err := m.getExecutionConfig(ctx, &request) + executionConfig, err := m.getExecutionConfig(ctx, &request, nil) if err != nil { return nil, nil, err } @@ -684,7 +699,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( logger.Errorf(ctx, "Failed to get quality of service for [%+v] with error: %v", workflowExecutionID, err) return nil, nil, err } - executionConfig, err := m.getExecutionConfig(ctx, &request) + executionConfig, err := m.getExecutionConfig(ctx, &request, launchPlan) if err != nil { return nil, nil, err } diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 3c318a2701..3615b773ec 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -2932,7 +2932,46 @@ func TestGetExecutionConfig(t *testing.T) { execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{ Project: workflowIdentifier.Project, Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + }, nil) + assert.NoError(t, err) + assert.Equal(t, execConfig.MaxParallelism, int32(100)) +} + +func TestGetExecutionConfig_Spec(t *testing.T) { + resourceManager := managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + t.Errorf("When a user specifies max parallelism in a spec, the db should not be queried") + return nil, nil + } + + executionManager := ExecutionManager{ + resourceManager: &resourceManager, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + MaxParallelism: 100, + }, + }, &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + MaxParallelism: 50, + }, }) assert.NoError(t, err) assert.Equal(t, execConfig.MaxParallelism, int32(100)) + + execConfig, err = executionManager.getExecutionConfig(context.TODO(), &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + }, &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + MaxParallelism: 50, + }, + }) + assert.NoError(t, err) + assert.Equal(t, execConfig.MaxParallelism, int32(50)) }