diff --git a/controllers/remote/cluster_cache_healthcheck_test.go b/controllers/remote/cluster_cache_healthcheck_test.go
index ce55499b01d4..bc9e29beca20 100644
--- a/controllers/remote/cluster_cache_healthcheck_test.go
+++ b/controllers/remote/cluster_cache_healthcheck_test.go
@@ -26,6 +26,8 @@ import (
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -132,13 +134,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
-			httpClient, err := rest.HTTPClientFor(env.Config)
+			restClient, err := getRESTClient(env.Config)
 			g.Expect(err).ToNot(HaveOccurred())
 
 			go cct.healthCheckCluster(ctx, &healthCheckInput{
 				cluster:            testClusterKey,
-				cfg:                env.Config,
-				httpClient:         httpClient,
+				restClient:         restClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -164,12 +165,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			g.Expect(cct.clusterLock.TryLock(testClusterKey)).To(BeTrue())
 			startHealthCheck := time.Now()
 
-			httpClient, err := rest.HTTPClientFor(env.Config)
+			restClient, err := getRESTClient(env.Config)
 			g.Expect(err).ToNot(HaveOccurred())
+
 			cct.healthCheckCluster(ctx, &healthCheckInput{
 				cluster:            testClusterKey,
-				cfg:                env.Config,
-				httpClient:         httpClient,
+				restClient:         restClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -190,13 +191,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 
 			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
-			httpClient, err := rest.HTTPClientFor(env.Config)
+			restClient, err := getRESTClient(env.Config)
 			g.Expect(err).ToNot(HaveOccurred())
+
 			go cct.healthCheckCluster(ctx, &healthCheckInput{
 				cluster:            testClusterKey,
-				cfg:                env.Config,
-				httpClient:         httpClient,
+				restClient:         restClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -228,12 +229,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			config := rest.CopyConfig(env.Config)
 			config.Host = fmt.Sprintf("http://127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port)
 
-			httpClient, err := rest.HTTPClientFor(env.Config)
+			restClient, err := getRESTClient(config)
 			g.Expect(err).ToNot(HaveOccurred())
+
 			go cct.healthCheckCluster(ctx, &healthCheckInput{
 				cluster:            testClusterKey,
-				cfg:                config,
-				httpClient:         httpClient,
+				restClient:         restClient,
 				interval:           testPollInterval,
 				requestTimeout:     testPollTimeout,
 				unhealthyThreshold: testUnhealthyThreshold,
@@ -248,3 +249,15 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 		})
 	})
 }
+
+func getRESTClient(config *rest.Config) (*rest.RESTClient, error) {
+	httpClient, err := rest.HTTPClientFor(config)
+	if err != nil {
+		return nil, err
+	}
+
+	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
+	restClientConfig := rest.CopyConfig(config)
+	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
+	return rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
+}
diff --git a/controllers/remote/cluster_cache_tracker.go b/controllers/remote/cluster_cache_tracker.go
index 4a7b71a47c7e..22a46b2b6260 100644
--- a/controllers/remote/cluster_cache_tracker.go
+++ b/controllers/remote/cluster_cache_tracker.go
@@ -183,7 +183,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
 func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
 	accessor, err := t.getClusterAccessor(ctx, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get client")
 	}
 
 	return accessor.client, nil
@@ -198,7 +198,7 @@ func (t *ClusterCacheTracker) GetReader(ctx context.Context, cluster client.Obje
 func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
 	accessor, err := t.getClusterAccessor(ctc, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get REST config")
 	}
 
 	return accessor.config, nil
@@ -208,7 +208,7 @@ func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.
 func (t *ClusterCacheTracker) GetEtcdClientCertificateKey(ctx context.Context, cluster client.ObjectKey) (*rsa.PrivateKey, error) {
 	accessor, err := t.getClusterAccessor(ctx, cluster)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "failed to get etcd client certificate key")
 	}
 
 	return accessor.etcdClientCertificateKey, nil
@@ -267,7 +267,7 @@ func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster cl
 	// for the cluster at the same time.
 	// Return an error if another go routine already tries to create a clusterAccessor.
 	if ok := t.clusterLock.TryLock(cluster); !ok {
-		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
+		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)")
 	}
 	defer t.clusterLock.Unlock(cluster)
 
@@ -305,7 +305,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	}
 
 	// Create a http client and a mapper for the cluster.
-	httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
+	httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
 	}
@@ -337,7 +337,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 		config.Host = inClusterConfig.Host
 
 		// Update the http client and the mapper to use in-cluster config.
-		httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
+		httpClient, mapper, restClient, err = t.createHTTPClientAndMapper(ctx, config, cluster)
 		if err != nil {
 			return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
 		}
@@ -348,7 +348,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	}
 
 	// Create a client and a cache for the cluster.
-	cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper)
+	cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, restClient, mapper)
 	if err != nil {
 		return nil, err
 	}
@@ -397,28 +397,40 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
 }
 
 // createHTTPClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
-func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
+func (t *ClusterCacheTracker) createHTTPClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, *rest.RESTClient, error) {
 	// Create a http client for the cluster.
 	httpClient, err := rest.HTTPClientFor(config)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
 	}
 
 	// Create a mapper for it
 	mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
+	}
+
+	// Create a REST client for the cluster (this is later used for health checking as well).
+	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
+	restClientConfig := rest.CopyConfig(config)
+	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
+	restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
+	if err != nil {
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating REST client", cluster.String())
+	}
+
+	// Note: This checks if the apiserver is up. We do this already here to produce a clearer error message if the cluster is unreachable.
+	if _, err := restClient.Get().AbsPath("/").Timeout(healthCheckRequestTimeout).DoRaw(ctx); err != nil {
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: cluster is not reachable", cluster.String())
 	}
 
 	// Verify if we can get a rest mapping from the workload cluster apiserver.
-	// Note: This also checks if the apiserver is up in general. We do this already here
-	// to avoid further effort creating a cache and a client and to produce a clearer error message.
 	_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
 	if err != nil {
-		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
+		return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
 	}
 
-	return httpClient, mapper, nil
+	return httpClient, mapper, restClient, nil
 }
 
 // createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
@@ -442,7 +454,7 @@ type cachedClientOutput struct {
 }
 
 // createCachedClient creates a cached client for the given cluster, based on a rest.Config.
-func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (*cachedClientOutput, error) {
+func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, restClient *rest.RESTClient, mapper meta.RESTMapper) (*cachedClientOutput, error) {
 	// Create the cache for the remote cluster
 	cacheOptions := cache.Options{
 		HTTPClient: httpClient,
@@ -504,8 +516,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re
 	// Start cluster healthcheck!!!
 	go t.healthCheckCluster(cacheCtx, &healthCheckInput{
 		cluster:    cluster,
-		cfg:        config,
-		httpClient: httpClient,
+		restClient: restClient,
 	})
 
 	return &cachedClientOutput{
@@ -568,13 +579,13 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 
 	accessor, err := t.getClusterAccessor(ctx, input.Cluster)
 	if err != nil {
-		return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(err, "failed to add %T watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 
 	// We have to lock the cluster, so that the watch is not created multiple times in parallel.
 	ok := t.clusterLock.TryLock(input.Cluster)
 	if !ok {
-		return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 	defer t.clusterLock.Unlock(input.Cluster)
 
@@ -586,7 +597,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 
 	// Need to create the watch
 	if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
-		return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
+		return errors.Wrapf(err, "failed to add %T watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
 	}
 
 	accessor.watches.Insert(input.Name)
@@ -597,8 +608,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 // healthCheckInput provides the input for the healthCheckCluster method.
 type healthCheckInput struct {
 	cluster            client.ObjectKey
-	httpClient         *http.Client
-	cfg                *rest.Config
+	restClient         *rest.RESTClient
 	interval           time.Duration
 	requestTimeout     time.Duration
 	unhealthyThreshold int
@@ -630,18 +640,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 
 	unhealthyCount := 0
 
-	// This gets us a client that can make raw http(s) calls to the remote apiserver. We only need to create it once
-	// and we can reuse it inside the polling loop.
-	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
-	cfg := rest.CopyConfig(in.cfg)
-	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
-	restClient, restClientErr := rest.UnversionedRESTClientForConfigAndClient(cfg, in.httpClient)
-
 	runHealthCheckWithThreshold := func(ctx context.Context) (bool, error) {
-		if restClientErr != nil {
-			return false, restClientErr
-		}
-
 		cluster := &clusterv1.Cluster{}
 		if err := t.client.Get(ctx, in.cluster, cluster); err != nil {
 			if apierrors.IsNotFound(err) {
@@ -672,7 +671,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 
 		// An error here means there was either an issue connecting or the API returned an error.
 		// If no error occurs, reset the unhealthy counter.
-		_, err := restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
+		_, err := in.restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
 		if err != nil {
 			if apierrors.IsUnauthorized(err) {
 				// Unauthorized means that the underlying kubeconfig is not authorizing properly anymore, which
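For reference, a minimal standalone sketch of the REST-client pattern this diff centralizes (no-op encoder, wrapped negotiated serializer, unversioned REST client reused for the "/" reachability probe). The main wrapper, the kubeconfig path, and the 5-second timeout are illustrative assumptions, not part of the change; the client-go calls are the same ones used above.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a kubeconfig at this path points at the workload cluster.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// Reuse the config's TLS and auth settings for a plain HTTP client.
	httpClient, err := rest.HTTPClientFor(config)
	if err != nil {
		panic(err)
	}

	// The client only issues raw GETs, so a no-op encoder is sufficient.
	codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
	restClientConfig := rest.CopyConfig(config)
	restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
	restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
	if err != nil {
		panic(err)
	}

	// Probe the apiserver root path, as the health check (and the new
	// reachability check in createHTTPClientAndMapper) does.
	if _, err := restClient.Get().AbsPath("/").Timeout(5 * time.Second).DoRaw(context.Background()); err != nil {
		fmt.Println("cluster is not reachable:", err)
		return
	}
	fmt.Println("cluster is reachable")
}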