Skip to content

Commit

Permalink
ClusterCacheTracker: access controller cluster directly
Browse files Browse the repository at this point in the history
* also use the adjust rest.Config for etcd
* add GetRESTConfig to ClusterCacheTracker

Co-authored-by: Yuvaraj Kakaraparthi <[email protected]>
  • Loading branch information
sbueringer and Yuvaraj Kakaraparthi committed Jul 26, 2022
1 parent 7c5b9ef commit 24f2849
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 14 deletions.
13 changes: 13 additions & 0 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ spec:
- "--feature-gates=MachinePool=${EXP_MACHINE_POOL:=false},ClusterResourceSet=${EXP_CLUSTER_RESOURCE_SET:=false},ClusterTopology=${CLUSTER_TOPOLOGY:=false},RuntimeSDK=${EXP_RUNTIME_SDK:=false}"
image: controller:latest
name: manager
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_UID
valueFrom:
fieldRef:
fieldPath: metadata.uid
ports:
- containerPort: 9440
name: healthz
Expand Down
2 changes: 1 addition & 1 deletion controllers/remote/cluster_cache_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (r *ClusterCacheReconciler) Reconcile(ctx context.Context, req reconcile.Re

log.V(2).Info("Cluster no longer exists")

r.Tracker.deleteAccessor(req.NamespacedName)
r.Tracker.deleteAccessor(ctx, req.NamespacedName)

return reconcile.Result{}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package remote
import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -62,6 +66,12 @@ type ClusterCacheTracker struct {
lock sync.RWMutex
clusterAccessors map[client.ObjectKey]*clusterAccessor
indexes []Index

// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
// This information will be used to detected if the controller is running on a workload cluster, so
// that we can then access the apiserver directly.
controllerPodMetadata *metav1.ObjectMeta
}

// ClusterCacheTrackerOptions defines options to configure
Expand Down Expand Up @@ -96,7 +106,23 @@ func setDefaultOptions(opts *ClusterCacheTrackerOptions) {
func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOptions) (*ClusterCacheTracker, error) {
setDefaultOptions(&options)

var controllerPodMetadata *metav1.ObjectMeta
podNamespace := os.Getenv("POD_NAMESPACE")
podName := os.Getenv("POD_NAME")
podUID := os.Getenv("POD_UID")
if podNamespace != "" && podName != "" && podUID != "" {
options.Log.Info("Found controller pod metadata, the ClusterCacheTracker will try to access the cluster directly when possible")
controllerPodMetadata = &metav1.ObjectMeta{
Namespace: podNamespace,
Name: podName,
UID: types.UID(podUID),
}
} else {
options.Log.Info("Couldn't find controller pod metadata, the ClusterCacheTracker will always access clusters using the regular apiserver endpoint")
}

return &ClusterCacheTracker{
controllerPodMetadata: controllerPodMetadata,
log: *options.Log,
clientUncachedObjects: options.ClientUncachedObjects,
client: manager.GetClient(),
Expand All @@ -119,11 +145,25 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
return accessor.client, nil
}

// GetRESTConfig returns a cached REST config for the given cluster.
func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
t.lock.Lock()
defer t.lock.Unlock()

accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...)
if err != nil {
return nil, err
}

return accessor.config, nil
}

// clusterAccessor represents the combination of a delegating client, cache, and watches for a remote cluster.
type clusterAccessor struct {
cache *stoppableCache
client client.Client
watches sets.String
config *rest.Config
}

// clusterAccessorExists returns true if a clusterAccessor exists for cluster.
Expand Down Expand Up @@ -155,22 +195,47 @@ func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster

// newClusterAccessor creates a new clusterAccessor.
func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
log := ctrl.LoggerFrom(ctx)

// Get a rest config for the remote cluster
config, err := RESTConfig(ctx, clusterCacheControllerName, t.client, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config)
// Create a client and a mapper for the cluster.
c, mapper, err := t.createClient(config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
return nil, err
}

// Create the client for the remote cluster
c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
// Detect if the controller is running on the workload cluster.
runningOnCluster, err := t.runningOnWorkloadCluster(ctx, c, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, err
}

// If the controller runs on the workload cluster, access the apiserver directly by using the
// CA and Host from the in-cluster configuration.
if runningOnCluster {
inClusterConfig, err := ctrl.GetConfig()
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
}

// Use CA and Host from in-cluster config.
config.CAData = nil
config.CAFile = inClusterConfig.CAFile
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, mapper, err = t.createClient(config, cluster)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
}
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
} else {
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
}

// Create the cache for the remote cluster
Expand Down Expand Up @@ -220,13 +285,59 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl

return &clusterAccessor{
cache: cache,
config: config,
client: delegatingClient,
watches: sets.NewString(),
}, nil
}

