diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go index e826c67dd902..7ebf4efa8a83 100644 --- a/controllers/remote/cluster_cache_tracker.go +++ b/controllers/remote/cluster_cache_tracker.go @@ -278,13 +278,14 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl } // Create a client and a cache for the cluster. - c, cache, err := t.createClient(ctx, config, cluster, indexes) + c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes) if err != nil { return nil, err } // Detect if the controller is running on the workload cluster. - runningOnCluster, err := t.runningOnWorkloadCluster(ctx, c, cluster) + // This function uses an uncached client to ensure pods aren't cached by the long-lived client. + runningOnCluster, err := t.runningOnWorkloadCluster(ctx, uncachedClient, cluster) if err != nil { return nil, err } @@ -303,7 +304,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl config.Host = inClusterConfig.Host // Create a new client and overwrite the previously created client. - c, cache, err = t.createClient(ctx, config, cluster, indexes) + c, _, cache, err = t.createClient(ctx, config, cluster, indexes) if err != nil { return nil, errors.Wrap(err, "error creating client for self-hosted cluster") } @@ -355,18 +356,18 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl return t.controllerPodMetadata.UID == pod.UID, nil } -// createClient creates a client and a mapper based on a rest.Config. -func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, *stoppableCache, error) { +// createClient creates a cached client, and uncached client and a mapper based on a rest.Config. +func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) { // Create a http client for the cluster. httpClient, err := rest.HTTPClientFor(config) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String()) } // Create a mapper for it mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String()) } // Verify if we can get a rest mapping from the workload cluster apiserver. @@ -374,7 +375,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con // to avoid further effort creating a cache and a client and to produce a clearer error message. _, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String()) } // Create the cache for the remote cluster @@ -385,7 +386,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con } remoteCache, err := cache.New(config, cacheOptions) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String()) } cacheCtx, cacheCtxCancel := context.WithCancel(ctx) @@ -398,12 +399,12 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con for _, index := range indexes { if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil { - return nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String()) } } // Create the client for the remote cluster - c, err := client.New(config, client.Options{ + cachedClient, err := client.New(config, client.Options{ Scheme: t.scheme, Mapper: mapper, HTTPClient: httpClient, @@ -414,9 +415,19 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con }, }) if err != nil { - return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) + return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String()) } + // Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache + // pods in the client. + uncachedClient, err := client.New(config, client.Options{ + Scheme: t.scheme, + Mapper: mapper, + HTTPClient: httpClient, + }) + if err != nil { + return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String()) + } // Start the cache!!! go cache.Start(cacheCtx) //nolint:errcheck @@ -425,7 +436,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con defer cacheSyncCtxCancel() if !cache.WaitForCacheSync(cacheSyncCtx) { cache.Stop() - return nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) + return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err()) } // Start cluster healthcheck!!! @@ -435,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con httpClient: httpClient, }) - return c, cache, nil + return cachedClient, uncachedClient, cache, nil } // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.