Skip to content

Commit

Permalink
Add default cache filter based on label selector
Browse files Browse the repository at this point in the history
  • Loading branch information
Danil-Grigorev committed Jul 14, 2023
1 parent 0930b48 commit a65bfa2
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 64 deletions.
32 changes: 20 additions & 12 deletions bootstrap/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func InitFlags(fs *pflag.FlagSet) {
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))

fs.StringVar(&watchExpressionValue, "watch-expression", "",
"More generic version of watch-filter which allows to pass a CEL expression evaluating to boolean result to filter reconcled objects. If unspecified, then the behavior defaults to watch-filter argument")
"More generic version of watch-filter which allows to pass a label selector to filter reconcled objects. If unspecified, then the behavior defaults to the watch-filter argument")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -203,6 +203,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -215,8 +221,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down Expand Up @@ -304,26 +311,27 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
os.Exit(1)
}

if err := predicates.InitExpressionMatcher(setupLog, watchExpressionValue); err != nil {
labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}

if err := (&kubeadmbootstrapcontrollers.KubeadmConfigReconciler{
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterValue: watchFilterValue,
TokenTTL: tokenTTL,
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterPredicate: labelSelector,
TokenTTL: tokenTTL,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmConfigConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmConfig")
os.Exit(1)
Expand Down
34 changes: 21 additions & 13 deletions controlplane/kubeadm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func InitFlags(fs *pflag.FlagSet) {
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))

fs.StringVar(&watchExpressionValue, "watch-expression", "",
"More generic version of watch-filter which allows to pass a CEL expression evaluating to boolean result to filter reconcled objects. If unspecified, then the behavior defaults to watch-filter argument")
"More generic version of watch-filter which allows to pass a label selector to filter reconcled objects. If unspecified, then the behavior defaults to the watch-filter argument")

fs.IntVar(&webhookPort, "webhook-port", 9443,
"Webhook Server port")
Expand Down Expand Up @@ -207,6 +207,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -219,8 +225,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down Expand Up @@ -317,27 +324,28 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
os.Exit(1)
}

if err := predicates.InitExpressionMatcher(setupLog, watchExpressionValue); err != nil {
labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}

if err := (&kubeadmcontrolplanecontrollers.KubeadmControlPlaneReconciler{
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterValue: watchFilterValue,
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
Client: mgr.GetClient(),
SecretCachingClient: secretCachingClient,
Tracker: tracker,
WatchFilterPredicate: labelSelector,
EtcdDialTimeout: etcdDialTimeout,
EtcdCallTimeout: etcdCallTimeout,
}).SetupWithManager(ctx, mgr, concurrency(kubeadmControlPlaneConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "KubeadmControlPlane")
os.Exit(1)
Expand Down
78 changes: 43 additions & 35 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func InitFlags(fs *pflag.FlagSet) {
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))

fs.StringVar(&watchExpressionValue, "watch-expression", "",
"More generic version of watch-filter which allows to pass a CEL expression evaluating to boolean result to filter reconcled objects. If unspecified, then the behavior defaults to watch-filter argument")
"More generic version of watch-filter which allows to pass a label selector to filter reconcled objects. If unspecified, then the behavior defaults to the watch-filter argument")

fs.StringVar(&profilerAddress, "profiler-address", "",
"Bind address to expose the pprof profiler (e.g. localhost:6060)")
Expand Down Expand Up @@ -281,6 +281,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -293,8 +299,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down Expand Up @@ -391,15 +398,16 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
os.Exit(1)
}

if err := predicates.InitExpressionMatcher(setupLog, watchExpressionValue); err != nil {
labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
Expand Down Expand Up @@ -433,7 +441,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
APIReader: mgr.GetAPIReader(),
RuntimeClient: runtimeClient,
UnstructuredCachingClient: unstructuredCachingClient,
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterClassConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterClass")
os.Exit(1)
Expand All @@ -444,25 +452,25 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
APIReader: mgr.GetAPIReader(),
RuntimeClient: runtimeClient,
UnstructuredCachingClient: unstructuredCachingClient,
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterTopologyConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterTopology")
os.Exit(1)
}

