Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(targets): Addition of egress and ingress worker filters #2654

Merged
merged 5 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions api/targets/option.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,36 @@ func DefaultDescription() Option {
}
}

func WithEgressWorkerFilter(inEgressWorkerFilter string) Option {
return func(o *options) {
o.postMap["egress_worker_filter"] = inEgressWorkerFilter
}
}

func DefaultEgressWorkerFilter() Option {
return func(o *options) {
o.postMap["egress_worker_filter"] = nil
}
}

func WithHostId(inHostId string) Option {
return func(o *options) {
o.postMap["host_id"] = inHostId
}
}

func WithIngressWorkerFilter(inIngressWorkerFilter string) Option {
return func(o *options) {
o.postMap["ingress_worker_filter"] = inIngressWorkerFilter
}
}

func DefaultIngressWorkerFilter() Option {
return func(o *options) {
o.postMap["ingress_worker_filter"] = nil
}
}

func WithInjectedApplicationCredentialSourceIds(inInjectedApplicationCredentialSourceIds []string) Option {
return func(o *options) {
o.postMap["injected_application_credential_source_ids"] = inInjectedApplicationCredentialSourceIds
Expand Down
2 changes: 2 additions & 0 deletions api/targets/target.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions globals/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
SessionConnectionLimitField = "session_connection_limit"
SessionMaxSecondsField = "session_max_seconds"
WorkerFilterField = "worker_filter"
EgressWorkerFilterField = "egress_worker_filter"
IngressWorkerFilterField = "ingress_worker_filter"
AccountIdsField = "account_ids"
AccountsField = "accounts"
LoginNameField = "login_name"
Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/commands/targetscmd/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ func printItemTable(item *targets.Target, resp *api.Response) string {
if item.WorkerFilter != "" {
nonAttributeMap["Worker Filter"] = item.WorkerFilter
}
if item.EgressWorkerFilter != "" {
nonAttributeMap["Egress Worker Filter"] = item.EgressWorkerFilter
}
if item.IngressWorkerFilter != "" {
nonAttributeMap["Ingress Worker Filter"] = item.IngressWorkerFilter
}
if resp != nil && resp.Map != nil {
if resp.Map[globals.SessionConnectionLimitField] != nil {
nonAttributeMap["Session Connection Limit"] = item.SessionConnectionLimit
Expand Down
45 changes: 41 additions & 4 deletions internal/cmd/commands/targetscmd/ssh_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func init() {

func extraSshActionsFlagsMapFuncImpl() map[string][]string {
return map[string][]string{
"create": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"create": {"default-port", "session-max-seconds", "session-connection-limit", "egress-worker-filter", "ingress-worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter", "egress-worker-filter", "ingress-worker-filter"},
}
}

Expand All @@ -29,6 +29,8 @@ type extraSshCmdVars struct {
flagSessionMaxSeconds string
flagSessionConnectionLimit string
flagWorkerFilter string
flagEgressWorkerFilter string
flagIngressWorkerFilter string
}

func (c *SshCommand) extraSshHelpFunc(helpMap map[string]func() string) string {
Expand Down Expand Up @@ -86,7 +88,19 @@ func extraSshFlagsFuncImpl(c *SshCommand, set *base.FlagSets, f *base.FlagSet) {
fs.StringVar(&base.StringVar{
Name: "worker-filter",
Target: &c.flagWorkerFilter,
Usage: "A boolean expression to filter which workers can handle sessions for this target.",
Usage: "Deprecated: use egress or ingress filters instead.",
})
case "egress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "egress-worker-filter",
Target: &c.flagEgressWorkerFilter,
Usage: "A boolean expression to filter which egress workers can handle sessions for this target.",
})
case "ingress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "ingress-worker-filter",
Target: &c.flagIngressWorkerFilter,
Usage: "A boolean expression to filter which ingress workers can handle sessions for this target.",
})
}
}
Expand Down Expand Up @@ -145,12 +159,35 @@ func extraSshFlagsHandlingFuncImpl(c *SshCommand, _ *base.FlagSets, opts *[]targ
*opts = append(*opts, targets.DefaultWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse filter expression: %s", err))
c.UI.Error(fmt.Sprintf("Unable to successfully parse worker filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithWorkerFilter(c.flagWorkerFilter))
}

switch c.flagEgressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultEgressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagEgressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse egress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithEgressWorkerFilter(c.flagEgressWorkerFilter))
}
switch c.flagIngressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultIngressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagIngressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse ingress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithIngressWorkerFilter(c.flagIngressWorkerFilter))
}

return true
}

