Skip to content

Commit

Permalink
Limit Kubernetes connections (#12275)
Browse files Browse the repository at this point in the history
  • Loading branch information
xacrimon authored May 2, 2022
1 parent 2db4e7d commit 21ff622
Show file tree
Hide file tree
Showing 7 changed files with 915 additions and 738 deletions.
6 changes: 6 additions & 0 deletions api/types/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import (
// role option).
const SemaphoreKindConnection = "connection"

// SemaphoreKindKubernetesConnection is the semaphore kind used by
// the Concurrent Session Control feature to limit concurrent
// connections for Kubernetes (corresponds to the `max_kubernetes_connections`
// role option).
const SemaphoreKindKubernetesConnection = "kubernetes_connection"

// Semaphore represents distributed semaphore concept
type Semaphore interface {
// Resource contains common resource values
Expand Down
1,508 changes: 771 additions & 737 deletions api/types/types.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,11 @@ message RoleOptions {
// CertExtensions specifies the key/values
repeated CertExtension CertExtensions = 17
[ (gogoproto.jsontag) = "cert_extensions,omitempty" ];

// MaxKubernetesConnections defines the maximum number of concurrent
// Kubernetes sessions a user may hold.
int64 MaxKubernetesConnections = 18
[ (gogoproto.jsontag) = "max_kubernetes_connections,omitempty" ];
}

message RecordSession {
Expand Down
80 changes: 80 additions & 0 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestKube(t *testing.T) {
t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI))
t.Run("Disconnect", suite.bind(testKubeDisconnect))
t.Run("Join", suite.bind(testKubeJoin))
t.Run("ConnectionLimit", suite.bind(testKubeConnectionLimit))
}

// TestKubeExec tests kubernetes Exec command set
Expand Down Expand Up @@ -1611,3 +1612,82 @@ func testKubeJoin(t *testing.T, suite *KubeSuite) {
require.Contains(t, participantOutput, []byte("echo hi"))
require.Contains(t, out.String(), []byte("echo hi2"))
}

// testKubeConnectionLimit checks that the `max_kubernetes_connections` role option is enforced.
func testKubeConnectionLimit(t *testing.T, suite *KubeSuite) {
teleport := NewInstance(InstanceConfig{
ClusterName: Site,
HostID: HostID,
NodeName: Host,
Priv: suite.priv,
Pub: suite.pub,
log: suite.log,
})

const maxConnections = 10
hostUsername := suite.me.Username
kubeGroups := []string{testImpersonationGroup}
kubeUsers := []string{"[email protected]"}
role, err := types.NewRoleV3("kubemaster", types.RoleSpecV5{
Allow: types.RoleConditions{
Logins: []string{hostUsername},
KubeGroups: kubeGroups,
KubeUsers: kubeUsers,
},
Options: types.RoleOptions{
MaxKubernetesConnections: maxConnections,
},
})
require.NoError(t, err)
teleport.AddUserWithRole(hostUsername, role)

err = teleport.Start()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, teleport.StopAll()) })

// set up kube configuration using proxy
proxyClient, proxyClientConfig, err := kubeProxyClient(kubeProxyConfig{
t: teleport,
username: hostUsername,
kubeUsers: kubeUsers,
kubeGroups: kubeGroups,
})
require.NoError(t, err)

ctx := context.Background()
// try get request to fetch available pods
pod, err := proxyClient.CoreV1().Pods(testNamespace).Get(ctx, testPod, metav1.GetOptions{})
require.NoError(t, err)

openExec := func() error {
// interactive command, allocate pty
term := NewTerminal(250)
out := &bytes.Buffer{}

return kubeExec(proxyClientConfig, kubeExecArgs{
podName: pod.Name,
podNamespace: pod.Namespace,
container: pod.Spec.Containers[0].Name,
command: []string{"/bin/sh", "-c", "sleep 300"},
stdout: out,
tty: true,
stdin: term,
})
}

// Create and maintain the maximum amount of open connections
for i := 0; i < maxConnections; i++ {
go openExec()
}

