Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#10826 from sbueringer/pr-improv-cc…
Browse files Browse the repository at this point in the history
…t-error-logging

🌱 Improve CCT error logging
  • Loading branch information
k8s-ci-robot authored Jul 3, 2024
2 parents fd94039 + 7f01628 commit afdcb4c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 46 deletions.
37 changes: 25 additions & 12 deletions controllers/remote/cluster_cache_healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -132,13 +134,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

httpClient, err := rest.HTTPClientFor(env.Config)
restClient, err := getRESTClient(env.Config)
g.Expect(err).ToNot(HaveOccurred())

go cct.healthCheckCluster(ctx, &healthCheckInput{
cluster: testClusterKey,
cfg: env.Config,
httpClient: httpClient,
restClient: restClient,
interval: testPollInterval,
requestTimeout: testPollTimeout,
unhealthyThreshold: testUnhealthyThreshold,
Expand All @@ -164,12 +165,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
g.Expect(cct.clusterLock.TryLock(testClusterKey)).To(BeTrue())
startHealthCheck := time.Now()

httpClient, err := rest.HTTPClientFor(env.Config)
restClient, err := getRESTClient(env.Config)
g.Expect(err).ToNot(HaveOccurred())

cct.healthCheckCluster(ctx, &healthCheckInput{
cluster: testClusterKey,
cfg: env.Config,
httpClient: httpClient,
restClient: restClient,
interval: testPollInterval,
requestTimeout: testPollTimeout,
unhealthyThreshold: testUnhealthyThreshold,
Expand All @@ -190,13 +191,13 @@ func TestClusterCacheHealthCheck(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

httpClient, err := rest.HTTPClientFor(env.Config)
restClient, err := getRESTClient(env.Config)
g.Expect(err).ToNot(HaveOccurred())

go cct.healthCheckCluster(ctx,
&healthCheckInput{
cluster: testClusterKey,
cfg: env.Config,
httpClient: httpClient,
restClient: restClient,
interval: testPollInterval,
requestTimeout: testPollTimeout,
unhealthyThreshold: testUnhealthyThreshold,
Expand Down Expand Up @@ -228,12 +229,12 @@ func TestClusterCacheHealthCheck(t *testing.T) {
config := rest.CopyConfig(env.Config)
config.Host = fmt.Sprintf("http://127.0.0.1:%d", l.Addr().(*net.TCPAddr).Port)

httpClient, err := rest.HTTPClientFor(env.Config)
restClient, err := getRESTClient(config)
g.Expect(err).ToNot(HaveOccurred())

go cct.healthCheckCluster(ctx, &healthCheckInput{
cluster: testClusterKey,
cfg: config,
httpClient: httpClient,
restClient: restClient,
interval: testPollInterval,
requestTimeout: testPollTimeout,
unhealthyThreshold: testUnhealthyThreshold,
Expand All @@ -248,3 +249,15 @@ func TestClusterCacheHealthCheck(t *testing.T) {
})
})
}

// getRESTClient is a test helper that returns an unversioned REST client for
// the given rest.Config, suitable for issuing raw http(s) requests (as done by
// the cluster health check).
//
// The HTTP client is derived from config itself; the serializer is only set on
// a copy of config, so the caller's rest.Config is not modified.
func getRESTClient(config *rest.Config) (*rest.RESTClient, error) {
	client, err := rest.HTTPClientFor(config)
	if err != nil {
		return nil, err
	}

	// Responses are only decoded, never encoded, hence the no-op encoder.
	cfg := rest.CopyConfig(config)
	cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{
		Serializer: runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()},
	})

	return rest.UnversionedRESTClientForConfigAndClient(cfg, client)
}
67 changes: 33 additions & 34 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
accessor, err := t.getClusterAccessor(ctx, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get client")
}

return accessor.client, nil
Expand All @@ -198,7 +198,7 @@ func (t *ClusterCacheTracker) GetReader(ctx context.Context, cluster client.Obje
func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
accessor, err := t.getClusterAccessor(ctc, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get REST config")
}

return accessor.config, nil
Expand All @@ -208,7 +208,7 @@ func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.
func (t *ClusterCacheTracker) GetEtcdClientCertificateKey(ctx context.Context, cluster client.ObjectKey) (*rsa.PrivateKey, error) {
accessor, err := t.getClusterAccessor(ctx, cluster)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to get etcd client certificate key")
}

return accessor.etcdClientCertificateKey, nil
Expand Down Expand Up @@ -267,7 +267,7 @@ func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster cl
// for the cluster at the same time.
// Return an error if another go routine already tries to create a clusterAccessor.
if ok := t.clusterLock.TryLock(cluster); !ok {
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)")
}
defer t.clusterLock.Unlock(cluster)

Expand Down Expand Up @@ -305,7 +305,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHTTPClientAndMapper(config, cluster)
httpClient, mapper, restClient, err := t.createHTTPClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper for remote cluster %q", cluster.String())
}
Expand Down Expand Up @@ -337,7 +337,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.Host = inClusterConfig.Host

// Update the http client and the mapper to use in-cluster config.
httpClient, mapper, err = t.createHTTPClientAndMapper(config, cluster)
httpClient, mapper, restClient, err = t.createHTTPClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating http client and mapper (using in-cluster config) for remote cluster %q", cluster.String())
}
Expand All @@ -348,7 +348,7 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
}

// Create a client and a cache for the cluster.
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, mapper)
cachedClient, err := t.createCachedClient(ctx, config, cluster, httpClient, restClient, mapper)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,28 +397,40 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
}

