Skip to content

Commit

Permalink
feat(targets): Addition of egress and ingress worker filters (#2654)
Browse files Browse the repository at this point in the history
* feat(targets): Addition of egress and ingress worker filters
  • Loading branch information
irenarindos authored Nov 30, 2022
1 parent aca22ca commit 834a2a8
Show file tree
Hide file tree
Showing 34 changed files with 1,422 additions and 430 deletions.
24 changes: 24 additions & 0 deletions api/targets/option.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,36 @@ func DefaultDescription() Option {
}
}

func WithEgressWorkerFilter(inEgressWorkerFilter string) Option {
return func(o *options) {
o.postMap["egress_worker_filter"] = inEgressWorkerFilter
}
}

func DefaultEgressWorkerFilter() Option {
return func(o *options) {
o.postMap["egress_worker_filter"] = nil
}
}

func WithHostId(inHostId string) Option {
return func(o *options) {
o.postMap["host_id"] = inHostId
}
}

func WithIngressWorkerFilter(inIngressWorkerFilter string) Option {
return func(o *options) {
o.postMap["ingress_worker_filter"] = inIngressWorkerFilter
}
}

func DefaultIngressWorkerFilter() Option {
return func(o *options) {
o.postMap["ingress_worker_filter"] = nil
}
}

func WithInjectedApplicationCredentialSourceIds(inInjectedApplicationCredentialSourceIds []string) Option {
return func(o *options) {
o.postMap["injected_application_credential_source_ids"] = inInjectedApplicationCredentialSourceIds
Expand Down
2 changes: 2 additions & 0 deletions api/targets/target.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions globals/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
SessionConnectionLimitField = "session_connection_limit"
SessionMaxSecondsField = "session_max_seconds"
WorkerFilterField = "worker_filter"
EgressWorkerFilterField = "egress_worker_filter"
IngressWorkerFilterField = "ingress_worker_filter"
AccountIdsField = "account_ids"
AccountsField = "accounts"
LoginNameField = "login_name"
Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/commands/targetscmd/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,12 @@ func printItemTable(item *targets.Target, resp *api.Response) string {
if item.WorkerFilter != "" {
nonAttributeMap["Worker Filter"] = item.WorkerFilter
}
if item.EgressWorkerFilter != "" {
nonAttributeMap["Egress Worker Filter"] = item.EgressWorkerFilter
}
if item.IngressWorkerFilter != "" {
nonAttributeMap["Ingress Worker Filter"] = item.IngressWorkerFilter
}
if resp != nil && resp.Map != nil {
if resp.Map[globals.SessionConnectionLimitField] != nil {
nonAttributeMap["Session Connection Limit"] = item.SessionConnectionLimit
Expand Down
45 changes: 41 additions & 4 deletions internal/cmd/commands/targetscmd/ssh_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ func init() {

func extraSshActionsFlagsMapFuncImpl() map[string][]string {
return map[string][]string{
"create": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"create": {"default-port", "session-max-seconds", "session-connection-limit", "egress-worker-filter", "ingress-worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter", "egress-worker-filter", "ingress-worker-filter"},
}
}

Expand All @@ -29,6 +29,8 @@ type extraSshCmdVars struct {
flagSessionMaxSeconds string
flagSessionConnectionLimit string
flagWorkerFilter string
flagEgressWorkerFilter string
flagIngressWorkerFilter string
}

func (c *SshCommand) extraSshHelpFunc(helpMap map[string]func() string) string {
Expand Down Expand Up @@ -86,7 +88,19 @@ func extraSshFlagsFuncImpl(c *SshCommand, set *base.FlagSets, f *base.FlagSet) {
fs.StringVar(&base.StringVar{
Name: "worker-filter",
Target: &c.flagWorkerFilter,
Usage: "A boolean expression to filter which workers can handle sessions for this target.",
Usage: "Deprecated: use egress or ingress filters instead.",
})
case "egress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "egress-worker-filter",
Target: &c.flagEgressWorkerFilter,
Usage: "A boolean expression to filter which egress workers can handle sessions for this target.",
})
case "ingress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "ingress-worker-filter",
Target: &c.flagIngressWorkerFilter,
Usage: "A boolean expression to filter which ingress workers can handle sessions for this target.",
})
}
}
Expand Down Expand Up @@ -145,12 +159,35 @@ func extraSshFlagsHandlingFuncImpl(c *SshCommand, _ *base.FlagSets, opts *[]targ
*opts = append(*opts, targets.DefaultWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse filter expression: %s", err))
c.UI.Error(fmt.Sprintf("Unable to successfully parse worker filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithWorkerFilter(c.flagWorkerFilter))
}

switch c.flagEgressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultEgressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagEgressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse egress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithEgressWorkerFilter(c.flagEgressWorkerFilter))
}
switch c.flagIngressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultIngressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagIngressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse ingress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithIngressWorkerFilter(c.flagIngressWorkerFilter))
}

return true
}

