Skip to content

Commit

Permalink
Read and pass along cluster assignment to workflow executor (flyteorg…
Browse files Browse the repository at this point in the history
…#376)

* wip

Signed-off-by: Katrina Rogan <[email protected]>

* add a test too

Signed-off-by: Katrina Rogan <[email protected]>

* Matchable attribute impl

Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan authored Mar 23, 2022
1 parent 3e04a94 commit cc40d26
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 2 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/benbjohnson/clock v1.1.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/flyteorg/flyteidl v0.24.0
github.com/flyteorg/flyteidl v0.24.2
github.com/flyteorg/flyteplugins v0.10.16
github.com/flyteorg/flytepropeller v0.16.36
github.com/flyteorg/flytestdlib v0.4.13
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.0 h1:bEr9LGCilUX8t6gE8+K6qfW+LDXuCvBpHfllA+zDPZI=
github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.2 h1:RQzWmtVQR+NKAppjw7xTsIn6gosP0Q/j58tfF6Cr6h4=
github.com/flyteorg/flyteidl v0.24.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos=
github.com/flyteorg/flytepropeller v0.16.36 h1:5uE8JsutrPVyLVrRJ8BgvhZUOmTBFkEkn5xmIOo21nU=
Expand Down
36 changes: 36 additions & 0 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,30 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
}, nil
}

func (m *ExecutionManager) getClusterAssignment(ctx context.Context, request *admin.ExecutionCreateRequest) (
*admin.ClusterAssignment, error) {
if request.Spec.ClusterAssignment != nil {
return request.Spec.ClusterAssignment, nil
}

resource, err := m.resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: request.Project,
Domain: request.Domain,
ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get cluster assignment overrides with error: %v", err)
return nil, err
}
}
if resource != nil && resource.Attributes.GetClusterAssignment() != nil {
return resource.Attributes.GetClusterAssignment(), nil
}
// Defaults to empty assignment with no selectors
return &admin.ClusterAssignment{}, nil
}

func (m *ExecutionManager) launchSingleTaskExecution(
ctx context.Context, request admin.ExecutionCreateRequest, requestedAt time.Time) (
context.Context, *models.Execution, error) {
Expand Down Expand Up @@ -572,6 +596,11 @@ func (m *ExecutionManager) launchSingleTaskExecution(
rawOutputDataConfig = requestSpec.RawOutputDataConfig
}

clusterAssignment, err := m.getClusterAssignment(ctx, &request)
if err != nil {
return nil, nil, err
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole)
executionParameters := workflowengineInterfaces.ExecutionParameters{
Expand All @@ -585,6 +614,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
Expand Down Expand Up @@ -804,6 +834,11 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
rawOutputDataConfig = requestSpec.RawOutputDataConfig
}

clusterAssignment, err := m.getClusterAssignment(ctx, &request)
if err != nil {
return nil, nil, err
}

resolvedAuthRole := resolveAuthRole(request, launchPlan)
resolvedSecurityCtx := resolveSecurityCtx(ctx, request, launchPlan, resolvedAuthRole)
executionParameters := workflowengineInterfaces.ExecutionParameters{
Expand All @@ -817,6 +852,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
Expand Down
77 changes: 77 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,25 @@ func TestCreateExecution(t *testing.T) {

principal := "principal"
rawOutput := "raw_output"
clusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "foo",
Value: []string{"bar"},
Operator: admin.Selector_NOT_EQUALS,
},
},
},
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, principal, spec.Metadata.Principal)
assert.Equal(t, rawOutput, spec.RawOutputDataConfig.OutputLocationPrefix)
assert.True(t, proto.Equal(spec.ClusterAssignment, &clusterAssignment))
return nil
})
setDefaultLpCallbackForExecTest(repository)
Expand Down Expand Up @@ -338,6 +350,7 @@ func TestCreateExecution(t *testing.T) {
Principal: "unused - populated from authenticated context",
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment

identity := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil)
ctx := identity.WithContext(context.Background())
Expand Down Expand Up @@ -3700,6 +3713,70 @@ func TestGetExecutionConfig_Spec(t *testing.T) {
assert.Equal(t, execConfig.MaxParallelism, int32(25))
}

func TestGetClusterAssignment(t *testing.T) {
clusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "foo",
Value: []string{"bar"},
Operator: admin.Selector_EQUALS,
},
},
},
}
resourceManager := managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
ResourceType: admin.MatchableResource_CLUSTER_ASSIGNMENT,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ClusterAssignment{
ClusterAssignment: &clusterAssignment,
},
},
}, nil
}

executionManager := ExecutionManager{
resourceManager: &resourceManager,
}
t.Run("value from db", func(t *testing.T) {
ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
})
assert.NoError(t, err)
assert.True(t, proto.Equal(ca, &clusterAssignment))
})
t.Run("value from request", func(t *testing.T) {
reqClusterAssignment := admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "baz",
Operator: admin.Selector_IN,
},
},
},
}
ca, err := executionManager.getClusterAssignment(context.TODO(), &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{
ClusterAssignment: &reqClusterAssignment,
},
})
assert.NoError(t, err)
assert.True(t, proto.Equal(ca, &reqClusterAssignment))
})
}

func TestResolvePermissions(t *testing.T) {
assumableIamRole := "role"
k8sServiceAccount := "sa"
Expand Down
2 changes: 2 additions & 0 deletions pkg/manager/impl/validation/attributes_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func validateMatchingAttributes(attributes *admin.MatchingAttributes, identifier
return admin.MatchableResource_PLUGIN_OVERRIDE, nil
} else if attributes.GetWorkflowExecutionConfig() != nil {
return admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, nil
} else if attributes.GetClusterAssignment() != nil {
return admin.MatchableResource_CLUSTER_ASSIGNMENT, nil
}
return defaultMatchableResource, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"Unrecognized matching attributes type for request %s", identifier)
Expand Down
19 changes: 19 additions & 0 deletions pkg/manager/impl/validation/attributes_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,25 @@ func TestValidateMatchingAttributes(t *testing.T) {
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
nil,
},
{
&admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ClusterAssignment{
ClusterAssignment: &admin.ClusterAssignment{
Affinity: &admin.Affinity{
Selectors: []*admin.Selector{
{
Key: "bar",
Operator: admin.Selector_EXISTS,
},
},
},
},
},
},
"foo",
admin.MatchableResource_CLUSTER_ASSIGNMENT,
nil,
},
}
for _, tc := range testCases {
matchableResource, err := validateMatchingAttributes(tc.attributes, tc.identifier)
Expand Down
1 change: 1 addition & 0 deletions pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ExecutionParameters struct {
EventVersion int
RoleNameKey string
RawOutputDataConfig *admin.RawOutputDataConfig
ClusterAssignment *admin.ClusterAssignment
}

// ExecutionData includes all parameters required to create an execution CRD object.
Expand Down

0 comments on commit cc40d26

Please sign in to comment.