if err := (&controllers.MachineDeploymentTopologyReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, controller.Options{}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineDeploymentTopology")
os.Exit(1)
}

if err := (&controllers.MachineSetTopologyReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, controller.Options{}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineSetTopology")
os.Exit(1)
Expand All @@ -471,10 +479,10 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {

if feature.Gates.Enabled(feature.RuntimeSDK) {
if err = (&runtimecontrollers.ExtensionConfigReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
RuntimeClient: runtimeClient,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
RuntimeClient: runtimeClient,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(extensionConfigConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ExtensionConfig")
os.Exit(1)
Expand All @@ -485,7 +493,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
UnstructuredCachingClient: unstructuredCachingClient,
APIReader: mgr.GetAPIReader(),
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
os.Exit(1)
Expand All @@ -495,7 +503,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
UnstructuredCachingClient: unstructuredCachingClient,
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
NodeDrainClientTimeout: nodeDrainClientTimeout,
}).SetupWithManager(ctx, mgr, concurrency(machineConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Machine")
Expand All @@ -506,7 +514,7 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
UnstructuredCachingClient: unstructuredCachingClient,
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(machineSetConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineSet")
os.Exit(1)
Expand All @@ -515,18 +523,18 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {
Client: mgr.GetClient(),
UnstructuredCachingClient: unstructuredCachingClient,
APIReader: mgr.GetAPIReader(),
WatchFilterValue: watchFilterValue,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(machineDeploymentConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineDeployment")
os.Exit(1)
}

if feature.Gates.Enabled(feature.MachinePool) {
if err := (&expcontrollers.MachinePoolReconciler{
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
APIReader: mgr.GetAPIReader(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(machinePoolConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachinePool")
os.Exit(1)
Expand All @@ -535,26 +543,26 @@ func setupReconcilers(ctx context.Context, mgr ctrl.Manager) {

if feature.Gates.Enabled(feature.ClusterResourceSet) {
if err := (&addonscontrollers.ClusterResourceSetReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterResourceSetConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterResourceSet")
os.Exit(1)
}
if err := (&addonscontrollers.ClusterResourceSetBindingReconciler{
Client: mgr.GetClient(),
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(clusterResourceSetConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterResourceSetBinding")
os.Exit(1)
}
}

if err := (&controllers.MachineHealthCheckReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: watchFilterValue,
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterPredicate: labelSelector,
}).SetupWithManager(ctx, mgr, concurrency(machineHealthCheckConcurrency)); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MachineHealthCheck")
os.Exit(1)
Expand Down
11 changes: 9 additions & 2 deletions test/infrastructure/docker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ func main() {
req, _ := labels.NewRequirement(clusterv1.ClusterNameLabel, selection.Exists, nil)
clusterSecretCacheSelector := labels.NewSelector().Add(*req)

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -217,8 +223,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
ByObject: map[client.Object]cache.ByObject{
// Note: Only Secrets with the cluster name label are cached.
// The default client of the manager won't use the cache for secrets at all (see Client.Cache.DisableFor).
Expand Down
11 changes: 9 additions & 2 deletions test/infrastructure/inmemory/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ func main() {
goruntime.SetBlockProfileRate(1)
}

labelSelector, err := predicates.InitLabelMatcher(setupLog, predicates.ComposeFilterExpression(watchExpressionValue, watchFilterValue))
if err != nil {
setupLog.Error(err, "unable to create expression matcher from provided expression %s", watchExpressionValue)
os.Exit(1)
}

ctrlOptions := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsBindAddr,
Expand All @@ -206,8 +212,9 @@ func main() {
HealthProbeBindAddress: healthAddr,
PprofBindAddress: profilerAddress,
Cache: cache.Options{
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
DefaultLabelSelector: labelSelector.Selector(),
Namespaces: watchNamespaces,
SyncPeriod: &syncPeriod,
},
Client: client.Options{
Cache: &client.CacheOptions{
Expand Down

0 comments on commit a65bfa2

Please sign in to comment.