Use ClusterCacheTracker consistently (instead of NewClusterClient)
Signed-off-by: Stefan Büringer [email protected]
sbueringer committed Jun 1, 2023
1 parent 9be885c commit b42793e
Showing 12 changed files with 178 additions and 83 deletions.
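
At its core, the commit replaces the per-call remote client factory with the shared ClusterCacheTracker: instead of constructing a new client for the workload cluster on every call, each reconciler now asks the tracker for a cached client and requeues when the tracker is busy. A condensed before/after sketch of the call sites changed below (Go fragments excerpted from the diff, not a standalone program):

// Before: a fresh client was created per call via the injectable getter.
remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))

// After: the shared tracker returns a cached client for the cluster.
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))

// Callers treat remote.ErrClusterLocked as "try again later" rather than a failure,
// since only one worker at a time may hold the tracker's lock for a given cluster.
if err != nil && errors.Is(err, remote.ErrClusterLocked) {
	return ctrl.Result{Requeue: true}, nil
}
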
4 changes: 4 additions & 0 deletions bootstrap/kubeadm/controllers/alias.go
@@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller"

kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/controllers"
"sigs.k8s.io/cluster-api/controllers/remote"
)

// Following types provides access to reconcilers implemented in internal/controllers, thus
@@ -39,6 +40,8 @@ const (
type KubeadmConfigReconciler struct {
Client client.Client

Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

@@ -50,6 +53,7 @@ type KubeadmConfigReconciler struct {
func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
return (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
Client: r.Client,
Tracker: r.Tracker,
WatchFilterValue: r.WatchFilterValue,
TokenTTL: r.TokenTTL,
}).SetupWithManager(ctx, mgr, options)
36 changes: 22 additions & 14 deletions bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller.go
@@ -59,11 +59,6 @@ import (
"sigs.k8s.io/cluster-api/util/secret"
)

const (
// KubeadmConfigControllerName defines the controller used when creating clients.
KubeadmConfigControllerName = "kubeadmconfig-controller"
)

const (
// DefaultTokenTTL is the default TTL used for tokens.
DefaultTokenTTL = 15 * time.Minute
@@ -82,15 +77,14 @@ type InitLocker interface {
// KubeadmConfigReconciler reconciles a KubeadmConfig object.
type KubeadmConfigReconciler struct {
Client client.Client
Tracker *remote.ClusterCacheTracker
KubeadmInitLock InitLocker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string

// TokenTTL is the amount of time a bootstrap token (and therefore a KubeadmConfig) will be valid.
TokenTTL time.Duration

remoteClientGetter remote.ClusterClientGetter
}

// Scope is a scoped struct used during reconciliation.
@@ -106,9 +100,6 @@ func (r *KubeadmConfigReconciler) SetupWithManager(ctx context.Context, mgr ctrl
if r.KubeadmInitLock == nil {
r.KubeadmInitLock = locking.NewControlPlaneInitMutex(mgr.GetClient())
}
if r.remoteClientGetter == nil {
r.remoteClientGetter = remote.NewClusterClient
}
if r.TokenTTL == 0 {
r.TokenTTL = DefaultTokenTTL
}
@@ -239,6 +230,24 @@ func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}
}()

// Ignore deleted KubeadmConfigs.
if !config.DeletionTimestamp.IsZero() {
return ctrl.Result{}, nil
}

res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
if err != nil && errors.Is(err, remote.ErrClusterLocked) {
// Requeue if the reconcile failed because the ClusterCacheTracker was locked for
// the current cluster because of concurrent access.
log.V(5).Info("Requeuing because another worker has the lock on the ClusterCacheTracker")
return ctrl.Result{Requeue: true}, nil
}
return res, err
}

func (r *KubeadmConfigReconciler) reconcile(ctx context.Context, scope *Scope, cluster *clusterv1.Cluster, config *bootstrapv1.KubeadmConfig, configOwner *bsutil.ConfigOwner) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)

// Ensure the bootstrap secret associated with this KubeadmConfig has the correct ownerReference.
if err := r.ensureBootstrapSecretOwnersRef(ctx, scope); err != nil {
return ctrl.Result{}, err
@@ -305,9 +314,8 @@ func (r *KubeadmConfigReconciler) refreshBootstrapToken(ctx context.Context, con
log := ctrl.LoggerFrom(ctx)
token := config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token

remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
log.Error(err, "Error creating remote cluster client")
return ctrl.Result{}, err
}

@@ -323,7 +331,7 @@ func (r *KubeadmConfigReconciler) rotateMachinePoolBootstrapToken(ctx context.Context, config *bootstrapv1.KubeadmConfig, cluster *clusterv1.Cluster, scope *Scope) (ctrl.Result, error) {
func (r *KubeadmConfigReconciler) rotateMachinePoolBootstrapToken(ctx context.Context, config *bootstrapv1.KubeadmConfig, cluster *clusterv1.Cluster, scope *Scope) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Config is owned by a MachinePool, checking if token should be rotated")
remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
}
@@ -928,7 +936,7 @@ func (r *KubeadmConfigReconciler) reconcileDiscovery(ctx context.Context, cluste

// if BootstrapToken already contains a token, respect it; otherwise create a new bootstrap token for the node to join
if config.Spec.JoinConfiguration.Discovery.BootstrapToken.Token == "" {
remoteClient, err := r.remoteClientGetter(ctx, KubeadmConfigControllerName, r.Client, util.ObjectKey(cluster))
remoteClient, err := r.Tracker.GetClient(ctx, util.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
}
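The net effect in this file: Reconcile is split so that everything which may need a workload-cluster client moves into the new unexported reconcile helper, while the exported Reconcile maps remote.ErrClusterLocked to a plain requeue. Roughly (a condensed sketch, omitting the unchanged fetch/patch plumbing):

func (r *KubeadmConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// ...fetch the KubeadmConfig, its owner, and the Cluster; set up the patch helper (unchanged)...

	res, err := r.reconcile(ctx, scope, cluster, config, configOwner)
	if err != nil && errors.Is(err, remote.ErrClusterLocked) {
		// Another worker currently holds the tracker's lock for this cluster; retry later.
		return ctrl.Result{Requeue: true}, nil
	}
	return res, err
}
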
bootstrap/kubeadm/internal/controllers/kubeadmconfig_controller_test.go
@@ -25,6 +25,7 @@ import (
"time"

ignition "github.com/flatcar/ignition/config/v2_3"
"github.com/go-logr/logr"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,13 +35,14 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/yaml"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
bootstrapbuilder "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/builder"
fakeremote "sigs.k8s.io/cluster-api/controllers/remote/fake"
"sigs.k8s.io/cluster-api/controllers/remote"
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/internal/test/builder"
@@ -495,9 +497,9 @@ func TestKubeadmConfigReconciler_Reconcile_GenerateCloudConfigData(t *testing.T)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

request := ctrl.Request{
@@ -556,9 +558,9 @@ func TestKubeadmConfigReconciler_Reconcile_ErrorIfJoiningControlPlaneHasInvalidC
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

request := ctrl.Request{
@@ -677,9 +679,9 @@ func TestReconcileIfJoinCertificatesAvailableConditioninNodesAndControlPlaneIsRe
objects = append(objects, createSecrets(t, cluster, config)...)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

request := ctrl.Request{
@@ -754,9 +756,9 @@ func TestReconcileIfJoinNodePoolsAndControlPlaneIsReady(t *testing.T) {
objects = append(objects, createSecrets(t, cluster, config)...)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

request := ctrl.Request{
@@ -854,9 +856,9 @@ func TestBootstrapDataFormat(t *testing.T) {
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()

k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}
request := ctrl.Request{
NamespacedName: client.ObjectKey{
@@ -934,9 +936,9 @@ func TestKubeadmConfigSecretCreatedStatusNotPatched(t *testing.T) {
objects = append(objects, createSecrets(t, cluster, initConfig)...)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}).Build()
k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}
request := ctrl.Request{
NamespacedName: client.ObjectKey{
@@ -1011,10 +1013,10 @@ func TestBootstrapTokenTTLExtension(t *testing.T) {
objects = append(objects, createSecrets(t, cluster, initConfig)...)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &clusterv1.Machine{}).Build()
k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
TokenTTL: DefaultTokenTTL,
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
TokenTTL: DefaultTokenTTL,
}
request := ctrl.Request{
NamespacedName: client.ObjectKey{
@@ -1212,10 +1214,10 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) {
objects = append(objects, createSecrets(t, cluster, initConfig)...)
myclient := fake.NewClientBuilder().WithObjects(objects...).WithStatusSubresource(&bootstrapv1.KubeadmConfig{}, &expv1.MachinePool{}).Build()
k := &KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
TokenTTL: DefaultTokenTTL,
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
TokenTTL: DefaultTokenTTL,
}
request := ctrl.Request{
NamespacedName: client.ObjectKey{
@@ -1368,12 +1370,6 @@ func TestBootstrapTokenRotationMachinePool(t *testing.T) {

// Ensure the discovery portion of the JoinConfiguration gets generated correctly.
func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testing.T) {
k := &KubeadmConfigReconciler{
Client: fake.NewClientBuilder().Build(),
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
}

caHash := []string{"...."}
bootstrapToken := bootstrapv1.Discovery{
BootstrapToken: &bootstrapv1.BootstrapTokenDiscovery{
@@ -1499,6 +1495,13 @@ func TestKubeadmConfigReconciler_Reconcile_DiscoveryReconcileBehaviors(t *testin
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)

fakeClient := fake.NewClientBuilder().Build()
k := &KubeadmConfigReconciler{
Client: fakeClient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), fakeClient, fakeClient.Scheme(), client.ObjectKey{Name: tc.cluster.Name, Namespace: tc.cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

res, err := k.reconcileDiscovery(ctx, tc.cluster, tc.config, secret.Certificates{})
g.Expect(res.IsZero()).To(BeTrue())
g.Expect(err).NotTo(HaveOccurred())
@@ -1710,9 +1713,9 @@ func TestKubeadmConfigReconciler_Reconcile_AlwaysCheckCAVerificationUnlessReques

myclient := fake.NewClientBuilder().WithObjects(objects...).Build()
reconciler := KubeadmConfigReconciler{
Client: myclient,
KubeadmInitLock: &myInitLocker{},
remoteClientGetter: fakeremote.NewClusterClient,
Client: myclient,
Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(), client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
KubeadmInitLock: &myInitLocker{},
}

wc := newWorkerJoinKubeadmConfig(metav1.NamespaceDefault, "worker-join-cfg")
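All of the test updates above make the same substitution: the fake remoteClientGetter is dropped and the reconciler gets a test tracker that resolves the cluster key to the same fake client. The recurring wiring is roughly (a sketch of the pattern, assuming the usual cluster/objects fixtures from these tests):

myclient := fake.NewClientBuilder().WithObjects(objects...).Build()
k := &KubeadmConfigReconciler{
	Client: myclient,
	// The test tracker serves the fake client for the given cluster key, so no
	// connection to a real workload cluster is ever attempted.
	Tracker: remote.NewTestClusterCacheTracker(logr.New(log.NullLogSink{}), myclient, myclient.Scheme(),
		client.ObjectKey{Name: cluster.Name, Namespace: cluster.Namespace}),
	KubeadmInitLock: &myInitLocker{},
}
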
36 changes: 33 additions & 3 deletions bootstrap/kubeadm/main.go
@@ -55,8 +55,9 @@ import (
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
controllerName = "cluster-api-kubeadm-bootstrap-manager"
)

func init() {
@@ -80,6 +81,7 @@ var (
watchFilterValue string
watchNamespace string
profilerAddress string
clusterConcurrency int
kubeadmConfigConcurrency int
syncPeriod time.Duration
restConfigQPS float32
@@ -117,6 +119,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.StringVar(&profilerAddress, "profiler-address", "",
"Bind address to expose the pprof profiler (e.g. localhost:6060)")

fs.IntVar(&clusterConcurrency, "cluster-concurrency", 10,
"Number of clusters to process simultaneously")

fs.IntVar(&kubeadmConfigConcurrency, "kubeadmconfig-concurrency", 10,
"Number of kubeadm configs to process simultaneously")

@@ -166,7 +171,7 @@ func main() {
restConfig := ctrl.GetConfigOrDie()
restConfig.QPS = restConfigQPS
restConfig.Burst = restConfigBurst
restConfig.UserAgent = remote.DefaultClusterAPIUserAgent("cluster-api-kubeadm-bootstrap-manager")
restConfig.UserAgent = remote.DefaultClusterAPIUserAgent(controllerName)

tlsOptionOverrides, err := flags.GetTLSOptionOverrideFuncs(tlsOptions)
if err != nil {
@@ -245,8 +250,33 @@ func setupChecks(mgr ctrl.Manager) {
}

func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
// Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
// requiring a connection to a remote cluster
log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
tracker, err := remote.NewClusterCacheTracker(
mgr,
remote.ClusterCacheTrackerOptions{
ControllerName: controllerName,
Log: &log,
Indexes: remote.DefaultIndexes,
},
)
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
os.Exit(1)
}
if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}

if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
TokenTTL: tokenTTL,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {
