Skip to content

Commit

Permalink
Fixes Kubernetes Service using expired credentials (#50197)
Browse files Browse the repository at this point in the history
The Kubernetes service occasionally fails to forward requests to EKS clusters or retrieve the cluster schema due to AWS rejecting the request with an "expired token" error.

EKS access tokens are generated using STS presigned URLs, which include details such as the cluster, backend credentials, and assumed roles. By default, these tokens are valid for 15 minutes, and the Kubernetes service refreshes them every $(15 - 1) / 2 = 7\text{ }minutes$.
However, our cloud SDK caches the underlying `aws.Session`, particularly those with assumed roles, for 15 minutes.

This leads to a scenario where the token is refreshed a second time at approximately 14 minutes, close to the token's 15-minute validity. If the underlying credentials expire before the next token refresh, given that they were reused from the previous query and cached since then, it  results in the Kubernetes Service considering the token valid (since it is a Base64-encoded presigned URL without knowledge about the credentials), but AWS EKS cluster rejects the request, treating the credentials as expired.

This PR adds an option to disable cache for EKS STS token signing which results in creating a session per EKS cluster sign process.

Bellow one can find the error message EKS returns.
```
2024-12-09T17:00:15Z ERRO [KUBERNETE] Failed to update cluster schema error:[
ERROR REPORT:
Original Error: *errors.StatusError the server has asked for the client to provide credentials
Stack Trace:
	github.com/gravitational/teleport/lib/kube/proxy/scheme.go:140 github.com/gravitational/teleport/lib/kube/proxy.newClusterSchemaBuilder
	github.com/gravitational/teleport/lib/kube/proxy/cluster_details.go:193 github.com/gravitational/teleport/lib/kube/proxy.newClusterDetails.func1
	runtime/asm_amd64.s:1695 runtime.goexit
User Message: the server has asked for the client to provide credentials] pid:7.1 start_time:2024-12-09T17:00:15Z proxy/cluster_details.go:210
2024-12-09T17:00:24Z ERRO [KUBERNETE] Failed to update cluster schema  error:[
ERROR REPORT:
Original Error: *errors.StatusError the server has asked for the client to provide credentials
Stack Trace:
	github.com/gravitational/teleport/lib/kube/proxy/scheme.go:140 github.com/gravitational/teleport/lib/kube/proxy.newClusterSchemaBuilder
	github.com/gravitational/teleport/lib/kube/proxy/cluster_details.go:193 github.com/gravitational/teleport/lib/kube/proxy.newClusterDetails.func1
	runtime/asm_amd64.s:1695 runtime.goexit
User Message: the server has asked for the client to provide credentials] pid:7.1 start_time:2024-12-09T17:00:24Z proxy/cluster_details.go:210
```

Changelog: Fixes an intermittent EKS authentication failure when dealing with EKS auto-discovery.

Signed-off-by: Tiago Silva <[email protected]>
  • Loading branch information
tigrato authored Dec 13, 2024
1 parent 8c8326d commit 94a872a
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 15 deletions.
59 changes: 49 additions & 10 deletions lib/cloud/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ type awsOptions struct {

// maxRetries is the maximum number of retries to use for the session.
maxRetries *int

// withoutSessionCache disables the session cache for the AWS session.
withoutSessionCache bool
}

func (a *awsOptions) checkAndSetDefaults() error {
Expand Down Expand Up @@ -429,6 +432,13 @@ func WithAssumeRole(roleARN, externalID string) AWSOptionsFn {
}
}

// WithoutSessionCache disables the session cache for the AWS session.
func WithoutSessionCache() AWSOptionsFn {
return func(options *awsOptions) {
options.withoutSessionCache = true
}
}

// WithAssumeRoleFromAWSMeta extracts options needed from AWS metadata for
// assuming an AWS role.
func WithAssumeRoleFromAWSMeta(meta types.AWS) AWSOptionsFn {
Expand Down Expand Up @@ -495,7 +505,7 @@ func (c *cloudClients) GetAWSSession(ctx context.Context, region string, opts ..
}
var err error
if options.baseSession == nil {
options.baseSession, err = c.getAWSSessionForRegion(region, options)
options.baseSession, err = c.getAWSSessionForRegion(ctx, region, options)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -793,17 +803,12 @@ func awsAmbientSessionProvider(ctx context.Context, region string) (*awssession.
}

// getAWSSessionForRegion returns AWS session for the specified region.
func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (*awssession.Session, error) {
func (c *cloudClients) getAWSSessionForRegion(ctx context.Context, region string, opts awsOptions) (*awssession.Session, error) {
if err := opts.checkAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: opts.integration,
}

sess, err := utils.FnCacheGet(context.Background(), c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
createSession := func(ctx context.Context) (*awssession.Session, error) {
if opts.credentialsSource == credentialsSourceIntegration {
if c.awsIntegrationSessionProviderFn == nil {
return nil, trace.BadParameter("missing aws integration session provider")
Expand All @@ -817,6 +822,30 @@ func (c *cloudClients) getAWSSessionForRegion(region string, opts awsOptions) (*
logrus.Debugf("Initializing AWS session for region %v using environment credentials.", region)
session, err := awsAmbientSessionProvider(ctx, region)
return session, trace.Wrap(err)
}

if opts.withoutSessionCache {
sess, err := createSession(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
if opts.customRetryer != nil || opts.maxRetries != nil {
return sess.Copy(&aws.Config{
Retryer: opts.customRetryer,
MaxRetries: opts.maxRetries,
}), nil
}
return sess, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: opts.integration,
}

sess, err := utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
session, err := createSession(ctx)
return session, trace.Wrap(err)
})
if err != nil {
return nil, trace.Wrap(err)
Expand All @@ -836,15 +865,25 @@ func (c *cloudClients) getAWSSessionForRole(ctx context.Context, region string,
return nil, trace.Wrap(err)
}

createSession := func(ctx context.Context) (*awssession.Session, error) {
stsClient := sts.New(options.baseSession)
return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID)
}

if options.withoutSessionCache {
session, err := createSession(ctx)
return session, trace.Wrap(err)
}

cacheKey := awsSessionCacheKey{
region: region,
integration: options.integration,
roleARN: options.assumeRoleARN,
externalID: options.assumeRoleExternalID,
}
return utils.FnCacheGet(ctx, c.awsSessionsCache, cacheKey, func(ctx context.Context) (*awssession.Session, error) {
stsClient := sts.New(options.baseSession)
return newSessionWithRole(ctx, stsClient, region, options.assumeRoleARN, options.assumeRoleExternalID)
session, err := createSession(ctx)
return session, trace.Wrap(err)
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/kube/proxy/cluster_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
go dynLabels.Start()
}

kubeClient := creds.getKubeClient()

var isClusterOffline bool
// Create the codec factory and the list of supported types for RBAC.
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, kubeClient)
codecFactory, rbacSupportedTypes, gvkSupportedRes, err := newClusterSchemaBuilder(cfg.log, creds.getKubeClient())
if err != nil {
cfg.log.WithError(err).Warn("Failed to create cluster schema. Possibly the cluster is offline.")
// If the cluster is offline, we will not be able to create the codec factory
Expand All @@ -145,7 +143,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
isClusterOffline = true
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}
Expand Down Expand Up @@ -204,7 +202,7 @@ func newClusterDetails(ctx context.Context, cfg clusterDetailsConfig) (_ *kubeDe
continue
}

kubeVersion, err := kubeClient.Discovery().ServerVersion()
kubeVersion, err := creds.getKubeClient().Discovery().ServerVersion()
if err != nil {
cfg.log.WithError(err).Warn("Failed to get Kubernetes cluster version. Possibly the cluster is offline.")
}
Expand Down Expand Up @@ -342,6 +340,7 @@ func getAWSClientRestConfig(cloudClients cloud.Clients, clock clockwork.Clock, r
region := cluster.GetAWSConfig().Region
opts := []cloud.AWSOptionsFn{
cloud.WithAmbientCredentials(),
cloud.WithoutSessionCache(),
}
if awsAssume := getAWSResourceMatcherToCluster(cluster, resourceMatchers); awsAssume != nil {
opts = append(opts, cloud.WithAssumeRole(awsAssume.AssumeRoleARN, awsAssume.ExternalID))
Expand Down

0 comments on commit 94a872a

Please sign in to comment.