// createHTTPClientAndMapper creates an HTTP client, a dynamic REST mapper and a REST client for the given cluster, based on the rest.Config.
// It also verifies that the cluster's apiserver is reachable before returning.
func (t *ClusterCacheTracker) createHTTPClientAndMapper(config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
func (t *ClusterCacheTracker) createHTTPClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, *rest.RESTClient, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Create a REST client for the cluster (this is later used for health checking as well).
codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
restClientConfig := rest.CopyConfig(config)
restClientConfig.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
restClient, err := rest.UnversionedRESTClientForConfigAndClient(restClientConfig, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating REST client", cluster.String())
}

// Note: This checks if the apiserver is up. We do this already here to produce a clearer error message if the cluster is unreachable.
if _, err := restClient.Get().AbsPath("/").Timeout(healthCheckRequestTimeout).DoRaw(ctx); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: cluster is not reachable", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
return httpClient, mapper, restClient, nil
}

// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
Expand All @@ -442,7 +454,7 @@ type cachedClientOutput struct {
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, mapper meta.RESTMapper) (*cachedClientOutput, error) {
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, httpClient *http.Client, restClient *rest.RESTClient, mapper meta.RESTMapper) (*cachedClientOutput, error) {
// Create the cache for the remote cluster
cacheOptions := cache.Options{
HTTPClient: httpClient,
Expand Down Expand Up @@ -504,8 +516,7 @@ func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *re
// Start cluster healthcheck!!!
go t.healthCheckCluster(cacheCtx, &healthCheckInput{
cluster: cluster,
cfg: config,
httpClient: httpClient,
restClient: restClient,
})

return &cachedClientOutput{
Expand Down Expand Up @@ -568,13 +579,13 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error

accessor, err := t.getClusterAccessor(ctx, input.Cluster)
if err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(err, "failed to add %T watch on cluster %s", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

// We have to lock the cluster, so that the watch is not created multiple times in parallel.
ok := t.clusterLock.TryLock(input.Cluster)
if !ok {
return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(ErrClusterLocked, "failed to add %T watch on cluster %s: failed to get lock for cluster (probably because another worker is trying to create the client at the moment)", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}
defer t.clusterLock.Unlock(input.Cluster)

Expand All @@ -586,7 +597,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error

// Need to create the watch
if err := input.Watcher.Watch(source.Kind(accessor.cache, input.Kind, input.EventHandler, input.Predicates...)); err != nil {
return errors.Wrapf(err, "failed to add %s watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
return errors.Wrapf(err, "failed to add %T watch on cluster %s: failed to create watch", input.Kind, klog.KRef(input.Cluster.Namespace, input.Cluster.Name))
}

accessor.watches.Insert(input.Name)
Expand All @@ -597,8 +608,7 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
// healthCheckInput provides the input for the healthCheckCluster method.
type healthCheckInput struct {
cluster client.ObjectKey
httpClient *http.Client
cfg *rest.Config
restClient *rest.RESTClient
interval time.Duration
requestTimeout time.Duration
unhealthyThreshold int
Expand Down Expand Up @@ -630,18 +640,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health

unhealthyCount := 0

// This gets us a client that can make raw http(s) calls to the remote apiserver. We only need to create it once
// and we can reuse it inside the polling loop.
codec := runtime.NoopEncoder{Decoder: scheme.Codecs.UniversalDecoder()}
cfg := rest.CopyConfig(in.cfg)
cfg.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
restClient, restClientErr := rest.UnversionedRESTClientForConfigAndClient(cfg, in.httpClient)

runHealthCheckWithThreshold := func(ctx context.Context) (bool, error) {
if restClientErr != nil {
return false, restClientErr
}

cluster := &clusterv1.Cluster{}
if err := t.client.Get(ctx, in.cluster, cluster); err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -672,7 +671,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health

// An error here means there was either an issue connecting or the API returned an error.
// If no error occurs, reset the unhealthy counter.
_, err := restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
_, err := in.restClient.Get().AbsPath(in.path).Timeout(in.requestTimeout).DoRaw(ctx)
if err != nil {
if apierrors.IsUnauthorized(err) {
// Unauthorized means that the underlying kubeconfig is not authorizing properly anymore, which
Expand Down

0 comments on commit afdcb4c

Please sign in to comment.