Expand Down
45 changes: 41 additions & 4 deletions internal/cmd/commands/targetscmd/tcp_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func init() {

func extraTcpActionsFlagsMapFuncImpl() map[string][]string {
return map[string][]string{
"create": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter"},
"create": {"default-port", "session-max-seconds", "session-connection-limit", "egress-worker-filter", "ingress-worker-filter"},
"update": {"default-port", "session-max-seconds", "session-connection-limit", "worker-filter", "egress-worker-filter", "ingress-worker-filter"},
}
}

Expand All @@ -28,6 +28,8 @@ type extraTcpCmdVars struct {
flagSessionMaxSeconds string
flagSessionConnectionLimit string
flagWorkerFilter string
flagEgressWorkerFilter string
flagIngressWorkerFilter string
}

func (c *TcpCommand) extraTcpHelpFunc(helpMap map[string]func() string) string {
Expand Down Expand Up @@ -85,7 +87,19 @@ func extraTcpFlagsFuncImpl(c *TcpCommand, set *base.FlagSets, f *base.FlagSet) {
fs.StringVar(&base.StringVar{
Name: "worker-filter",
Target: &c.flagWorkerFilter,
Usage: "A boolean expression to filter which workers can handle sessions for this target.",
Usage: "Deprecated: use egress or ingress filters instead.",
})
case "egress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "egress-worker-filter",
Target: &c.flagEgressWorkerFilter,
Usage: "A boolean expression to filter which egress workers can handle sessions for this target.",
})
case "ingress-worker-filter":
fs.StringVar(&base.StringVar{
Name: "ingress-worker-filter",
Target: &c.flagIngressWorkerFilter,
Usage: "A boolean expression to filter which ingress workers can handle sessions for this target.",
})
}
}
Expand Down Expand Up @@ -144,11 +158,34 @@ func extraTcpFlagsHandlingFuncImpl(c *TcpCommand, _ *base.FlagSets, opts *[]targ
*opts = append(*opts, targets.DefaultWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse filter expression: %s", err))
c.UI.Error(fmt.Sprintf("Unable to successfully parse worker filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithWorkerFilter(c.flagWorkerFilter))
}

switch c.flagEgressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultEgressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagEgressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse egress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithEgressWorkerFilter(c.flagEgressWorkerFilter))
}
switch c.flagIngressWorkerFilter {
case "":
case "null":
*opts = append(*opts, targets.DefaultIngressWorkerFilter())
default:
if _, err := bexpr.CreateEvaluator(c.flagIngressWorkerFilter); err != nil {
c.UI.Error(fmt.Sprintf("Unable to successfully parse ingress filter expression: %s", err))
return false
}
*opts = append(*opts, targets.WithIngressWorkerFilter(c.flagIngressWorkerFilter))
}

return true
}
110 changes: 68 additions & 42 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type workerServiceServer struct {
var (
_ pbs.SessionServiceServer = &workerServiceServer{}
_ pbs.ServerCoordinationServiceServer = &workerServiceServer{}

workerFilterSelectionFn = workerFilterSelector
)

func NewWorkerServiceServer(
Expand Down Expand Up @@ -273,6 +275,69 @@ func (ws *workerServiceServer) ListHcpbWorkers(ctx context.Context, req *pbs.Lis
return resp, nil
}

// Single-hop filter lookup. We have either an egress filter or worker filter to use, if any
// Used to verify that the worker serving this session to a client matches this filter
func workerFilterSelector(sessionInfo *session.Session) string {
if sessionInfo.EgressWorkerFilter != "" {
return sessionInfo.EgressWorkerFilter
} else if sessionInfo.WorkerFilter != "" {
return sessionInfo.WorkerFilter
}
return ""
}

func lookupSessionWorkerFilter(ctx context.Context, sessionInfo *session.Session, ws *workerServiceServer,
req *pbs.LookupSessionRequest,
) error {
const op = "workers.lookupSessionEgressWorkerFilter"

filter := workerFilterSelectionFn(sessionInfo)
if filter == "" {
return nil
}

if req.WorkerId == "" {
event.WriteError(ctx, op, errors.New("worker filter enabled for session but got no id information from worker"))
return status.Errorf(codes.Internal, "Did not receive worker id when looking up session but filtering is enabled")
}
serversRepo, err := ws.serversRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting server repo"))
return status.Errorf(codes.Internal, "Error acquiring server repo when looking up session: %v", err)
}
w, err := serversRepo.LookupWorker(ctx, req.WorkerId)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Error looking up worker: %v", err)
}
if w == nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Worker not found")
}
// Build the map for filtering.
tagMap := w.CanonicalTags()

