From 367b3832e4eaa8aa2f1fe4f5dfca5b0bca63896b Mon Sep 17 00:00:00 2001 From: Kenny Leung Date: Mon, 11 Jun 2018 09:56:34 -0700 Subject: [PATCH] Fix the race condition by confirming creation/deletion of machine objects (#316) Waiting for the machine objects to be either created or deleted before leaving the Reconcile loop prevents the race condition in which a stale cache does not correctly report changes to machine objects. --- pkg/controller/machineset/controller.go | 78 ++++++++---- pkg/controller/machineset/reconcile_test.go | 124 +++++++++++++++++++- 2 files changed, 172 insertions(+), 30 deletions(-) diff --git a/pkg/controller/machineset/controller.go b/pkg/controller/machineset/controller.go index 9841a64460cb..03910d5c907e 100644 --- a/pkg/controller/machineset/controller.go +++ b/pkg/controller/machineset/controller.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" "github.com/kubernetes-incubator/apiserver-builder/pkg/builders" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -34,14 +35,18 @@ import ( clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset" listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1" "sigs.k8s.io/cluster-api/pkg/controller/sharedinformers" + "sigs.k8s.io/cluster-api/util" ) // controllerKind contains the schema.GroupVersionKind for this controller type. var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet") -// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconcilation. -// See https://github.com/kubernetes-sigs/cluster-api/issues/245 -var reconcileMutexSleepSec = time.Second +// stateConfirmationTimeout is the amount of time allowed to wait for desired state. +var stateConfirmationTimeout = 10 * time.Second + +// stateConfirmationInterval is the amount of time between polling for the desired state. 
+// The polling is against a local memory cache. +var stateConfirmationInterval = 100 * time.Millisecond // +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets type MachineSetControllerImpl struct { @@ -59,9 +64,6 @@ type MachineSetControllerImpl struct { machineLister listers.MachineLister informers *sharedinformers.SharedInformers - - // msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key - msKeyMuxMap map[string]sync.Mutex } // Init initializes the controller and is called by the generated code @@ -85,8 +87,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit c.informers = arguments.GetSharedInformers() - c.msKeyMuxMap = make(map[string]sync.Mutex) - c.waitForCacheSync() } @@ -110,20 +110,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() { // note that the current state of the cluster is calculated based on the number of machines // that are owned by the given machineSet (key). func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error { - key, err := cache.MetaNamespaceKeyFunc(machineSet) - if err != nil { - glog.Errorf("Couldn't get key for object %+v.", machineSet) - return err - } - - // Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile - // during the creation/deletion of machines, causing the incorrect number of machines to created/deleted - // TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation. 
- mux := c.msKeyMuxMap[key] - mux.Lock() - defer mux.Unlock() - defer time.Sleep(reconcileMutexSleepSec) - glog.V(4).Infof("Reconcile machineset %v", machineSet.Name) allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything()) if err != nil { @@ -191,26 +177,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name) } diff := len(machines) - int(*(ms.Spec.Replicas)) + if diff < 0 { diff *= -1 glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff) + var machineList []*v1alpha1.Machine var errstrings []string for i := 0; i < diff; i++ { glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines)) machine := c.createMachine(ms) - _, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine) + newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine) if err != nil { glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err) errstrings = append(errstrings, err.Error()) + continue } + machineList = append(machineList, newMachine) } if len(errstrings) > 0 { return fmt.Errorf(strings.Join(errstrings, "; ")) } - - return nil + return c.waitForMachineCreation(machineList) } else if diff > 0 { glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff) @@ -241,6 +230,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine } default: } + return c.waitForMachineDeletion(machinesToDelete) } return nil @@ -342,3 +332,41 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al // see: https://github.com/kubernetes/kube-deploy/issues/625 return 
filteredMachines[:diff] } + +func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error { + for _, machine := range machineList { + pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) { + _, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name) + glog.Error(err) + if err == nil { + return true, nil + } + if errors.IsNotFound(err) { + return false, nil + } + return false, err + }) + if pollErr != nil { + glog.Error(pollErr) + return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr) + } + } + return nil +} + +func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error { + for _, machine := range machineList { + pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) { + m, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name) + if errors.IsNotFound(err) || !m.DeletionTimestamp.IsZero() { + return true, nil + } + return false, err + }) + if pollErr != nil { + glog.Error(pollErr) + return fmt.Errorf("failed waiting for machine object to be deleted. 
%v", pollErr) + } + } + return nil +} diff --git a/pkg/controller/machineset/reconcile_test.go b/pkg/controller/machineset/reconcile_test.go index af8ad76326b5..fb5e46fbf5bb 100644 --- a/pkg/controller/machineset/reconcile_test.go +++ b/pkg/controller/machineset/reconcile_test.go @@ -22,11 +22,15 @@ import ( "time" "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" clienttesting "k8s.io/client-go/testing" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/fake" v1alpha1listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1" @@ -36,15 +40,60 @@ const ( labelKey = "type" ) +type fakeMachineLister struct { + indexer cache.Indexer +} + +// List lists all Machines in the indexer. +func (s *fakeMachineLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Machine)) + }) + return ret, err +} + +// Machines returns an object that can list and get Machines. 
+func (s *fakeMachineLister) Machines(namespace string) v1alpha1listers.MachineNamespaceLister { + return fakeMachineNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +type fakeMachineNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +func (s fakeMachineNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Machine)) + }) + return ret, err +} + +func (s fakeMachineNamespaceLister) Get(name string) (*v1alpha1.Machine, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("machine"), name) + } + return obj.(*v1alpha1.Machine), nil +} + func TestMachineSetControllerReconcileHandler(t *testing.T) { + now := time.Now() + tests := []struct { name string startingMachineSets []*v1alpha1.MachineSet startingMachines []*v1alpha1.Machine machineSetToSync string namespaceToSync string + confirmationTimeout *time.Duration + deletionTimestamp *time.Time expectedMachine *v1alpha1.Machine expectedActions []string + expectedError bool }{ { name: "scenario 1: the current state of the cluster is empty, thus a machine is created.", @@ -95,6 +144,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { machineSetToSync: "foo", namespaceToSync: "acme", expectedActions: []string{"create"}, + expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"), }, { name: "scenario 7: the current machine is missing owner refs, machine should be adopted.", @@ -112,6 +162,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { machineSetToSync: "foo", namespaceToSync: "acme", expectedActions: []string{"create"}, + expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"), }, { name: "scenario 9: the 
current machine is being deleted, thus a machine is created.", @@ -120,6 +171,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { machineSetToSync: "foo", namespaceToSync: "acme", expectedActions: []string{"create"}, + expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"), }, { name: "scenario 10: the current machine has no controller refs, owner refs preserved, machine should be adopted.", @@ -130,11 +182,37 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { expectedActions: []string{"update"}, expectedMachine: machineWithMultipleOwnerRefs(createMachineSet(1, "foo", "bar2", "acme"), "bar1"), }, + { + name: "scenario 11: create confirmation timed out, err.", + startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(1, "foo", "bar1", "acme")}, + startingMachines: nil, + machineSetToSync: "foo", + namespaceToSync: "acme", + expectedError: true, + }, + { + name: "scenario 12: delete confirmation timed out, err.", + startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")}, + startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")}, + machineSetToSync: "foo", + namespaceToSync: "acme", + expectedError: true, + }, + { + name: "scenario 13: delete confirmation accepts delete non-zero timestamp.", + startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")}, + startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")}, + machineSetToSync: "foo", + namespaceToSync: "acme", + deletionTimestamp: &now, + expectedActions: []string{"delete"}, + }, } - reconcileMutexSleepSec = 0 for _, test := range tests { t.Run(test.name, func(t *testing.T) { + stateConfirmationTimeout = 1 * time.Millisecond + // setup the test scenario rObjects := []runtime.Object{} machinesIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) @@ -154,12 
+232,43 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { rObjects = append(rObjects, amachineset) } fakeClient := fake.NewSimpleClientset(rObjects...) - machineLister := v1alpha1listers.NewMachineLister(machinesIndexer) + + machineLister := fakeMachineLister{indexer: machinesIndexer} machineSetLister := v1alpha1listers.NewMachineSetLister(machineSetIndexer) target := &MachineSetControllerImpl{} target.clusterAPIClient = fakeClient target.machineSetLister = machineSetLister - target.machineLister = machineLister + target.machineLister = &machineLister + + fakeClient.PrependReactor("create", "machines", func(action core.Action) (bool, runtime.Object, error) { + if test.expectedError { + return true, nil, errors.NewNotFound(v1alpha1.Resource("machine"), "somemachine") + } + if test.expectedMachine != nil { + machineLister.indexer.Add(test.expectedMachine) + } + return true, test.expectedMachine, nil + }) + + fakeClient.PrependReactor("delete", "machines", func(action core.Action) (bool, runtime.Object, error) { + if test.deletionTimestamp != nil { + machineLister.indexer.Delete(test.startingMachines[0]) + m := test.startingMachines[0].DeepCopy() + timestamp := metav1.NewTime(*test.deletionTimestamp) + m.ObjectMeta.DeletionTimestamp = &timestamp + machineLister.indexer.Add(m) + return true, nil, nil + } + if test.expectedError { + return false, nil, nil + } + for i, action := range test.expectedActions { + if action == "delete" { + machineLister.indexer.Delete(test.startingMachines[i]) + } + } + return true, nil, nil + }) // act machineSetToTest, err := target.Get(test.namespaceToSync, test.machineSetToSync) @@ -167,8 +276,13 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { t.Fatal(err) } err = target.Reconcile(machineSetToTest) - if err != nil { - t.Fatal(err) + + if test.expectedError != (err != nil) { + t.Fatalf("Unexpected reconcile err: got %v, expected %v. 
%v", (err != nil), test.expectedError, err) + return + } + if test.expectedError { + return } // validate