diff --git a/pkg/controller/machineset/controller.go b/pkg/controller/machineset/controller.go
index 9841a64460cb..415a2fda36b0 100644
--- a/pkg/controller/machineset/controller.go
+++ b/pkg/controller/machineset/controller.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/golang/glog"
 	"github.com/kubernetes-incubator/apiserver-builder/pkg/builders"
+	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
@@ -34,14 +35,14 @@ import (
 	clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
 	listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
 	"sigs.k8s.io/cluster-api/pkg/controller/sharedinformers"
+	"sigs.k8s.io/cluster-api/util"
 )
 
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet")
 
-// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconcilation.
-// See https://github.com/kubernetes-sigs/cluster-api/issues/245
-var reconcileMutexSleepSec = time.Second
+// stateConfirmationTimeout is the amount of time allowed to wait for desired state.
+var stateConfirmationTimeout = 30 * time.Second
 
 // +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets
 type MachineSetControllerImpl struct {
@@ -59,9 +60,6 @@ type MachineSetControllerImpl struct {
 	machineLister listers.MachineLister
 
 	informers *sharedinformers.SharedInformers
-
-	// msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key
-	msKeyMuxMap map[string]sync.Mutex
 }
 
 // Init initializes the controller and is called by the generated code
@@ -85,8 +83,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit
 
 	c.informers = arguments.GetSharedInformers()
 
-	c.msKeyMuxMap = make(map[string]sync.Mutex)
-
 	c.waitForCacheSync()
 }
 
@@ -110,20 +106,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() {
 // note that the current state of the cluster is calculated based on the number of machines
 // that are owned by the given machineSet (key).
 func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error {
-	key, err := cache.MetaNamespaceKeyFunc(machineSet)
-	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v.", machineSet)
-		return err
-	}
-
-	// Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile
-	// during the creation/deletion of machines, causing the incorrect number of machines to created/deleted
-	// TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation.
-	mux := c.msKeyMuxMap[key]
-	mux.Lock()
-	defer mux.Unlock()
-	defer time.Sleep(reconcileMutexSleepSec)
-
 	glog.V(4).Infof("Reconcile machineset %v", machineSet.Name)
 	allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything())
 	if err != nil {
@@ -191,26 +173,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
 		return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name)
 	}
 	diff := len(machines) - int(*(ms.Spec.Replicas))
+
 	if diff < 0 {
 		diff *= -1
 		glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)
+
+		var machineList []*v1alpha1.Machine
 		var errstrings []string
 		for i := 0; i < diff; i++ {
 			glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines))
 			machine := c.createMachine(ms)
-			_, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
+			newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
 			if err != nil {
 				glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err)
 				errstrings = append(errstrings, err.Error())
+				continue
 			}
+			machineList = append(machineList, newMachine)
 		}
 		if len(errstrings) > 0 {
 			return fmt.Errorf(strings.Join(errstrings, "; "))
 		}
-
-		return nil
+		return c.waitForMachineCreation(machineList)
 	} else if diff > 0 {
 		glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)
@@ -241,6 +226,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
 			}
 		default:
 		}
+		return c.waitForMachineDeletion(machinesToDelete)
 	}
 
 	return nil
@@ -342,3 +328,40 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al
 	// see: https://github.com/kubernetes/kube-deploy/issues/625
 	return filteredMachines[:diff]
 }
+
+func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error {
+	for _, machine := range machineList {
+		pollErr := util.Poll(100 * time.Millisecond, stateConfirmationTimeout, func() (bool, error) {
+			_, err := c.Get(machine.Namespace, machine.Name)
+			if err == nil {
+				return true, nil
+			}
+			if errors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		})
+		if pollErr != nil {
+			glog.Error(pollErr)
+			return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr)
+		}
+	}
+	return nil
+}
+
+func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error {
+	for _, machine := range machineList {
+		pollErr := util.Poll(100 * time.Millisecond, stateConfirmationTimeout, func() (bool, error) {
+			_, err := c.Get(machine.Namespace, machine.Name)
+			if errors.IsNotFound(err) {
+				return true, nil
+			}
+			return false, err
+		})
+		if pollErr != nil {
+			glog.Error(pollErr)
+			return fmt.Errorf("failed waiting for machine object to be deleted. %v", pollErr)
%v", pollErr) + } + } + return nil +} diff --git a/pkg/controller/machineset/reconcile_test.go b/pkg/controller/machineset/reconcile_test.go index af8ad76326b5..46a5f60b07e7 100644 --- a/pkg/controller/machineset/reconcile_test.go +++ b/pkg/controller/machineset/reconcile_test.go @@ -132,7 +132,6 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { }, } - reconcileMutexSleepSec = 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { // setup the test scenario