diff --git a/controllers/async_reconciler.go b/controllers/async_reconciler.go index 85238ba958a..808e9531fc5 100644 --- a/controllers/async_reconciler.go +++ b/controllers/async_reconciler.go @@ -1,31 +1,72 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package controllers import ( "context" + "reflect" + "sync" + + "github.com/pkg/errors" + kerrors "k8s.io/apimachinery/pkg/util/errors" "sigs.k8s.io/cluster-api-provider-azure/azure" ) +// asyncReconciler executes multiple reconcilers simultaneously and stores the output in results. type asyncReconciler struct { - results chan error + wg sync.WaitGroup + results []result } -func NewAsyncReconciler() asyncReconciler { +// result represents the outcome of a reconilitation operation along with some metadata. +type result struct { + err error + reconciler azure.Reconciler +} + +// newAsyncReconciler returns a new instance of asyncReconciler. +func newAsyncReconciler() asyncReconciler { return asyncReconciler{} } -func (ar *asyncReconciler) submit(ctx context.Context, task azure.Reconciler) { +// submit initiates a go routine for the reconciler and appends the result to results. +func (ar *asyncReconciler) submit(ctx context.Context, reconciler azure.Reconciler) { + ar.wg.Add(1) go func() { - ar.results <- task.Reconcile(ctx) - + err := reconciler.Reconcile(ctx) + ar.results = append(ar.results, result{err, reconciler}) + ar.wg.Done() }() } +// wait waits for all pending reconcilers to complete. func (ar *asyncReconciler) wait() error { + ar.wg.Wait() + + defer func() { + ar.results = []result{} + }() + var errs []error - for r := range ar.results { - if r != nil { - errs = append(errs, r) + for _, r := range ar.results { + if r.err != nil { + reconcilerImpl := reflect.TypeOf(r.reconciler) + errs = append(errs, errors.Wrapf(r.err, "failed to reconcile %s", reconcilerImpl.Name())) } } diff --git a/controllers/azuremachine_reconciler.go b/controllers/azuremachine_reconciler.go index a1cbf649562..43b35024a63 100644 --- a/controllers/azuremachine_reconciler.go +++ b/controllers/azuremachine_reconciler.go @@ -78,36 +78,26 @@ func (s *azureMachineService) Reconcile(ctx context.Context) error { ctx, span := tele.Tracer().Start(ctx, "controllers.azureMachineService.Reconcile") defer span.End() - if err := s.publicIPsSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "failed to create public IP") + ar := newAsyncReconciler() + + ar.submit(ctx, s.publicIPsSvc) + ar.submit(ctx, s.inboundNatRulesSvc) + ar.submit(ctx, s.networkInterfacesSvc) + ar.submit(ctx, s.availabilitySetsSvc) + if err := ar.wait(); err != nil { + return err } - if err := s.inboundNatRulesSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "failed to create inbound NAT rule") + ar.submit(ctx, s.virtualMachinesSvc) + if err := ar.wait(); err != nil { + return err } - if err := s.networkInterfacesSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "failed to create network interface") - } - - if err := s.availabilitySetsSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "failed to create availability set") - } - - if err := s.virtualMachinesSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "failed to create virtual machine") - } - - if err := s.roleAssignmentsSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "unable to create role assignment") - } - - if err := s.vmExtensionsSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "unable to create vm extension") - } - - if err := s.tagsSvc.Reconcile(ctx); err != nil { - return errors.Wrap(err, "unable to update tags") + ar.submit(ctx, s.roleAssignmentsSvc) + ar.submit(ctx, s.vmExtensionsSvc) + ar.submit(ctx, s.tagsSvc) + if err := ar.wait(); err != nil { + return err } return nil