Skip to content

Commit

Permalink
Fix the race condition by confirming creation/deletion of machine obj…
Browse files Browse the repository at this point in the history
…ects

Waiting for the machine objects to be either created or deleted
before leaving the Reconcile loop prevents the race condition in which
a stale cache does not correctly report the change to machine objects.
  • Loading branch information
k4leung4 committed Jun 8, 2018
1 parent 51b218c commit 23f8fbe
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 26 deletions.
74 changes: 49 additions & 25 deletions pkg/controller/machineset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-incubator/apiserver-builder/pkg/builders"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,14 +35,14 @@ import (
clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
"sigs.k8s.io/cluster-api/pkg/controller/sharedinformers"
"sigs.k8s.io/cluster-api/util"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet")

// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconciliation.
// See https://github.com/kubernetes-sigs/cluster-api/issues/245
var reconcileMutexSleepSec = time.Second
// stateConfirmationTimeout is the amount of time allowed to wait for desired state.
var stateConfirmationTimeout = 30 * time.Second

// +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets
type MachineSetControllerImpl struct {
Expand All @@ -59,9 +60,6 @@ type MachineSetControllerImpl struct {
machineLister listers.MachineLister

informers *sharedinformers.SharedInformers

// msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key
msKeyMuxMap map[string]sync.Mutex
}

// Init initializes the controller and is called by the generated code
Expand All @@ -85,8 +83,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit

c.informers = arguments.GetSharedInformers()

c.msKeyMuxMap = make(map[string]sync.Mutex)

c.waitForCacheSync()
}

Expand All @@ -110,20 +106,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() {
// note that the current state of the cluster is calculated based on the number of machines
// that are owned by the given machineSet (key).
func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error {
key, err := cache.MetaNamespaceKeyFunc(machineSet)
if err != nil {
glog.Errorf("Couldn't get key for object %+v.", machineSet)
return err
}

// Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile
// during the creation/deletion of machines, causing the incorrect number of machines to created/deleted
// TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation.
mux := c.msKeyMuxMap[key]
mux.Lock()
defer mux.Unlock()
defer time.Sleep(reconcileMutexSleepSec)

glog.V(4).Infof("Reconcile machineset %v", machineSet.Name)
allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -191,26 +173,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name)
}
diff := len(machines) - int(*(ms.Spec.Replicas))

if diff < 0 {
diff *= -1
glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

var machineList []*v1alpha1.Machine
var errstrings []string
for i := 0; i < diff; i++ {
glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines))
machine := c.createMachine(ms)
_, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
if err != nil {
glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err)
errstrings = append(errstrings, err.Error())
continue
}
machineList = append(machineList, newMachine)
}

if len(errstrings) > 0 {
return fmt.Errorf(strings.Join(errstrings, "; "))
}

return nil
return c.waitForMachineCreation(machineList)
} else if diff > 0 {
glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

Expand Down Expand Up @@ -241,6 +226,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
}
default:
}
return c.waitForMachineDeletion(machinesToDelete)
}

return nil
Expand Down Expand Up @@ -342,3 +328,41 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al
// see: https://github.com/kubernetes/kube-deploy/issues/625
return filteredMachines[:diff]
}

// waitForMachineCreation waits, one machine at a time and in list order, until
// every machine in machineList can be fetched through c.Get.
//
// Each lookup is retried through util.Poll with a 100ms interval and
// stateConfirmationTimeout as the time limit. A NotFound result is treated as
// "not visible yet" and keeps the poll going; any other lookup error is handed
// back to util.Poll as a failure.
// NOTE(review): assumes util.Poll stops on (true, nil) or on a non-nil error
// and reports a timeout as an error — confirm against util.Poll.
//
// The first machine whose poll fails ends the wait: the poll error is logged
// and returned inside a descriptive error. Returns nil when every machine was
// found, including when machineList is empty.
func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error {
	for _, machine := range machineList {
		// The condition runs on every poll tick, so it must not log at Info
		// level; doing so would emit a line every 100ms per machine.
		pollErr := util.Poll(100*time.Millisecond, stateConfirmationTimeout, func() (bool, error) {
			_, err := c.Get(machine.Namespace, machine.Name)
			if err == nil {
				return true, nil
			}
			if errors.IsNotFound(err) {
				// Not visible yet: keep polling rather than failing.
				return false, nil
			}
			return false, err
		})
		if pollErr != nil {
			glog.Error(pollErr)
			return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr)
		}
	}
	return nil
}

// waitForMachineDeletion waits, one machine at a time and in list order, until
// c.Get reports NotFound for every machine in machineList.
//
// Each lookup is retried through util.Poll with a 100ms interval and
// stateConfirmationTimeout as the time limit. A successful lookup means the
// machine is still present, so the poll continues; a lookup error other than
// NotFound is handed back to util.Poll as a failure.
// NOTE(review): assumes util.Poll stops on (true, nil) or on a non-nil error
// and reports a timeout as an error — confirm against util.Poll.
//
// The first machine whose poll fails ends the wait: the poll error is logged
// and returned inside a descriptive error. Returns nil when every machine is
// gone, including when machineList is empty.
func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error {
	for i := range machineList {
		target := machineList[i]

		// gone reports whether the target can no longer be fetched.
		gone := func() (bool, error) {
			_, getErr := c.Get(target.Namespace, target.Name)
			if !errors.IsNotFound(getErr) {
				// Either still present (getErr == nil, keep polling) or a
				// real lookup failure (getErr != nil, abort the poll).
				return false, getErr
			}
			return true, nil
		}

		waitErr := util.Poll(100*time.Millisecond, stateConfirmationTimeout, gone)
		if waitErr == nil {
			continue
		}
		glog.Error(waitErr)
		return fmt.Errorf("failed waiting for machine object to be deleted. %v", waitErr)
	}
	return nil
}
1 change: 0 additions & 1 deletion pkg/controller/machineset/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
},
}

reconcileMutexSleepSec = 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// setup the test scenario
Expand Down

0 comments on commit 23f8fbe

Please sign in to comment.