Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9] Moderated Sessions rollup backport #11803

Merged
merged 7 commits into from
Apr 7, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/pages/access-controls/guides/moderated-sessions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ spec:
count: 2
```

#### Combining Policies

The authorizer applies require policies within a role together with an OR operator and the policies from each role with an AND operator. In practice, this means that for every role with at least one require policy, one of its policies must be met before a session can be started.

### `join_sessions`

#### Options
Expand Down
114 changes: 114 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func TestIntegrations(t *testing.T) {
t.Run("UUIDBasedProxy", suite.bind(testUUIDBasedProxy))
t.Run("WindowChange", suite.bind(testWindowChange))
t.Run("SSHTracker", suite.bind(testSSHTracker))
t.Run("TestKubeAgentFiltering", suite.bind(testKubeAgentFiltering))
}

// testAuditOn creates a live session, records a bunch of data through it
Expand Down Expand Up @@ -5950,3 +5951,116 @@ outer:

t.FailNow()
}

// TestKubeAgentFiltering tests that kube-agent filtering for pre-v8 agents and
// moderated sessions users works as expected.
func testKubeAgentFiltering(t *testing.T, suite *integrationTestSuite) {
ctx := context.Background()

type testCase struct {
name string
server types.Server
role types.Role
user types.User
wantsLen int
}

v8Agent, err := types.NewServer("kube-h", types.KindKubeService, types.ServerSpecV2{
Version: "8.0.0",
KubernetesClusters: []*types.KubernetesCluster{{Name: "foo"}},
})
require.NoError(t, err)

v9Agent, err := types.NewServer("kube-h", types.KindKubeService, types.ServerSpecV2{
Version: "9.0.0",
KubernetesClusters: []*types.KubernetesCluster{{Name: "foo"}},
})
require.NoError(t, err)

plainRole, err := types.NewRole("plain", types.RoleSpecV5{})
require.NoError(t, err)

moderatedRole, err := types.NewRole("moderated", types.RoleSpecV5{
Allow: types.RoleConditions{
RequireSessionJoin: []*types.SessionRequirePolicy{
{
Name: "bar",
Kinds: []string{string(types.KubernetesSessionKind)},
},
},
},
})
require.NoError(t, err)

plainUser, err := types.NewUser("bob")
require.NoError(t, err)
plainUser.SetRoles([]string{plainRole.GetName()})

moderatedUser, err := types.NewUser("alice")
require.NoError(t, err)
moderatedUser.SetRoles([]string{moderatedRole.GetName()})

testCases := []testCase{
{
name: "unrestricted user, v8 agent",
server: v8Agent,
role: plainRole,
user: plainUser,
wantsLen: 1,
},
{
name: "restricted user, v8 agent",
server: v8Agent,
role: moderatedRole,
user: moderatedUser,
wantsLen: 0,
},
{
name: "unrestricted user, v9 agent",
server: v9Agent,
role: plainRole,
user: plainUser,
wantsLen: 1,
},
{
name: "restricted user, v9 agent",
server: v9Agent,
role: moderatedRole,
user: moderatedUser,
wantsLen: 1,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
teleport := suite.newTeleport(t, nil, true)
defer teleport.StopAll()

adminSite := teleport.Process.GetAuthServer()
_, err := adminSite.UpsertKubeServiceV2(ctx, testCase.server)
require.NoError(t, err)
err = adminSite.UpsertRole(ctx, testCase.role)
require.NoError(t, err)
err = adminSite.CreateUser(ctx, testCase.user)
require.NoError(t, err)

cl, err := teleport.NewClient(ClientConfig{
Login: testCase.user.GetName(),
Cluster: Site,
Host: Host,
Port: teleport.GetPortSSHInt(),
})
require.NoError(t, err)

proxy, err := cl.ConnectToProxy(ctx)
require.NoError(t, err)

userSite, err := proxy.ConnectToCluster(ctx, Site, false)
require.NoError(t, err)

services, err := userSite.GetKubeServices(ctx)
require.NoError(t, err)
require.Len(t, services, testCase.wantsLen)
})
}
}
24 changes: 24 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"time"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand Down Expand Up @@ -3581,6 +3582,29 @@ func (a *ServerWithRoles) filterKubeServices(server types.Server) error {
// connect to them.
mfaParams := services.AccessMFAParams{Verified: true}

// Filter out agents that don't have support for moderated sessions access
// checking if the user has any roles that require it.
if hasLocalUserRole(a.context.Checker) {
roles := a.context.Checker.(LocalUserRoleSet)
agentVersion, versionErr := semver.NewVersion(server.GetTeleportVersion())

hasK8SRequirePolicy := func() bool {
for _, role := range roles.RoleSet {
for _, policy := range role.GetSessionRequirePolicies() {
if ContainsSessionKind(policy.Kinds, types.KubernetesSessionKind) {
return true
}
}
}

return false
}

if hasK8SRequirePolicy() && (versionErr != nil || agentVersion.LessThan(*MinSupportedModeratedSessionsVersion)) {
return trace.AccessDenied("cannot use moderated sessions with pre-v9 kubernetes agents")
}
}

filtered := make([]*types.KubernetesCluster, 0, len(server.GetKubernetesClusters()))
for _, kube := range server.GetKubernetesClusters() {
k8sV3, err := types.NewKubernetesClusterV3FromLegacyCluster(server.GetNamespace(), kube)
Expand Down
3 changes: 1 addition & 2 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,8 +1708,7 @@ func downgradeRole(ctx context.Context, role *types.RoleV5) (*types.RoleV5, erro
}
}

minSupportedVersionForV5Roles := semver.New(utils.VersionBeforeAlpha("9.0.0"))
if clientVersion == nil || clientVersion.LessThan(*minSupportedVersionForV5Roles) {
if clientVersion == nil || clientVersion.LessThan(*MinSupportedModeratedSessionsVersion) {
log.Debugf(`Client version "%s" is unknown or less than 9.0.0, converting role to v4`, clientVersionString)
downgraded, err := services.DowngradeRoleToV4(role)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions lib/auth/session_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"regexp"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/vulcand/predicate"
)

var MinSupportedModeratedSessionsVersion = semver.New(utils.VersionBeforeAlpha("9.0.0"))

// SessionAccessEvaluator takes a set of policies
// and uses rules to evaluate them to determine when a session may start
// and if a user can join a session.
Expand Down Expand Up @@ -70,7 +73,7 @@ func getAllowPolicies(participant SessionAccessContext) []*types.SessionJoinPoli
return policies
}

func containsKind(s []string, e types.SessionKind) bool {
func ContainsSessionKind(s []string, e types.SessionKind) bool {
for _, a := range s {
if types.SessionKind(a) == e {
return true
Expand Down Expand Up @@ -162,7 +165,7 @@ func (e *SessionAccessEvaluator) matchesJoin(allow *types.SessionJoinPolicy) boo
}

func (e *SessionAccessEvaluator) matchesKind(allow []string) bool {
if containsKind(allow, e.kind) || containsKind(allow, "*") {
if ContainsSessionKind(allow, e.kind) || ContainsSessionKind(allow, "*") {
return true
}

Expand Down
10 changes: 3 additions & 7 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params

io := srv.NewTermManager()

if tty {
err = io.BroadcastMessage(fmt.Sprintf("Creating session with ID: %v...", id.String()))
if err != nil {
return nil, trace.Wrap(err)
}
}

s := &session{
ctx: ctx,
forwarder: forwarder,
Expand All @@ -361,6 +354,9 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
displayParticipantRequirements: utils.AsBool(q.Get("displayParticipantRequirements")),
}

s.BroadcastMessage("Creating session with ID: %v...", id.String())
s.BroadcastMessage(srv.SessionControlsInfoBroadcast)

go func() {
if _, open := <-s.io.TerminateNotifier(); open {
err := s.Close()
Expand Down
5 changes: 5 additions & 0 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ import (
const PresenceVerifyInterval = time.Second * 15
const PresenceMaxDifference = time.Minute

// SessionControlsInfoBroadcast is sent in tandem with session creation
// to inform any joining users about the session controls.
const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)"

var serverSessions = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: teleport.MetricServerInteractiveSessions,
Expand Down Expand Up @@ -1005,6 +1009,7 @@ func (s *session) startInteractive(ch ssh.Channel, ctx *ServerContext) error {
s.io.AddReader("reader", inReader)
s.io.AddWriter("session-recorder", utils.WriteCloserWithContext(ctx.srv.Context(), s.recorder))
s.BroadcastMessage("Creating session with ID: %v...", s.id)
s.BroadcastMessage(SessionControlsInfoBroadcast)

if err := s.term.Run(); err != nil {
ctx.Errorf("Unable to run shell command: %v.", err)
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/termmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewTermManager() *TermManager {
closed: false,
readStateUpdate: sync.NewCond(&sync.Mutex{}),
incoming: make(chan []byte, 100),
terminateNotifier: make(chan struct{}),
terminateNotifier: make(chan struct{}, 1),
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/srv/termmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCTRLCCapture(t *testing.T) {
m := NewTermManager()
r, w := io.Pipe()
m.AddReader("foo", r)
go w.Write([]byte("\x03"))
w.Write([]byte("\x03"))

select {
case <-m.TerminateNotifier():
Expand Down
5 changes: 4 additions & 1 deletion lib/web/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func (t *TerminalHandler) Serve(w http.ResponseWriter, r *http.Request) {

ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.log.Errorf("Error upgrading to websocket: %v", err)
errMsg := "Error upgrading to websocket"
t.log.Errorf("%v: %v", errMsg, err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}

t.handler(ws, r)
Expand Down