diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml
index 8590dfa92ad1..9fd19a385afd 100644
--- a/config/manager/manager.yaml
+++ b/config/manager/manager.yaml
@@ -25,6 +25,19 @@ spec:
         - "--feature-gates=MachinePool=${EXP_MACHINE_POOL:=false},ClusterResourceSet=${EXP_CLUSTER_RESOURCE_SET:=false},ClusterTopology=${CLUSTER_TOPOLOGY:=false},RuntimeSDK=${EXP_RUNTIME_SDK:=false}"
         image: controller:latest
         name: manager
+        env:
+        - name: POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: POD_UID
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.uid
         ports:
         - containerPort: 9440
           name: healthz
diff --git a/controllers/remote/cluster_cache_reconciler.go b/controllers/remote/cluster_cache_reconciler.go
index 0b33ef0821d1..748778c63021 100644
--- a/controllers/remote/cluster_cache_reconciler.go
+++ b/controllers/remote/cluster_cache_reconciler.go
@@ -73,7 +73,7 @@ func (r *ClusterCacheReconciler) Reconcile(ctx context.Context, req reconcile.Re
 		log.V(2).Info("Cluster no longer exists")

-		r.Tracker.deleteAccessor(req.NamespacedName)
+		r.Tracker.deleteAccessor(ctx, req.NamespacedName)

 		return reconcile.Result{}, nil
 	}
diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache_tracker.go
similarity index 70%
rename from controllers/remote/cluster_cache.go
rename to controllers/remote/cluster_cache_tracker.go
index 92f141855e54..51e75cada756 100644
--- a/controllers/remote/cluster_cache.go
+++ b/controllers/remote/cluster_cache_tracker.go
@@ -19,6 +19,7 @@ package remote
 import (
 	"context"
 	"fmt"
+	"os"
 	"sync"
 	"time"

@@ -26,8 +27,11 @@
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -62,6 +66,12 @@ type ClusterCacheTracker struct {
 	lock             sync.RWMutex
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
 	indexes          []Index
+
+	// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
+	// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
+	// This information will be used to detect if the controller is running on a workload cluster, so
+	// that we can then access the apiserver directly.
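+	// Note: These environment variables are typically injected via the Kubernetes downward API;
+	// the manager.yaml changes in this diff wire them up.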
+	controllerPodMetadata *metav1.ObjectMeta
 }

 // ClusterCacheTrackerOptions defines options to configure
@@ -96,7 +106,23 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
 func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOptions) (*ClusterCacheTracker, error) {
 	setDefaultOptions(&options)

+	var controllerPodMetadata *metav1.ObjectMeta
+	podNamespace := os.Getenv("POD_NAMESPACE")
+	podName := os.Getenv("POD_NAME")
+	podUID := os.Getenv("POD_UID")
+	if podNamespace != "" && podName != "" && podUID != "" {
+		options.Log.Info("Found controller pod metadata, the ClusterCacheTracker will try to access the cluster directly when possible")
+		controllerPodMetadata = &metav1.ObjectMeta{
+			Namespace: podNamespace,
+			Name:      podName,
+			UID:       types.UID(podUID),
+		}
+	} else {
+		options.Log.Info("Couldn't find controller pod metadata, the ClusterCacheTracker will always access clusters using the regular apiserver endpoint")
+	}
+
 	return &ClusterCacheTracker{
+		controllerPodMetadata: controllerPodMetadata,
 		log:                   *options.Log,
 		clientUncachedObjects: options.ClientUncachedObjects,
 		client:                manager.GetClient(),
@@ -119,11 +145,25 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
 	return accessor.client, nil
 }

+// GetRESTConfig returns a cached REST config for the given cluster.
+func (t *ClusterCacheTracker) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
+	if err != nil {
+		return nil, err
+	}
+
+	return accessor.config, nil
+}
+
 // clusterAccessor represents the combination of a delegating client, cache, and watches for a remote cluster.
 type clusterAccessor struct {
 	cache   *stoppableCache
 	client  client.Client
 	watches sets.String
+	config  *rest.Config
 }

 // clusterAccessorExists returns true if a clusterAccessor exists for cluster.
@@ -155,22 +195,47 @@ func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster
 // newClusterAccessor creates a new clusterAccessor.
 func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
+	log := ctrl.LoggerFrom(ctx)
+
 	// Get a rest config for the remote cluster
 	config, err := RESTConfig(ctx, clusterCacheControllerName, t.client, cluster)
 	if err != nil {
 		return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
 	}

-	// Create a mapper for it
-	mapper, err := apiutil.NewDynamicRESTMapper(config)
+	// Create a client and a mapper for the cluster.
+	c, mapper, err := t.createClient(config, cluster)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
+		return nil, err
 	}

-	// Create the client for the remote cluster
-	c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
+	// Detect if the controller is running on the workload cluster.
+	runningOnCluster, err := t.runningOnWorkloadCluster(ctx, c, cluster)
 	if err != nil {
-		return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
+		return nil, err
+	}
+
+	// If the controller runs on the workload cluster, access the apiserver directly by using the
+	// CA and Host from the in-cluster configuration.
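+	// Note: The in-cluster Host typically points at the kubernetes service, so traffic stays on the
+	// cluster network instead of going through the external endpoint.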
+	if runningOnCluster {
+		inClusterConfig, err := ctrl.GetConfig()
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
+		}
+
+		// Use CA and Host from in-cluster config.
+		config.CAData = nil
+		config.CAFile = inClusterConfig.CAFile
+		config.Host = inClusterConfig.Host
+
+		// Create a new client and overwrite the previously created client.
+		c, mapper, err = t.createClient(config, cluster)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
+		}
+		log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
+	} else {
+		log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
 	}

 	// Create the cache for the remote cluster
@@ -220,13 +285,59 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 	return &clusterAccessor{
 		cache:   cache,
+		config:  config,
 		client:  delegatingClient,
 		watches: sets.NewString(),
 	}, nil
 }

+// runningOnWorkloadCluster detects if the current controller runs on the workload cluster.
+func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c client.Client, cluster client.ObjectKey) (bool, error) {
+	// Controller Pod metadata was not found, so we can't detect if we run on the workload cluster.
+	if t.controllerPodMetadata == nil {
+		return false, nil
+	}
+
+	// Try to get the controller pod.
+	var pod corev1.Pod
+	if err := c.Get(ctx, client.ObjectKey{
+		Namespace: t.controllerPodMetadata.Namespace,
+		Name:      t.controllerPodMetadata.Name,
+	}, &pod); err != nil {
+		// If the controller pod is not found, we assume we are not running on the workload cluster.
+		if apierrors.IsNotFound(err) {
+			return false, nil
+		}
+
+		// If we got another error, we return the error so that this will be retried later.
+		return false, errors.Wrapf(err, "error checking if we're running on workload cluster %q", cluster.String())
+	}
+
+	// If the UID is the same, we found the controller pod on the workload cluster.
+	return t.controllerPodMetadata.UID == pod.UID, nil
+}
+
+// createClient creates a client and a mapper based on a rest.Config.
+func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.ObjectKey) (client.Client, meta.RESTMapper, error) {
+	// Create a mapper for it
+	mapper, err := apiutil.NewDynamicRESTMapper(config)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
+	}
+
+	// Create the client for the remote cluster
+	c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
+	}
+
+	return c, mapper, nil
+}
+
 // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
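+// Note: The context parameter is only used to derive the logger.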
-func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
+func (t *ClusterCacheTracker) deleteAccessor(ctx context.Context, cluster client.ObjectKey) {
+	log := ctrl.LoggerFrom(ctx)
+
 	t.lock.Lock()
 	defer t.lock.Unlock()

@@ -235,11 +346,11 @@ func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
 		return
 	}

-	t.log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())
+	log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())

-	t.log.V(4).Info("Stopping cache", "cluster", cluster.String())
+	log.V(4).Info("Stopping cache", "cluster", cluster.String())
 	a.cache.Stop()
-	t.log.V(4).Info("Cache stopped", "cluster", cluster.String())
+	log.V(4).Info("Cache stopped", "cluster", cluster.String())

 	delete(t.clusterAccessors, cluster)
 }
@@ -286,7 +397,8 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	}

 	if a.watches.Has(input.Name) {
-		t.log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
+		log := ctrl.LoggerFrom(ctx)
+		log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
 		return nil
 	}

@@ -391,7 +503,8 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 	// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
 	// happens when the cache is explicitly stopped.
 	if err != nil && err != wait.ErrWaitTimeout {
-		t.log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
-		t.deleteAccessor(in.cluster)
+		log := ctrl.LoggerFrom(ctx)
+		log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
+		t.deleteAccessor(ctx, in.cluster)
 	}
 }
diff --git a/controllers/remote/cluster_cache_fake.go b/controllers/remote/cluster_cache_tracker_fake.go
similarity index 100%
rename from controllers/remote/cluster_cache_fake.go
rename to controllers/remote/cluster_cache_tracker_fake.go
diff --git a/controllers/remote/cluster_cache_tracker_test.go b/controllers/remote/cluster_cache_tracker_test.go
index 071f773ca99f..52f25346f755 100644
--- a/controllers/remote/cluster_cache_tracker_test.go
+++ b/controllers/remote/cluster_cache_tracker_test.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/client-go/kubernetes/scheme"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -186,6 +187,128 @@ func TestClusterCacheTracker(t *testing.T) {
 			g.Consistently(c.ch).ShouldNot(Receive())
 		})
 	})
+
+	t.Run("runningOnWorkloadCluster", func(t *testing.T) {
+		tests := []struct {
+			name                      string
+			currentControllerMetadata *metav1.ObjectMeta
+			clusterObjects            []client.Object
+			expected                  bool
+		}{
+			{
+				name: "should return true if the controller is running on the workload cluster",
+				currentControllerMetadata: &metav1.ObjectMeta{
+					Name:      "controller-pod",
+					Namespace: "controller-pod-namespace",
+					UID:       types.UID("controller-pod-uid"),
+				},
+				clusterObjects: []client.Object{
+					&corev1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "controller-pod",
+							Namespace: "controller-pod-namespace",
+							UID:       types.UID("controller-pod-uid"),
+						},
+					},
+				},
+				expected: true,
+			},
+			{
+				name: "should return false if the controller is not running on the workload cluster: name mismatch",
+				currentControllerMetadata: &metav1.ObjectMeta{
+					Name:      "controller-pod",
+					Namespace: "controller-pod-namespace",
+					UID:       types.UID("controller-pod-uid"),
+				},
+				clusterObjects: []client.Object{
+					&corev1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "controller-pod-mismatch",
+							Namespace: "controller-pod-namespace",
+							UID:       types.UID("controller-pod-uid"),
+						},
+					},
+				},
+				expected: false,
+			},
+			{
+				name: "should return false if the controller is not running on the workload cluster: namespace mismatch",
+				currentControllerMetadata: &metav1.ObjectMeta{
+					Name:      "controller-pod",
+					Namespace: "controller-pod-namespace",
+					UID:       types.UID("controller-pod-uid"),
+				},
+				clusterObjects: []client.Object{
+					&corev1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "controller-pod",
+							Namespace: "controller-pod-namespace-mismatch",
+							UID:       types.UID("controller-pod-uid"),
+						},
+					},
+				},
+				expected: false,
+			},
+			{
+				name: "should return false if the controller is not running on the workload cluster: uid mismatch",
+				currentControllerMetadata: &metav1.ObjectMeta{
+					Name:      "controller-pod",
+					Namespace: "controller-pod-namespace",
+					UID:       types.UID("controller-pod-uid"),
+				},
+				clusterObjects: []client.Object{
+					&corev1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "controller-pod",
+							Namespace: "controller-pod-namespace",
+							UID:       types.UID("controller-pod-uid-mismatch"),
+						},
+					},
+				},
+				expected: false,
+			},
+			{
+				name: "should return false if the controller is not running on the workload cluster: no pod in cluster",
+				currentControllerMetadata: &metav1.ObjectMeta{
+					Name:      "controller-pod",
+					Namespace: "controller-pod-namespace",
+					UID:       types.UID("controller-pod-uid"),
+				},
+				clusterObjects: []client.Object{},
+				expected:       false,
+			},
+			{
+				name:                      "should return false if the controller is not running on the workload cluster: no controller metadata",
+				currentControllerMetadata: nil,
+				clusterObjects: []client.Object{
+					&corev1.Pod{
+						ObjectMeta: metav1.ObjectMeta{
+							Name:      "controller-pod",
+							Namespace: "controller-pod-namespace",
+							UID:       types.UID("controller-pod-uid"),
+						},
+					},
+				},
+				expected: false,
+			},
+		}
+
+		for _, tt := range tests {
+			t.Run(tt.name, func(t *testing.T) {
+				g := NewWithT(t)
+
+				c := fake.NewClientBuilder().WithObjects(tt.clusterObjects...).Build()
+
+				cct := &ClusterCacheTracker{
+					controllerPodMetadata: tt.currentControllerMetadata,
+				}
+
+				found, err := cct.runningOnWorkloadCluster(ctx, c, client.ObjectKey{Name: "test-cluster", Namespace: "test-namespace"})
+				g.Expect(err).ToNot(HaveOccurred())
+				g.Expect(found).To(Equal(tt.expected))
+			})
+		}
+	})
 }

 type testController struct {
diff --git a/controlplane/kubeadm/config/manager/manager.yaml b/controlplane/kubeadm/config/manager/manager.yaml
index baa32cd401af..1566e010a702 100644
--- a/controlplane/kubeadm/config/manager/manager.yaml
+++ b/controlplane/kubeadm/config/manager/manager.yaml
@@ -24,6 +24,19 @@ spec:
         - "--feature-gates=ClusterTopology=${CLUSTER_TOPOLOGY:=false},KubeadmBootstrapFormatIgnition=${EXP_KUBEADM_BOOTSTRAP_FORMAT_IGNITION:=false}"
         image: controller:latest
         name: manager
+        env:
+        - name: POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: POD_UID
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.uid
         ports:
         - containerPort: 9440
           name: healthz
diff --git a/controlplane/kubeadm/internal/cluster.go b/controlplane/kubeadm/internal/cluster.go
index 5fc8e3848bb3..2c5ba9db8c93 100644
--- a/controlplane/kubeadm/internal/cluster.go
+++ b/controlplane/kubeadm/internal/cluster.go
@@ -116,6 +116,17 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
 		return nil, err
 	}

+	clientConfig, err := m.Tracker.GetRESTConfig(ctx, clusterKey)
+	if err != nil {
+		return nil, err
+	}
+
+	// Make sure we use the same CA and Host as the client.
+	// Note: This has to be done to be able to communicate directly on self-hosted clusters.
+	restConfig.CAData = clientConfig.CAData
+	restConfig.CAFile = clientConfig.CAFile
+	restConfig.Host = clientConfig.Host
+
 	// Retrieves the etcd CA key Pair
 	crtData, keyData, err := m.getEtcdCAKeyPair(ctx, clusterKey)
 	if err != nil {
diff --git a/controlplane/kubeadm/main.go b/controlplane/kubeadm/main.go
index 4677c4ab2b7a..79263ec41edf 100644
--- a/controlplane/kubeadm/main.go
+++ b/controlplane/kubeadm/main.go
@@ -218,7 +218,9 @@ func setupChecks(mgr ctrl.Manager) {
 func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
 	// Set up a ClusterCacheTracker to provide to controllers
 	// requiring a connection to a remote cluster
+	log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
 	tracker, err := remote.NewClusterCacheTracker(mgr, remote.ClusterCacheTrackerOptions{
+		Log:     &log,
 		Indexes: remote.DefaultIndexes,
 		ClientUncachedObjects: []client.Object{
 			&corev1.ConfigMap{},
diff --git a/test/infrastructure/docker/config/manager/manager.yaml b/test/infrastructure/docker/config/manager/manager.yaml
index fa2488f99735..b238f6bd4e0a 100644
--- a/test/infrastructure/docker/config/manager/manager.yaml
+++ b/test/infrastructure/docker/config/manager/manager.yaml
@@ -22,6 +22,19 @@ spec:
        - "--feature-gates=MachinePool=${EXP_MACHINE_POOL:=false},ClusterTopology=${CLUSTER_TOPOLOGY:=false}"
        image: controller:latest
        name: manager
+       env:
+       - name: POD_NAMESPACE
+         valueFrom:
+           fieldRef:
+             fieldPath: metadata.namespace
+       - name: POD_NAME
+         valueFrom:
+           fieldRef:
+             fieldPath: metadata.name
+       - name: POD_UID
+         valueFrom:
+           fieldRef:
+             fieldPath: metadata.uid
        ports:
        - containerPort: 9440
          name: healthz
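
For illustration, a minimal sketch (not part of the diff) of how a controller holding a ClusterCacheTracker could consume the new GetRESTConfig API; the package, function, and key names are hypothetical:

package example

import (
	"context"

	"k8s.io/client-go/rest"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// workloadClusterConfig is a hypothetical helper: it returns a copy of the
// REST config cached by the tracker for the given workload cluster. When the
// controller is self-hosted, this config already carries the in-cluster CA
// and Host set up by newClusterAccessor above.
func workloadClusterConfig(ctx context.Context, tracker *remote.ClusterCacheTracker, key client.ObjectKey) (*rest.Config, error) {
	cfg, err := tracker.GetRESTConfig(ctx, key)
	if err != nil {
		return nil, err
	}
	// Copy before handing the config out so the tracker's cached copy stays untouched.
	return rest.CopyConfig(cfg), nil
}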