Skip to content

Commit

Permalink
feat: add controller concurrency
Browse files Browse the repository at this point in the history
The controller is now working with 4 concurrent workers by default.
This value is configurable through the `--concurrent` flag.

Signed-off-by: Max Jonas Werner <[email protected]>
  • Loading branch information
Max Jonas Werner committed Jun 24, 2021
1 parent aabdcc9 commit 283a3d9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
12 changes: 10 additions & 2 deletions controllers/imageupdateautomation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -99,6 +100,10 @@ type ImageUpdateAutomationReconciler struct {
MetricsRecorder *metrics.Recorder
}

type ImageUpdateAutomationReconcilerOptions struct {
MaxConcurrentReconciles int
}

// +kubebuilder:rbac:groups=image.toolkit.fluxcd.io,resources=imageupdateautomations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=image.toolkit.fluxcd.io,resources=imageupdateautomations/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch
Expand Down Expand Up @@ -356,7 +361,7 @@ func (r *ImageUpdateAutomationReconciler) Reconcile(ctx context.Context, req ctr
return ctrl.Result{RequeueAfter: interval}, nil
}

func (r *ImageUpdateAutomationReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *ImageUpdateAutomationReconciler) SetupWithManager(mgr ctrl.Manager, opts ImageUpdateAutomationReconcilerOptions) error {
ctx := context.Background()
// Index the git repository object that each I-U-A refers to
if err := mgr.GetFieldIndexer().IndexField(ctx, &imagev1.ImageUpdateAutomation{}, repoRefKey, func(obj client.Object) []string {
Expand All @@ -372,6 +377,9 @@ func (r *ImageUpdateAutomationReconciler) SetupWithManager(mgr ctrl.Manager) err
predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{}))).
Watches(&source.Kind{Type: &sourcev1.GitRepository{}}, handler.EnqueueRequestsFromMapFunc(r.automationsForGitRepo)).
Watches(&source.Kind{Type: &imagev1_reflect.ImagePolicy{}}, handler.EnqueueRequestsFromMapFunc(r.automationsForImagePolicy)).
WithOptions(controller.Options{
MaxConcurrentReconciles: opts.MaxConcurrentReconciles,
}).
Complete(r)
}

Expand Down Expand Up @@ -558,7 +566,7 @@ func commitChangedManifests(tracelog logr.Logger, repo *gogit.Repository, absRep
// modified. There's no circumstance in which we want to commit a
// change to a broken symlink: so, detect and skip those.
var changed bool
for file, _ := range status {
for file := range status {
abspath := filepath.Join(absRepoPath, file)
info, err := os.Lstat(abspath)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = BeforeSuite(func(done Done) {
Client: k8sManager.GetClient(),
Scheme: scheme.Scheme,
}
Expect(imageAutoReconciler.SetupWithManager(k8sManager)).To(Succeed())
Expect(imageAutoReconciler.SetupWithManager(k8sManager, ImageUpdateAutomationReconcilerOptions{})).To(Succeed())

go func() {
defer GinkgoRecover()
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ func main() {
logOptions logger.Options
leaderElectionOptions leaderelection.Options
watchAllNamespaces bool
concurrent int
)

flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&eventsAddr, "events-addr", "", "The address of the events receiver.")
flag.StringVar(&healthAddr, "health-addr", ":9440", "The address the health endpoint binds to.")
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.IntVar(&concurrent, "concurrent", 4, "The number of concurrent resource reconciles.")
clientOptions.BindFlags(flag.CommandLine)
logOptions.BindFlags(flag.CommandLine)
leaderElectionOptions.BindFlags(flag.CommandLine)
Expand Down Expand Up @@ -128,7 +130,9 @@ func main() {
EventRecorder: mgr.GetEventRecorderFor(controllerName),
ExternalEventRecorder: eventRecorder,
MetricsRecorder: metricsRecorder,
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr, controllers.ImageUpdateAutomationReconcilerOptions{
MaxConcurrentReconciles: concurrent,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ImageUpdateAutomation")
os.Exit(1)
}
Expand Down

0 comments on commit 283a3d9

Please sign in to comment.