Skip to content

Commit

Permalink
[v9] Moderated Sessions rollup backport (#11803)
Browse files Browse the repository at this point in the history
* Write error and return on failed websocket upgrade (#11606)

* Broadcast controls keys if session is moderated (#11661)

* Clarify RBAC rule application (#11672)

* Use a buffered channel for the terminate notifier (#11687)

* Restrict moderated sessions users from accessing V8 kube cluster agents (#11691)
  • Loading branch information
xacrimon authored Apr 7, 2022
1 parent 76be6d7 commit d540812
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 14 deletions.
4 changes: 4 additions & 0 deletions docs/pages/access-controls/guides/moderated-sessions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ spec:
count: 2
```
#### Combining Policies
The authorizer applies require policies within a role together with an OR operator and the policies from each role with an AND operator. In practice, this means that for every role with at least one require policy, one of its policies must be met before a session can be started.
### `join_sessions`

#### Options
Expand Down
114 changes: 114 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func TestIntegrations(t *testing.T) {
t.Run("UUIDBasedProxy", suite.bind(testUUIDBasedProxy))
t.Run("WindowChange", suite.bind(testWindowChange))
t.Run("SSHTracker", suite.bind(testSSHTracker))
t.Run("TestKubeAgentFiltering", suite.bind(testKubeAgentFiltering))
}

// testAuditOn creates a live session, records a bunch of data through it
Expand Down Expand Up @@ -5950,3 +5951,116 @@ outer:

t.FailNow()
}

// TestKubeAgentFiltering tests that kube-agent filtering for pre-v8 agents and
// moderated sessions users works as expected.
func testKubeAgentFiltering(t *testing.T, suite *integrationTestSuite) {
ctx := context.Background()

type testCase struct {
name string
server types.Server
role types.Role
user types.User
wantsLen int
}

v8Agent, err := types.NewServer("kube-h", types.KindKubeService, types.ServerSpecV2{
Version: "8.0.0",
KubernetesClusters: []*types.KubernetesCluster{{Name: "foo"}},
})
require.NoError(t, err)

v9Agent, err := types.NewServer("kube-h", types.KindKubeService, types.ServerSpecV2{
Version: "9.0.0",
KubernetesClusters: []*types.KubernetesCluster{{Name: "foo"}},
})
require.NoError(t, err)

plainRole, err := types.NewRole("plain", types.RoleSpecV5{})
require.NoError(t, err)

moderatedRole, err := types.NewRole("moderated", types.RoleSpecV5{
Allow: types.RoleConditions{
RequireSessionJoin: []*types.SessionRequirePolicy{
{
Name: "bar",
Kinds: []string{string(types.KubernetesSessionKind)},
},
},
},
})
require.NoError(t, err)

plainUser, err := types.NewUser("bob")
require.NoError(t, err)
plainUser.SetRoles([]string{plainRole.GetName()})

moderatedUser, err := types.NewUser("alice")
require.NoError(t, err)
moderatedUser.SetRoles([]string{moderatedRole.GetName()})

testCases := []testCase{
{
name: "unrestricted user, v8 agent",
server: v8Agent,
role: plainRole,
user: plainUser,
wantsLen: 1,
},
{
name: "restricted user, v8 agent",
server: v8Agent,
role: moderatedRole,
user: moderatedUser,
wantsLen: 0,
},
{
name: "unrestricted user, v9 agent",
server: v9Agent,
role: plainRole,
user: plainUser,
wantsLen: 1,
},
{
name: "restricted user, v9 agent",
server: v9Agent,
role: moderatedRole,
user: moderatedUser,
wantsLen: 1,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
teleport := suite.newTeleport(t, nil, true)
defer teleport.StopAll()

adminSite := teleport.Process.GetAuthServer()
_, err := adminSite.UpsertKubeServiceV2(ctx, testCase.server)
require.NoError(t, err)
err = adminSite.UpsertRole(ctx, testCase.role)
require.NoError(t, err)
err = adminSite.CreateUser(ctx, testCase.user)
require.NoError(t, err)

cl, err := teleport.NewClient(ClientConfig{
Login: testCase.user.GetName(),
Cluster: Site,
Host: Host,
Port: teleport.GetPortSSHInt(),
})
require.NoError(t, err)

proxy, err := cl.ConnectToProxy(ctx)
require.NoError(t, err)

userSite, err := proxy.ConnectToCluster(ctx, Site, false)
require.NoError(t, err)

services, err := userSite.GetKubeServices(ctx)
require.NoError(t, err)
require.Len(t, services, testCase.wantsLen)
})
}
}
24 changes: 24 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"time"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
Expand Down Expand Up @@ -3581,6 +3582,29 @@ func (a *ServerWithRoles) filterKubeServices(server types.Server) error {
// connect to them.
mfaParams := services.AccessMFAParams{Verified: true}

// Filter out agents that don't have support for moderated sessions access
// checking if the user has any roles that require it.
if hasLocalUserRole(a.context.Checker) {
roles := a.context.Checker.(LocalUserRoleSet)
agentVersion, versionErr := semver.NewVersion(server.GetTeleportVersion())

hasK8SRequirePolicy := func() bool {
for _, role := range roles.RoleSet {
for _, policy := range role.GetSessionRequirePolicies() {
if ContainsSessionKind(policy.Kinds, types.KubernetesSessionKind) {
return true
}
}
}

return false
}

if hasK8SRequirePolicy() && (versionErr != nil || agentVersion.LessThan(*MinSupportedModeratedSessionsVersion)) {
return trace.AccessDenied("cannot use moderated sessions with pre-v9 kubernetes agents")
}
}

filtered := make([]*types.KubernetesCluster, 0, len(server.GetKubernetesClusters()))
for _, kube := range server.GetKubernetesClusters() {
k8sV3, err := types.NewKubernetesClusterV3FromLegacyCluster(server.GetNamespace(), kube)
Expand Down
3 changes: 1 addition & 2 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1708,8 +1708,7 @@ func downgradeRole(ctx context.Context, role *types.RoleV5) (*types.RoleV5, erro
}
}

minSupportedVersionForV5Roles := semver.New(utils.VersionBeforeAlpha("9.0.0"))
if clientVersion == nil || clientVersion.LessThan(*minSupportedVersionForV5Roles) {
if clientVersion == nil || clientVersion.LessThan(*MinSupportedModeratedSessionsVersion) {
log.Debugf(`Client version "%s" is unknown or less than 9.0.0, converting role to v4`, clientVersionString)
downgraded, err := services.DowngradeRoleToV4(role)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions lib/auth/session_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"regexp"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/trace"
"github.com/vulcand/predicate"
)

var MinSupportedModeratedSessionsVersion = semver.New(utils.VersionBeforeAlpha("9.0.0"))

// SessionAccessEvaluator takes a set of policies
// and uses rules to evaluate them to determine when a session may start
// and if a user can join a session.
Expand Down Expand Up @@ -70,7 +73,7 @@ func getAllowPolicies(participant SessionAccessContext) []*types.SessionJoinPoli
return policies
}

func containsKind(s []string, e types.SessionKind) bool {
func ContainsSessionKind(s []string, e types.SessionKind) bool {
for _, a := range s {
if types.SessionKind(a) == e {
return true
Expand Down Expand Up @@ -162,7 +165,7 @@ func (e *SessionAccessEvaluator) matchesJoin(allow *types.SessionJoinPolicy) boo
}

func (e *SessionAccessEvaluator) matchesKind(allow []string) bool {
if containsKind(allow, e.kind) || containsKind(allow, "*") {
if ContainsSessionKind(allow, e.kind) || ContainsSessionKind(allow, "*") {
return true
}

Expand Down
10 changes: 3 additions & 7 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,6 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params

io := srv.NewTermManager()

if tty {
err = io.BroadcastMessage(fmt.Sprintf("Creating session with ID: %v...", id.String()))
if err != nil {
return nil, trace.Wrap(err)
}
}

s := &session{
ctx: ctx,
forwarder: forwarder,
Expand All @@ -361,6 +354,9 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
displayParticipantRequirements: utils.AsBool(q.Get("displayParticipantRequirements")),
}

s.BroadcastMessage("Creating session with ID: %v...", id.String())
s.BroadcastMessage(srv.SessionControlsInfoBroadcast)

go func() {
if _, open := <-s.io.TerminateNotifier(); open {
err := s.Close()
Expand Down
5 changes: 5 additions & 0 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ import (
const PresenceVerifyInterval = time.Second * 15
const PresenceMaxDifference = time.Minute

// SessionControlsInfoBroadcast is sent in tandem with session creation
// to inform any joining users about the session controls.
const SessionControlsInfoBroadcast = "Controls\r\n - CTRL-C: Leave the session\r\n - t: Forcefully terminate the session (moderators only)"

var serverSessions = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: teleport.MetricServerInteractiveSessions,
Expand Down Expand Up @@ -1000,6 +1004,7 @@ func (s *session) startInteractive(ch ssh.Channel, ctx *ServerContext) error {
s.io.AddReader("reader", inReader)
s.io.AddWriter("session-recorder", utils.WriteCloserWithContext(ctx.srv.Context(), s.recorder))
s.BroadcastMessage("Creating session with ID: %v...", s.id)
s.BroadcastMessage(SessionControlsInfoBroadcast)

if err := s.term.Run(); err != nil {
ctx.Errorf("Unable to run shell command: %v.", err)
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/termmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewTermManager() *TermManager {
closed: false,
readStateUpdate: sync.NewCond(&sync.Mutex{}),
incoming: make(chan []byte, 100),
terminateNotifier: make(chan struct{}),
terminateNotifier: make(chan struct{}, 1),
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/srv/termmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCTRLCCapture(t *testing.T) {
m := NewTermManager()
r, w := io.Pipe()
m.AddReader("foo", r)
go w.Write([]byte("\x03"))
w.Write([]byte("\x03"))

select {
case <-m.TerminateNotifier():
Expand Down
5 changes: 4 additions & 1 deletion lib/web/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ func (t *TerminalHandler) Serve(w http.ResponseWriter, r *http.Request) {

ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.log.Errorf("Error upgrading to websocket: %v", err)
errMsg := "Error upgrading to websocket"
t.log.Errorf("%v: %v", errMsg, err)
http.Error(w, errMsg, http.StatusInternalServerError)
return
}

t.handler(ws, r)
Expand Down

0 comments on commit d540812

Please sign in to comment.