// Create the evaluator
eval, err := bexpr.CreateEvaluator(filter)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating worker filter evaluator", "worker_id", req.WorkerId))
return status.Errorf(codes.Internal, "Error creating worker filter evaluator: %v", err)
}
filterInput := map[string]interface{}{
"name": w.GetName(),
"tags": tagMap,
}
ok, err := eval.Evaluate(filterInput)
if err != nil {
return status.Errorf(codes.Internal, fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if !ok {
return handlers.ApiErrorWithCodeAndMessage(codes.FailedPrecondition, "Worker filter expression precludes this worker from serving this session")
}

return nil
}

func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.LookupSessionRequest) (*pbs.LookupSessionResponse, error) {
const op = "workers.(workerServiceServer).LookupSession"

Expand All @@ -292,48 +357,9 @@ func (ws *workerServiceServer) LookupSession(ctx context.Context, req *pbs.Looku
return nil, status.Error(codes.Internal, "Empty session states during lookup.")
}

if sessionInfo.WorkerFilter != "" {
if req.WorkerId == "" {
event.WriteError(ctx, op, errors.New("worker filter enabled for session but got no id information from worker"))
return nil, status.Errorf(codes.Internal, "Did not receive worker id when looking up session but filtering is enabled")
}
serversRepo, err := ws.serversRepoFn()
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting server repo"))
return nil, status.Errorf(codes.Internal, "Error acquiring server repo when looking up session: %v", err)
}
w, err := serversRepo.LookupWorker(ctx, req.WorkerId)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Error looking up worker: %v", err)
}
if w == nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error looking up worker", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Worker not found")
}
// Build the map for filtering.
tagMap := w.CanonicalTags()

// Create the evaluator
eval, err := bexpr.CreateEvaluator(sessionInfo.WorkerFilter)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error creating worker filter evaluator", "worker_id", req.WorkerId))
return nil, status.Errorf(codes.Internal, "Error creating worker filter evaluator: %v", err)
}
filterInput := map[string]any{
"name": w.GetName(),
"tags": tagMap,
}
ok, err := eval.Evaluate(filterInput)
if err != nil {
return nil, status.Errorf(codes.Internal,
fmt.Sprintf("Worker filter expression evaluation resulted in error: %s", err))
}
if !ok {
return nil, handlers.ApiErrorWithCodeAndMessage(
codes.FailedPrecondition,
"Worker filter expression precludes this worker from serving this session")
}
err = lookupSessionWorkerFilter(ctx, sessionInfo, ws, req)
if err != nil {
return nil, err
}

creds, err := sessRepo.ListSessionCredentials(ctx, sessionInfo.ProjectId, sessionInfo.PublicId)
Expand Down
Loading

0 comments on commit 834a2a8

Please sign in to comment.