Expand Down
45 changes: 41 additions & 4 deletions internal/cmd/commands/targetscmd/tcp_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func init() {

func extraTcpActionsFlagsMapFuncImpl() map[string][]string {
return map[string][]string{
"create": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"create": {"default-port", "session-max-seconds", "session-connection-limit", "egress-worker-filter", "ingress-worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter", "egress-worker-filter", "ingress-worker-filter"},
}
}

Expand All @@ -28,6 +28,8 @@ type extraTcpCmdVars struct {
flagSessionMaxSeconds string
flagSessionConnectionLimit string
flagWorkerFilter string
flagEgressWorkerFilter string
flagIngressWorkerFilter string
}

func (c *TcpCommand) extraTcpHelpFunc(helpMap map[string]func() string) string {
Expand Down Expand Up @@ -85,7 +87,19 @@ func extraTcpFlagsFuncImpl(c *TcpCommand, set *base.FlagSets, f *base.FlagSet) {
fs.StringVar(&base.StringVar{
Name: "worker-filter",
Target: &c.flagWorkerFilter,
Usage: "A boolean expression to filter which workers can handle sessions for this target.",
Usage: "Deprecated: use egress or ingress filters instead.",
})
case "egress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "egress-worker-filter",
Target: &c.flagEgressWorkerFilter,
Usage: "A boolean expression to filter which egress workers can handle sessions for this target.",
})
case "ingress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "ingress-worker-filter",
Target: &c.flagIngressWorkerFilter,
Usage: "A boolean expression to filter which ingress workers can handle sessions for this target.",
})
}
}
Expand Down Expand Up @@ -144,11 +158,34 @@ func extraTcpFlagsHandlingFuncImpl(c *TcpCommand, _ *base.FlagSets, opts *[]targ
*opts = append(*opts, targets.DefaultWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse filter expression: %s", err))
c.UI.Error(fmt.Sprintf("Unable to successfully parse worker filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithWorkerFilter(c.flagWorkerFilter))
}

switch c.flagEgressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultEgressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagEgressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse egress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithEgressWorkerFilter(c.flagEgressWorkerFilter))
}
switch c.flagIngressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultIngressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagIngressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse ingress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithIngressWorkerFilter(c.flagIngressWorkerFilter))
}

return true
}
110 changes: 68 additions & 42 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type workerServiceServer struct {
var (
_ pbs.SessionServiceServer = &workerServiceServer{}
_ pbs.ServerCoordinationServiceServer = &workerServiceServer{}

workerFilterSelectionFn = workerFilterSelector
)

func NewWorkerServiceServer(
Expand Down Expand Up @@ -273,6 +275,69 @@ func (ws *workerServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.Lis
return resp, nil
}

// Single-hop filter lookup. We have either an egress filter or worker filter to use, if any
// Used to verify that the worker serving this session to a client matches this filter
func workerFilterSelector(sessionInfo *session.Session) string {
if sessionInfo.EgressWorkerFilter != "" {
return sessionInfo.EgressWorkerFilter
} else if sessionInfo.WorkerFilter != "" {
return sessionInfo.WorkerFilter
}
return ""
}

func lookupSessionWorkerFilter(ctx context.Context, sessionInfo *session.Session, ws *workerServiceServer,
req *pbs.LookupSessionRequest,
) error {
const op = "workers.lookupSessionEgressWorkerFilter"

filter := workerFilterSelectionFn(sessionInfo)
if filter == "" {
return nil
}

if req.WorkerId == "" {
event.WriteError(ctx, op, errors.New("worker filter enabled for session but got no id information from worker"))
return status.Errorf(codes.Internal, "Did not receive worker id when looking up session but filtering is enabled")
}
serversRepo, err := ws.serversRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting server repo"))
return status.Errorf(codes.Internal, "Error acquiring server repo when looking up session: %v", err)
}
w, err := serversRepo.LookupWorker(ctx, req.WorkerId)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Error looking up worker: %v", err)
}
if w == nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Worker not found")
}
// Build the map for filtering.
tagMap := w.CanonicalTags()

// Create the evaluator
eval, err := bexpr.CreateEvaluator(filter)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating worker filter evaluator", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Error creating worker filter evaluator: %v", err)
}
filterInput := map[string]interface{}{
"name": w.GetName(),
"tags": tagMap,
}
ok, err := eval.Evaluate(filterInput)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if !ok {
return handlers.ApiErrorWithCodeAndMessage(codes.FailedPrecondition, "Worker filter expression precludes this worker from serving this session")
}

return nil
}

func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.LookupSessionRequest) (*pbs.LookupSessionResponse, error) {
const op = "workers.(workerServiceServer).LookupSession"

Expand All @@ -292,48 +357,9 @@ func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.Looku
return nil, status.Error(codes.Internal, "Empty session states during lookup.")
}

if sessionInfo.WorkerFilter != "" {
if req.WorkerId == "" {
event.WriteError(ctx, op, errors.New("worker filter enabled for session but got no id information from worker"))
return nil, status.Errorf(codes.Internal, "Did not receive worker id when looking up session but filtering is enabled")
}
serversRepo, err := ws.serversRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting server repo"))
return nil, status.Errorf(codes.Internal, "Error acquiring server repo when looking up session: %v", err)
}
w, err := serversRepo.LookupWorker(ctx, req.WorkerId)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Error looking up worker: %v", err)
}
if w == nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Worker not found")
}
// Build the map for filtering.
tagMap := w.CanonicalTags()

// Create the evaluator
eval, err := bexpr.CreateEvaluator(sessionInfo.WorkerFilter)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating worker filter evaluator", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Error creating worker filter evaluator: %v", err)
}
filterInput := map[string]any{
"name": w.GetName(),
"tags": tagMap,
}
ok, err := eval.Evaluate(filterInput)
if err != nil {
return nil, status.Errorf(codes.Internal,
fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if !ok {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
"Worker filter expression precludes this worker from serving this session")
}
err = lookupSessionWorkerFilter(ctx, sessionInfo, ws, req)
if err != nil {
return nil, err
}

creds, err := sessRepo.ListSessionCredentials(ctx, sessionInfo.ProjectId, sessionInfo.PublicId)
Expand Down
Loading