From 4648a2e5c1415eb27a74bda4d3a7f82d4d48c813 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 23 Mar 2022 10:31:46 -0700 Subject: [PATCH] Read and pass along cluster assignment to workflow executor (#376) * wip Signed-off-by: Katrina Rogan * add a test too Signed-off-by: Katrina Rogan * Matchable attribute impl Signed-off-by: Katrina Rogan --- flyteadmin/go.mod | 2 +- flyteadmin/go.sum | 3 +- .../pkg/manager/impl/execution_manager.go | 36 +++++++++ .../manager/impl/execution_manager_test.go | 77 +++++++++++++++++++ .../impl/validation/attributes_validator.go | 2 + .../validation/attributes_validator_test.go | 19 +++++ .../pkg/workflowengine/interfaces/executor.go | 1 + 7 files changed, 138 insertions(+), 2 deletions(-) diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 0ea30893d1..0828db3958 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -11,7 +11,7 @@ require ( github.com/benbjohnson/clock v1.1.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.9.0+incompatible - github.com/flyteorg/flyteidl v0.24.0 + github.com/flyteorg/flyteidl v0.24.2 github.com/flyteorg/flyteplugins v0.10.16 github.com/flyteorg/flytepropeller v0.16.36 github.com/flyteorg/flytestdlib v0.4.13 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 207b23419f..967f3c492d 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -351,8 +351,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.0 h1:bEr9LGCilUX8t6gE8+K6qfW+LDXuCvBpHfllA+zDPZI= github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.24.2 h1:RQzWmtVQR+NKAppjw7xTsIn6gosP0Q/j58tfF6Cr6h4= +github.com/flyteorg/flyteidl v0.24.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw= github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos= github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU= diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 69c2185a44..1d895f551e 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -465,6 +465,30 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi }, nil } +func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *admin.ExecutionCreateRequest) ( + *admin.ClusterAssignment, error) { + if request.Spec.ClusterAssignment != nil { + return request.Spec.ClusterAssignment, nil + } + + resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{ + Project: request.Project, + Domain: request.Domain, + ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT, + }) + if err != nil { + if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { + logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err) + return nil, err + } + } + if resource != nil && resource.Attributes.GetClusterAssignment() != nil { + return resource.Attributes.GetClusterAssignment(), nil + } + // Defaults to empty assignment with no selectors + return &admin.ClusterAssignment{}, nil +} + func (m *ExecutionManager) launchSingleTaskExecution( ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) ( context.Context, *models.Execution, error) { @@ -572,6 +596,11 @@ func (m *ExecutionManager) launchSingleTaskExecution( rawOutputDataConfig = requestSpec.RawOutputDataConfig } + clusterAssignment, err := m.getClusterAssignment(ctx, &request) + if err != nil { + return nil, nil, err + } + resolvedAuthRole := resolveAuthRole(request, launchPlan) resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole) executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -585,6 +614,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion, RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey, RawOutputDataConfig: rawOutputDataConfig, + ClusterAssignment: clusterAssignment, } overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "") @@ -804,6 +834,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( rawOutputDataConfig = requestSpec.RawOutputDataConfig } + clusterAssignment, err := m.getClusterAssignment(ctx, &request) + if err != nil { + return nil, nil, err + } + resolvedAuthRole := resolveAuthRole(request, launchPlan) resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole) executionParameters := workflowengineInterfaces.ExecutionParameters{ @@ -817,6 +852,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion, RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey, RawOutputDataConfig: rawOutputDataConfig, + ClusterAssignment: clusterAssignment, } overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name) diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index c0c016bc1e..895dc45c51 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -262,6 +262,17 @@ func TestCreateExecution(t *testing.T) { principal := "principal" rawOutput := "raw_output" + clusterAssignment := admin.ClusterAssignment{ + Affinity: &admin.Affinity{ + Selectors: []*admin.Selector{ + { + Key: "foo", + Value: []string{"bar"}, + Operator: admin.Selector_NOT_EQUALS, + }, + }, + }, + } repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback( func(ctx context.Context, input models.Execution) error { var spec admin.ExecutionSpec @@ -269,6 +280,7 @@ func TestCreateExecution(t *testing.T) { assert.NoError(t, err) assert.Equal(t, principal, spec.Metadata.Principal) assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix) + assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment)) return nil }) setDefaultLpCallbackForExecTest(repository) @@ -338,6 +350,7 @@ func TestCreateExecution(t *testing.T) { Principal: "unused - populated from authenticated context", } request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput} + request.Spec.ClusterAssignment = &clusterAssignment identity := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil) ctx := identity.WithContext(context.Background()) @@ -3700,6 +3713,70 @@ func TestGetExecutionConfig_Spec(t *testing.T) { assert.Equal(t, execConfig.MaxParallelism, int32(25)) } +func TestGetClusterAssignment(t *testing.T) { + clusterAssignment := admin.ClusterAssignment{ + Affinity: &admin.Affinity{ + Selectors: []*admin.Selector{ + { + Key: "foo", + Value: []string{"bar"}, + Operator: admin.Selector_EQUALS, + }, + }, + }, + } + resourceManager := managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ClusterAssignment{ + ClusterAssignment: &clusterAssignment, + }, + }, + }, nil + } + + executionManager := ExecutionManager{ + resourceManager: &resourceManager, + } + t.Run("value from db", func(t *testing.T) { + ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + }) + assert.NoError(t, err) + assert.True(t, proto.Equal(ca, &clusterAssignment)) + }) + t.Run("value from request", func(t *testing.T) { + reqClusterAssignment := admin.ClusterAssignment{ + Affinity: &admin.Affinity{ + Selectors: []*admin.Selector{ + { + Key: "baz", + Operator: admin.Selector_IN, + }, + }, + }, + } + ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{ + ClusterAssignment: &reqClusterAssignment, + }, + }) + assert.NoError(t, err) + assert.True(t, proto.Equal(ca, &reqClusterAssignment)) + }) +} + func TestResolvePermissions(t *testing.T) { assumableIamRole := "role" k8sServiceAccount := "sa" diff --git a/flyteadmin/pkg/manager/impl/validation/attributes_validator.go b/flyteadmin/pkg/manager/impl/validation/attributes_validator.go index 0045f3643d..d0ad15a2ef 100644 --- a/flyteadmin/pkg/manager/impl/validation/attributes_validator.go +++ b/flyteadmin/pkg/manager/impl/validation/attributes_validator.go @@ -31,6 +31,8 @@ func validateMatchingAttributes(attributes *admin.MatchingAttributes, identifier return admin.MatchableResource_PLUGIN_OVERRIDE, nil } else if attributes.GetWorkflowExecutionConfig() != nil { return admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, nil + } else if attributes.GetClusterAssignment() != nil { + return admin.MatchableResource_CLUSTER_ASSIGNMENT, nil } return defaultMatchableResource, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "Unrecognized matching attributes type for request %s", identifier) diff --git a/flyteadmin/pkg/manager/impl/validation/attributes_validator_test.go b/flyteadmin/pkg/manager/impl/validation/attributes_validator_test.go index 214382cc17..476b571eb3 100644 --- a/flyteadmin/pkg/manager/impl/validation/attributes_validator_test.go +++ b/flyteadmin/pkg/manager/impl/validation/attributes_validator_test.go @@ -97,6 +97,25 @@ func TestValidateMatchingAttributes(t *testing.T) { admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, nil, }, + { + &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_ClusterAssignment{ + ClusterAssignment: &admin.ClusterAssignment{ + Affinity: &admin.Affinity{ + Selectors: []*admin.Selector{ + { + Key: "bar", + Operator: admin.Selector_EXISTS, + }, + }, + }, + }, + }, + }, + "foo", + admin.MatchableResource_CLUSTER_ASSIGNMENT, + nil, + }, } for _, tc := range testCases { matchableResource, err := validateMatchingAttributes(tc.attributes, tc.identifier) diff --git a/flyteadmin/pkg/workflowengine/interfaces/executor.go b/flyteadmin/pkg/workflowengine/interfaces/executor.go index b22cd8c4b2..fa0c523849 100644 --- a/flyteadmin/pkg/workflowengine/interfaces/executor.go +++ b/flyteadmin/pkg/workflowengine/interfaces/executor.go @@ -30,6 +30,7 @@ type ExecutionParameters struct { EventVersion int RoleNameKey string RawOutputDataConfig *admin.RawOutputDataConfig + ClusterAssignment *admin.ClusterAssignment } // ExecutionData includes all parameters required to create an execution CRD object.