// Wait for the connections to open and check for any errors
require.Eventually(t, func() bool {
trackers, err := teleport.Process.GetAuthServer().GetActiveSessionTrackers(ctx)
require.NoError(t, err)
return len(trackers) == maxConnections
}, time.Second*30, time.Second)

// Open one more connection. It should fail due to the limit.
err = openExec()
require.Error(t, err)
}
38 changes: 38 additions & 0 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ func (f *Forwarder) withAuth(handler handlerWithAuthFunc) httprouter.Handle {
if err := f.authorize(req.Context(), authContext); err != nil {
return nil, trace.Wrap(err)
}
if err := f.acquireConnectionLock(req.Context(), authContext); err != nil {
return nil, trace.Wrap(err)
}
return handler(authContext, w, req, p)
}, f.formatResponseError)
}
Expand Down Expand Up @@ -894,6 +897,41 @@ func wsProxy(wsSource *websocket.Conn, wsTarget *websocket.Conn) error {
return trace.Wrap(err)
}

// acquireConnectionLock acquires a semaphore used to limit connections to the Kubernetes agent.
// The semaphore is releasted when the request is returned/connection is closed.
// Returns an error if a semaphore could not be acquired.
func (f *Forwarder) acquireConnectionLock(ctx context.Context, identity *authContext) error {
user := identity.Identity.GetIdentity().Username
roles, err := getRolesByName(f, identity.Identity.GetIdentity().Groups)
if err != nil {
return trace.Wrap(err)
}

maxConnections := services.RoleSet(roles).MaxKubernetesConnections()
semLock, err := services.AcquireSemaphoreLock(ctx, services.SemaphoreLockConfig{
Service: f.cfg.AuthClient,
Expiry: sessionMaxLifetime,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.SemaphoreKindKubernetesConnection,
SemaphoreName: user,
MaxLeases: maxConnections,
Holder: identity.teleportCluster.name,
},
})
if err != nil {
if strings.Contains(err.Error(), teleport.MaxLeases) {
err = trace.AccessDenied("too many concurrent kubernetes connections for user %q (max=%d)",
user,
maxConnections,
)
}

return trace.Wrap(err)
}
go semLock.KeepAlive(ctx)
return nil
}

// exec forwards all exec requests to the target server, captures
// all output from the session
func (f *Forwarder) exec(ctx *authContext, w http.ResponseWriter, req *http.Request, p httprouter.Params) (resp interface{}, err error) {
Expand Down
3 changes: 2 additions & 1 deletion lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const sessionRecorderID = "session-recorder"

const PresenceVerifyInterval = time.Second * 15
const PresenceMaxDifference = time.Minute
const sessionMaxLifetime = time.Hour * 24

// remoteClient is either a kubectl or websocket client.
type remoteClient interface {
Expand Down Expand Up @@ -350,7 +351,7 @@ func newSession(ctx authContext, forwarder *Forwarder, req *http.Request, params
sess: sess,
closeC: make(chan struct{}),
initiator: initiator.ID,
expires: time.Now().UTC().Add(time.Hour * 24),
expires: time.Now().UTC().Add(sessionMaxLifetime),
PresenceEnabled: ctx.Identity.GetIdentity().MFAVerified != "",
stateUpdate: sync.NewCond(&sync.Mutex{}),
displayParticipantRequirements: utils.AsBool(q.Get("displayParticipantRequirements")),
Expand Down
13 changes: 13 additions & 0 deletions lib/services/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,19 @@ func (set RoleSet) MaxSessions() int64 {
return ms
}

// MaxConnections returns the maximum number of concurrent Kubernetes connections
// allowed. If MaxConnections is zero then no maximum was defined
// and the number of concurrent connections is unconstrained.
func (set RoleSet) MaxKubernetesConnections() int64 {
var mcs int64
for _, role := range set {
if m := role.GetOptions().MaxKubernetesConnections; m != 0 && (m < mcs || mcs == 0) {
mcs = m
}
}
return mcs
}

// AdjustClientIdleTimeout adjusts requested idle timeout
// to the lowest max allowed timeout, the most restrictive
// option will be picked, negative values will be assumed as 0
Expand Down

0 comments on commit 21ff622

Please sign in to comment.