Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Label selector conditional reconciler #9003

5 changes: 3 additions & 2 deletions bootstrap/kubeadm/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

kubeadmbootstrapcontrollers "sigs.k8s.io/cluster-api/bootstrap/kubeadm/internal/controllers"
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util/predicates"
)

// Following types provides access to reconcilers implemented in internal/controllers, thus
Expand All @@ -43,8 +44,8 @@ type KubeadmConfigReconciler struct {

Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

// TokenTTL is the amount of time a bootstrap token (and therefore a KubeadmConfig) will be valid.
TokenTTL time.Duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ type KubeadmConfigReconciler struct {
Tracker *remote.ClusterCacheTracker
KubeadmInitLock InitLocker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

// TokenTTL is the amount of time a bootstrap token (and therefore a KubeadmConfig) will be valid.
TokenTTL time.Duration
Expand Down
27 changes: 23 additions & 4 deletions bootstrap/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
expv1 "sigs.k8s.io/cluster-api/exp/api/v1beta1"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/util/flags"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/version"
)

Expand Down Expand Up @@ -82,6 +83,7 @@ var (
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchExpressionValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
Expand Down Expand Up @@ -147,6 +149,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.StringVar(&watchFilterValue, "watch-filter", "",
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))

fs.StringVar(&watchExpressionValue, "watch-expression", "",
"More generic version of watch-filter which allows to pass a label selector to filter reconcled objects. If unspecified, then the behavior defaults to the watch-filter argument")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")

Expand Down Expand Up @@ -198,6 +203,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, watchExpressionValue)
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -210,8 +221,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down Expand Up @@ -298,10 +310,17 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
setupLog.Error(err, "unable to create cluster cache tracker")
os.Exit(1)
}

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterValue: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
Expand All @@ -311,7 +330,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterValue: labelSelector,
TokenTTL: tokenTTL,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmConfig")
Expand Down
33 changes: 17 additions & 16 deletions controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
machinedeploymenttopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machinedeployment"
machinesettopologycontroller "sigs.k8s.io/cluster-api/internal/controllers/topology/machineset"
runtimeclient "sigs.k8s.io/cluster-api/internal/runtime/client"
"sigs.k8s.io/cluster-api/util/predicates"
)

// Following types provides access to reconcilers implemented in internal/controllers, thus
Expand All @@ -46,8 +47,8 @@ type ClusterReconciler struct {
UnstructuredCachingClient client.Client
APIReader client.Reader

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -66,8 +67,8 @@ type MachineReconciler struct {
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

// NodeDrainClientTimeout timeout of the client used for draining nodes.
NodeDrainClientTimeout time.Duration
Expand All @@ -91,8 +92,8 @@ type MachineSetReconciler struct {
APIReader client.Reader
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *MachineSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -111,8 +112,8 @@ type MachineDeploymentReconciler struct {
UnstructuredCachingClient client.Client
APIReader client.Reader

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *MachineDeploymentReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -129,8 +130,8 @@ type MachineHealthCheckReconciler struct {
Client client.Client
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *MachineHealthCheckReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -150,8 +151,8 @@ type ClusterTopologyReconciler struct {

RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

// UnstructuredCachingClient provides a client that forces caching of unstructured objects,
// thus allowing to optimize reads for templates or provider specific objects in a managed topology.
Expand All @@ -177,7 +178,7 @@ type MachineDeploymentTopologyReconciler struct {
// APIReader is used to list MachineSets directly via the API server to avoid
// race conditions caused by an outdated cache.
APIReader client.Reader
WatchFilterValue string
WatchFilterValue predicates.LabelMatcher
}

func (r *MachineDeploymentTopologyReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -197,7 +198,7 @@ type MachineSetTopologyReconciler struct {
// APIReader is used to list MachineSets directly via the API server to avoid
// race conditions caused by an outdated cache.
APIReader client.Reader
WatchFilterValue string
WatchFilterValue predicates.LabelMatcher
}

func (r *MachineSetTopologyReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -216,8 +217,8 @@ type ClusterClassReconciler struct {
// RuntimeClient is a client for calling runtime extensions.
RuntimeClient runtimeclient.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

// UnstructuredCachingClient provides a client that forces caching of unstructured objects,
// thus allowing to optimize reads for templates or provider specific objects.
Expand Down
4 changes: 2 additions & 2 deletions controllers/remote/cluster_cache_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type ClusterCacheReconciler struct {
Client client.Client
Tracker *ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterCacheReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down
5 changes: 3 additions & 2 deletions controlplane/kubeadm/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"sigs.k8s.io/cluster-api/controllers/remote"
kubeadmcontrolplanecontrollers "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/controllers"
"sigs.k8s.io/cluster-api/util/predicates"
)

// KubeadmControlPlaneReconciler reconciles a KubeadmControlPlane object.
Expand All @@ -37,8 +38,8 @@ type KubeadmControlPlaneReconciler struct {
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

// SetupWithManager sets up the reconciler with the Manager.
Expand Down
4 changes: 2 additions & 2 deletions controlplane/kubeadm/internal/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ type KubeadmControlPlaneReconciler struct {
EtcdDialTimeout time.Duration
EtcdCallTimeout time.Duration

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher

managementCluster internal.ManagementCluster
managementClusterUncached internal.ManagementCluster
Expand Down
27 changes: 23 additions & 4 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
kcpwebhooks "sigs.k8s.io/cluster-api/controlplane/kubeadm/webhooks"
"sigs.k8s.io/cluster-api/feature"
"sigs.k8s.io/cluster-api/util/flags"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/version"
)

Expand Down Expand Up @@ -87,6 +88,7 @@ var (
leaderElectionRenewDeadline time.Duration
leaderElectionRetryPeriod time.Duration
watchFilterValue string
watchExpressionValue string
watchNamespace string
profilerAddress string
enableContentionProfiling bool
Expand Down Expand Up @@ -146,6 +148,9 @@ func InitFlags(fs *pflag.FlagSet) {
fs.StringVar(&watchFilterValue, "watch-filter", "",
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))

fs.StringVar(&watchExpressionValue, "watch-expression", "",
"More generic version of watch-filter which allows to pass a label selector to filter reconcled objects. If unspecified, then the behavior defaults to the watch-filter argument")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")

Expand Down Expand Up @@ -202,6 +207,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, watchExpressionValue)
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -214,8 +225,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down Expand Up @@ -311,10 +323,17 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
setupLog.Error(err, "unable to create cluster cache tracker")
os.Exit(1)
}

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterValue: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
Expand All @@ -324,7 +343,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterValue: labelSelector,
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
Expand Down
9 changes: 5 additions & 4 deletions exp/addons/controllers/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import (

"sigs.k8s.io/cluster-api/controllers/remote"
clusterresourcesets "sigs.k8s.io/cluster-api/exp/addons/internal/controllers"
"sigs.k8s.io/cluster-api/util/predicates"
)

// ClusterResourceSetReconciler reconciles a ClusterResourceSet object.
type ClusterResourceSetReconciler struct {
Client client.Client
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand All @@ -48,8 +49,8 @@ func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr
type ClusterResourceSetBindingReconciler struct {
Client client.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type ClusterResourceSetReconciler struct {
Client client.Client
Tracker *remote.ClusterCacheTracker

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterResourceSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import (
type ClusterResourceSetBindingReconciler struct {
Client client.Client

// WatchFilterValue is the label value used to filter events prior to reconciliation.
WatchFilterValue string
// WatchFilterValue is the label selector value used to filter events prior to reconciliation.
WatchFilterValue predicates.LabelMatcher
}

func (r *ClusterResourceSetBindingReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
Expand Down
Loading