Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Inject user identifier to ExecutionSpec #549

Merged
merged 17 commits into from
May 15, 2023
14 changes: 14 additions & 0 deletions auth/identity_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type IdentityContext struct {
scopes *sets.String
// Raw JWT token from the IDP. Set to a pointer to support the equal operator for this struct.
claims *claimsType
// userIdentifier stores a unique string that can be used to identify the user associated with a given task.
// This identifier is passed down to the ExecutionSpec and can be used for various purposes, such as setting the user identifier on a pod label.
// By default, the user identifier is filled with the value of IdentityContext.userID. However, you can customize your middleware to assign other values if needed.
// Providing a user identifier can be useful for tracking tasks and associating them with specific users, especially in multi-user environments.
userIdentifier string
}

func (c IdentityContext) Audience() string {
Expand Down Expand Up @@ -81,6 +86,15 @@ func (c IdentityContext) AuthenticatedAt() time.Time {
return c.authenticatedAt
}

func (c IdentityContext) UserIdentifier() string {
return c.userIdentifier
}
ByronHsu marked this conversation as resolved.
Show resolved Hide resolved

// SetUserIdentifier allows you to explicitly set user identifier
func (c *IdentityContext) SetUserIdentifier(id string) {
c.userIdentifier = id
}
ByronHsu marked this conversation as resolved.
Show resolved Hide resolved

// NewIdentityContext creates a new IdentityContext.
func NewIdentityContext(audience, userID, appID string, authenticatedAt time.Time, scopes sets.String, userInfo *service.UserInfoResponse, claims map[string]interface{}) (
IdentityContext, error) {
Expand Down
9 changes: 9 additions & 0 deletions auth/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,12 @@ func BlanketAuthorization(ctx context.Context, req interface{}, _ *grpc.UnarySer

return handler(ctx, req)
}

// UserIdentifierInterceptor injects identityContext.UserID() to identityContext.userIdentifier
func UserIdentifierInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
resp interface{}, err error) {
identityContext := IdentityContextFromContext(ctx)
identityContext.SetUserIdentifier(identityContext.UserID())
ctx = identityContext.WithContext(ctx)
return handler(ctx, req)
}
17 changes: 17 additions & 0 deletions auth/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,20 @@ func TestBlanketAuthorization(t *testing.T) {
assert.False(t, handlerCalled)
})
}

func TestGetUserIdentityFromContext(t *testing.T) {
identityContext := IdentityContext{
userID: "yeee",
}

ctx := identityContext.WithContext(context.Background())

handler := func(ctx context.Context, req interface{}) (interface{}, error) {
identityContext := IdentityContextFromContext(ctx)
userIdentifier := identityContext.UserIdentifier()
assert.Equal(t, userIdentifier, "yeee")
return nil, nil
}

UserIdentifierInterceptor(ctx, nil, nil, handler)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.8.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.5.0
github.com/flyteorg/flyteidl v1.5.2
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flytepropeller v1.1.70
github.com/flyteorg/flytestdlib v1.0.15
Expand Down Expand Up @@ -208,4 +208,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,13 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
<<<<<<< HEAD
github.com/flyteorg/flyteidl v1.5.0 h1:vdaA5Cg9eqi5NMuASSod/AE7RXlHvzdWjSL9abDyd/M=
github.com/flyteorg/flyteidl v1.5.0/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I=
=======
github.com/flyteorg/flyteidl v1.5.2 h1:DZPzYkTg92qA4e17fd0ZW1M+gh1gJKh/VOK+F4bYgM8=
github.com/flyteorg/flyteidl v1.5.2/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I=
>>>>>>> 5ff9b59... improve
github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s=
github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w=
Expand Down
15 changes: 14 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,18 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
RunAs: &core.Identity{},
}
}

if workflowExecConfig.GetSecurityContext().GetRunAs() == nil {
workflowExecConfig.SecurityContext.RunAs = &core.Identity{}
}

// In the case of reference_launch_plan subworkflow, the context comes from flytepropeller instead of the user side, so user auth is missing.
// We skip getUserIdentityFromContext but can still get ExecUserId because flytepropeller passes it in the execution request.
// https://github.com/flyteorg/flytepropeller/blob/03a6672960ed04e7687ba4f790fee9a02a4057fb/pkg/controller/nodes/subworkflow/launchplan/admin.go#L114
if workflowExecConfig.GetSecurityContext().GetRunAs().GetUserIdentifier() == "" {
workflowExecConfig.SecurityContext.RunAs.UserIdentifier = auth.IdentityContextFromContext(ctx).UserIdentifier()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when admin is running without auth, this still won't fail right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah UserIdentifier would just be an empty string

}

logger.Infof(ctx, "getting the workflow execution config from application configuration")
// Defaults to one from the application config
return &workflowExecConfig, nil
Expand Down Expand Up @@ -676,7 +688,8 @@ func resolveSecurityCtx(ctx context.Context, executionConfigSecurityCtx *core.Se
// Use security context from the executionConfigSecurityCtx if its set and non empty or else resolve from authRole
if executionConfigSecurityCtx != nil && executionConfigSecurityCtx.RunAs != nil &&
(len(executionConfigSecurityCtx.RunAs.K8SServiceAccount) > 0 ||
len(executionConfigSecurityCtx.RunAs.IamRole) > 0) {
len(executionConfigSecurityCtx.RunAs.IamRole) > 0 ||
len(executionConfigSecurityCtx.RunAs.UserIdentifier) > 0) {
return executionConfigSecurityCtx
}
logger.Warn(ctx, "Setting security context from auth Role")
Expand Down
7 changes: 6 additions & 1 deletion pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4307,7 +4307,11 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
OverwriteCache: requestOverwriteCache,
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, nil)
identityContext, err := auth.NewIdentityContext("", "", "", time.Now(), sets.String{}, nil, nil)
identityContext.SetUserIdentifier("yeee")
assert.NoError(t, err)
ctx := identityContext.WithContext(context.Background())
execConfig, err := executionManager.getExecutionConfig(ctx, request, nil)
assert.NoError(t, err)
assert.Equal(t, requestMaxParallelism, execConfig.MaxParallelism)
assert.Equal(t, requestK8sServiceAccount, execConfig.SecurityContext.RunAs.K8SServiceAccount)
Expand All @@ -4316,6 +4320,7 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
assert.Equal(t, requestOutputLocationPrefix, execConfig.RawOutputDataConfig.OutputLocationPrefix)
assert.Equal(t, requestLabels, execConfig.GetLabels().Values)
assert.Equal(t, requestAnnotations, execConfig.GetAnnotations().Values)
assert.Equal(t, "yeee", execConfig.GetSecurityContext().GetRunAs().GetUserIdentifier())
})
t.Run("request with partial config", func(t *testing.T) {
request := &admin.ExecutionCreateRequest{
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ func MergeIntoExecConfig(workflowExecConfig admin.WorkflowExecutionConfig, spec
if workflowExecConfig.GetSecurityContext() == nil && spec.GetSecurityContext() != nil {
if spec.GetSecurityContext().GetRunAs() != nil &&
(len(spec.GetSecurityContext().GetRunAs().GetK8SServiceAccount()) > 0 ||
len(spec.GetSecurityContext().GetRunAs().GetIamRole()) > 0) {
len(spec.GetSecurityContext().GetRunAs().GetIamRole()) > 0 ||
len(spec.GetSecurityContext().GetRunAs().GetUserIdentifier()) > 0) {
workflowExecConfig.SecurityContext = spec.GetSecurityContext()
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c
scope promutils.Scope, opts ...grpc.ServerOption) (*grpc.Server, error) {

logger.Infof(ctx, "Registering default middleware with blanket auth validation")
pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer(auth.BlanketAuthorization))
pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer(auth.BlanketAuthorization, auth.UserIdentifierInterceptor))

// Not yet implemented for streaming
var chainedUnaryInterceptors grpc.UnaryServerInterceptor
Expand Down