// runningOnWorkloadCluster detects if the current controller runs on the workload cluster.
func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c client.Client, cluster client.ObjectKey) (bool, error) {
// Controller Pod metadata was not found, so we can't detect if we run on the workload cluster.
if t.controllerPodMetadata == nil {
return false, nil
}

// Try to get the controller pod.
var pod corev1.Pod
if err := c.Get(ctx, client.ObjectKey{
Namespace: t.controllerPodMetadata.Namespace,
Name: t.controllerPodMetadata.Name,
}, &pod); err != nil {
// If the controller pod is not found, we assume we are not running on the workload cluster.
if apierrors.IsNotFound(err) {
return false, nil
}

// If we got another error, we return the error so that this will be retried later.
return false, errors.Wrapf(err, "error checking if we're running on workload cluster %q", cluster.String())
}

// If the uid is the same we found the controller pod on the workload cluster.
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.ObjectKey) (client.Client, meta.RESTMapper, error) {
// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating dynamic rest mapper for remote cluster %q", cluster.String())
}

// Create the client for the remote cluster
c, err := client.New(config, client.Options{Scheme: t.scheme, Mapper: mapper})
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

return c, mapper, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
func (t *ClusterCacheTracker) deleteAccessor(ctx context.Context, cluster client.ObjectKey) {
log := ctrl.LoggerFrom(ctx)

t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -235,11 +346,11 @@ func (t *ClusterCacheTracker) deleteAccessor(cluster client.ObjectKey) {
return
}

t.log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())
log.V(2).Info("Deleting clusterAccessor", "cluster", cluster.String())

t.log.V(4).Info("Stopping cache", "cluster", cluster.String())
log.V(4).Info("Stopping cache", "cluster", cluster.String())
a.cache.Stop()
t.log.V(4).Info("Cache stopped", "cluster", cluster.String())
log.V(4).Info("Cache stopped", "cluster", cluster.String())

delete(t.clusterAccessors, cluster)
}
Expand Down Expand Up @@ -286,7 +397,8 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
}

if a.watches.Has(input.Name) {
t.log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
log := ctrl.LoggerFrom(ctx)
log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
return nil
}

Expand Down Expand Up @@ -391,7 +503,8 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
// happens when the cache is explicitly stopped.
if err != nil && err != wait.ErrWaitTimeout {
t.log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
t.deleteAccessor(in.cluster)
log := ctrl.LoggerFrom(ctx)
log.Error(err, "Error health checking cluster", "cluster", in.cluster.String())
t.deleteAccessor(ctx, in.cluster)
}
}
File renamed without changes.
123 changes: 123 additions & 0 deletions controllers/remote/cluster_cache_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -186,6 +187,128 @@ func TestClusterCacheTracker(t *testing.T) {
g.Consistently(c.ch).ShouldNot(Receive())
})
})

t.Run("runningOnWorkloadCluster", func(t *testing.T) {
tests := []struct {
name string
currentControllerMetadata *metav1.ObjectMeta
clusterObjects []client.Object
expected bool
}{
{
name: "should return true if the controller is running on the workload cluster",
currentControllerMetadata: &metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
clusterObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
},
},
expected: true,
},
{
name: "should return false if the controller is not running on the workload cluster: name mismatch",
currentControllerMetadata: &metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
clusterObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-pod-mismatch",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
},
},
expected: false,
},
{
name: "should return false if the controller is not running on the workload cluster: namespace mismatch",
currentControllerMetadata: &metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
clusterObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace-mismatch",
UID: types.UID("controller-pod-uid"),
},
},
},
expected: false,
},
{
name: "should return false if the controller is not running on the workload cluster: uid mismatch",
currentControllerMetadata: &metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
clusterObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid-mismatch"),
},
},
},
expected: false,
},
{
name: "should return false if the controller is not running on the workload cluster: no pod in cluster",
currentControllerMetadata: &metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
clusterObjects: []client.Object{},
expected: false,
},
{
name: "should return false if the controller is not running on the workload cluster: no controller metadata",
currentControllerMetadata: nil,
clusterObjects: []client.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-pod",
Namespace: "controller-pod-namespace",
UID: types.UID("controller-pod-uid"),
},
},
},
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

c := fake.NewClientBuilder().WithObjects(tt.clusterObjects...).Build()

cct := &ClusterCacheTracker{
controllerPodMetadata: tt.currentControllerMetadata,
}

found, err := cct.runningOnWorkloadCluster(ctx, c, client.ObjectKey{Name: "test-cluster", Namespace: "test-namespace"})
g.Expect(err).ToNot(HaveOccurred())
g.Expect(found).To(Equal(tt.expected))
})
}
})
}

type testController struct {
Expand Down
Loading

0 comments on commit 24f2849

Please sign in to comment.