Skip to content

Commit

Permalink
Merge pull request #9543 from ionutbalutoiu/fix/cluster-cache-tracker…
Browse files Browse the repository at this point in the history
…-memory-leak

🐛 Fix ClusterCacheTracker memory leak
  • Loading branch information
k8s-ci-robot authored Jan 26, 2024
2 parents fe3d762 + 97534b8 commit 1bef042
Showing 1 changed file with 59 additions and 27 deletions.
86 changes: 59 additions & 27 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -298,8 +299,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
}

// Create an uncached client for the cluster.
uncachedClient, err := t.createUncachedClient(config, cluster, httpClient, mapper)
if err != nil {
return nil, err
}
Expand All @@ -324,16 +331,23 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.CAFile = inClusterConfig.CAFile
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
// Update the http client and the mapper to use in-cluster config.
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
}

log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
} else {
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
}

// Create a client and a cache for the cluster.
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper, indexes)
if err != nil {
return nil, err
}

// Generating a new private key to be used for generating temporary certificates to connect to
// etcd on the target cluster.
// NOTE: Generating a private key is an expensive operation, so we store it in the cluster accessor.
Expand All @@ -343,9 +357,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

return &clusterAccessor{
cache: cache,
cache: cachedClient.Cache,
config: config,
client: c,
client: cachedClient.Client,
watches: sets.Set[string]{},
etcdClientCertificateKey: etcdKey,
}, nil
Expand Down Expand Up @@ -377,28 +391,53 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
// createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
}

// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createUncachedClient(config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, error) {
// Create the uncached client for the remote cluster
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}

return uncachedClient, nil
}

type cachedClientOutput struct {
Client client.Client
Cache *stoppableCache
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper, indexes []Index) (*cachedClientOutput, error) {
// Create the cache for the remote cluster
cacheOptions := cache.Options{
HTTPClient: httpClient,
Expand All @@ -407,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
}
remoteCache, err := cache.New(config, cacheOptions)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error creating cache", cluster.String())
}

cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
Expand All @@ -420,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con

for _, index := range indexes {
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q: error adding index for field %q to cache", cluster.String(), index.Field)
}
}

Expand All @@ -436,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
},
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
}

// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
// pods in the client.
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}
// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck

Expand All @@ -457,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

// Wrap the cached client with a client that sets timeouts on all Get and List calls
Expand All @@ -474,7 +503,10 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
httpClient: httpClient,
})

return cachedClient, uncachedClient, cache, nil
return &cachedClientOutput{
Client: cachedClient,
Cache: cache,
}, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
Expand Down

0 comments on commit 1bef042

Please sign in to comment.