Skip to content

Commit

Permalink
Fix the race condition by confirming creation/deletion of machine obj…
Browse files Browse the repository at this point in the history
…ects (#316)

By waiting for the machine objects to be either created or deleted
before leaving Reconcile loop will prevent the race condition of stale
cache not reporting correctly the change to machine objects.
  • Loading branch information
k4leung4 authored and k8s-ci-robot committed Jun 11, 2018
1 parent 7fdecc5 commit 367b383
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 30 deletions.
78 changes: 53 additions & 25 deletions pkg/controller/machineset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-incubator/apiserver-builder/pkg/builders"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
Expand All @@ -34,14 +35,18 @@ import (
clusterapiclientset "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset"
listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
"sigs.k8s.io/cluster-api/pkg/controller/sharedinformers"
"sigs.k8s.io/cluster-api/util"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineSet")

// reconcileMutexSleepSec is the duration to sleep before releasing the mutex lock that is held for reconcilation.
// See https://github.com/kubernetes-sigs/cluster-api/issues/245
var reconcileMutexSleepSec = time.Second
// stateConfirmationTimeout is the amount of time allowed to wait for desired state.
var stateConfirmationTimeout = 10 * time.Second

// stateConfirmationInterval is the amount of time between polling for the desired state.
// The polling is against a local memory cache.
var stateConfirmationInterval = 100 * time.Millisecond

// +controller:group=cluster,version=v1alpha1,kind=MachineSet,resource=machinesets
type MachineSetControllerImpl struct {
Expand All @@ -59,9 +64,6 @@ type MachineSetControllerImpl struct {
machineLister listers.MachineLister

informers *sharedinformers.SharedInformers

// msKeyMuxMap holds a mutex lock for reconcilation keyed on the machineset key
msKeyMuxMap map[string]sync.Mutex
}

// Init initializes the controller and is called by the generated code
Expand All @@ -85,8 +87,6 @@ func (c *MachineSetControllerImpl) Init(arguments sharedinformers.ControllerInit

c.informers = arguments.GetSharedInformers()

c.msKeyMuxMap = make(map[string]sync.Mutex)

c.waitForCacheSync()
}

Expand All @@ -110,20 +110,6 @@ func (c *MachineSetControllerImpl) waitForCacheSync() {
// note that the current state of the cluster is calculated based on the number of machines
// that are owned by the given machineSet (key).
func (c *MachineSetControllerImpl) Reconcile(machineSet *v1alpha1.MachineSet) error {
key, err := cache.MetaNamespaceKeyFunc(machineSet)
if err != nil {
glog.Errorf("Couldn't get key for object %+v.", machineSet)
return err
}

// Lock on Reconcile, this is to avoid the change of a machine object to cause the same machineset to Reconcile
// during the creation/deletion of machines, causing the incorrect number of machines to created/deleted
// TODO: Find a less heavy handed approach to avoid concurrent machineset reconcilation.
mux := c.msKeyMuxMap[key]
mux.Lock()
defer mux.Unlock()
defer time.Sleep(reconcileMutexSleepSec)

glog.V(4).Infof("Reconcile machineset %v", machineSet.Name)
allMachines, err := c.machineLister.Machines(machineSet.Namespace).List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -191,26 +177,29 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
return fmt.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed.", ms.Name)
}
diff := len(machines) - int(*(ms.Spec.Replicas))

if diff < 0 {
diff *= -1
glog.Infof("Too few replicas for %v %s/%s, need %d, creating %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

var machineList []*v1alpha1.Machine
var errstrings []string
for i := 0; i < diff; i++ {
glog.Infof("creating machine %d of %d, ( spec.replicas(%d) > currentMachineCount(%d) )", i+1, diff, *(ms.Spec.Replicas), len(machines))
machine := c.createMachine(ms)
_, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
newMachine, err := c.clusterAPIClient.ClusterV1alpha1().Machines(ms.Namespace).Create(machine)
if err != nil {
glog.Errorf("unable to create a machine = %s, due to %v", machine.Name, err)
errstrings = append(errstrings, err.Error())
continue
}
machineList = append(machineList, newMachine)
}

if len(errstrings) > 0 {
return fmt.Errorf(strings.Join(errstrings, "; "))
}

return nil
return c.waitForMachineCreation(machineList)
} else if diff > 0 {
glog.Infof("Too many replicas for %v %s/%s, need %d, deleting %d", controllerKind, ms.Namespace, ms.Name, *(ms.Spec.Replicas), diff)

Expand Down Expand Up @@ -241,6 +230,7 @@ func (c *MachineSetControllerImpl) syncReplicas(ms *v1alpha1.MachineSet, machine
}
default:
}
return c.waitForMachineDeletion(machinesToDelete)
}

return nil
Expand Down Expand Up @@ -342,3 +332,41 @@ func getMachinesToDelete(filteredMachines []*v1alpha1.Machine, diff int) []*v1al
// see: https://github.com/kubernetes/kube-deploy/issues/625
return filteredMachines[:diff]
}

func (c *MachineSetControllerImpl) waitForMachineCreation(machineList []*v1alpha1.Machine) error {
for _, machine := range machineList {
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) {
_, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name)
glog.Error(err)
if err == nil {
return true, nil
}
if errors.IsNotFound(err) {
return false, nil
}
return false, err
})
if pollErr != nil {
glog.Error(pollErr)
return fmt.Errorf("failed waiting for machine object to be created. %v", pollErr)
}
}
return nil
}

func (c *MachineSetControllerImpl) waitForMachineDeletion(machineList []*v1alpha1.Machine) error {
for _, machine := range machineList {
pollErr := util.Poll(stateConfirmationInterval, stateConfirmationTimeout, func() (bool, error) {
m, err := c.machineLister.Machines(machine.Namespace).Get(machine.Name)
if errors.IsNotFound(err) || !m.DeletionTimestamp.IsZero() {
return true, nil
}
return false, err
})
if pollErr != nil {
glog.Error(pollErr)
return fmt.Errorf("failed waiting for machine object to be deleted. %v", pollErr)
}
}
return nil
}
124 changes: 119 additions & 5 deletions pkg/controller/machineset/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
clienttesting "k8s.io/client-go/testing"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
"sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/fake"
v1alpha1listers "sigs.k8s.io/cluster-api/pkg/client/listers_generated/cluster/v1alpha1"
Expand All @@ -36,15 +40,60 @@ const (
labelKey = "type"
)

type fakeMachineLister struct {
indexer cache.Indexer
}

// List lists all Machines in the indexer.
func (s *fakeMachineLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.Machine))
})
return ret, err
}

// Machines returns an object that can list and get Machines.
func (s *fakeMachineLister) Machines(namespace string) v1alpha1listers.MachineNamespaceLister {
return fakeMachineNamespaceLister{indexer: s.indexer, namespace: namespace}
}

type fakeMachineNamespaceLister struct {
indexer cache.Indexer
namespace string
}

func (s fakeMachineNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Machine, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.Machine))
})
return ret, err
}

func (s fakeMachineNamespaceLister) Get(name string) (*v1alpha1.Machine, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1alpha1.Resource("machine"), name)
}
return obj.(*v1alpha1.Machine), nil
}

func TestMachineSetControllerReconcileHandler(t *testing.T) {
now := time.Now()

tests := []struct {
name string
startingMachineSets []*v1alpha1.MachineSet
startingMachines []*v1alpha1.Machine
machineSetToSync string
namespaceToSync string
confirmationTimeout *time.Duration
deletionTimestamp *time.Time
expectedMachine *v1alpha1.Machine
expectedActions []string
expectedError bool
}{
{
name: "scenario 1: the current state of the cluster is empty, thus a machine is created.",
Expand Down Expand Up @@ -95,6 +144,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 7: the current machine is missing owner refs, machine should be adopted.",
Expand All @@ -112,6 +162,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 9: the current machine is being deleted, thus a machine is created.",
Expand All @@ -120,6 +171,7 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedActions: []string{"create"},
expectedMachine: machineFromMachineSet(createMachineSet(1, "foo", "bar2", "acme"), "bar2"),
},
{
name: "scenario 10: the current machine has no controller refs, owner refs preserved, machine should be adopted.",
Expand All @@ -130,11 +182,37 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
expectedActions: []string{"update"},
expectedMachine: machineWithMultipleOwnerRefs(createMachineSet(1, "foo", "bar2", "acme"), "bar1"),
},
{
name: "scenario 11: create confirmation timed out, err.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(1, "foo", "bar1", "acme")},
startingMachines: nil,
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedError: true,
},
{
name: "scenario 12: delete confirmation timed out, err.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")},
startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")},
machineSetToSync: "foo",
namespaceToSync: "acme",
expectedError: true,
},
{
name: "scenario 13: delete confirmation accepts delete non-zero timestamp.",
startingMachineSets: []*v1alpha1.MachineSet{createMachineSet(0, "foo", "bar2", "acme")},
startingMachines: []*v1alpha1.Machine{machineFromMachineSet(createMachineSet(1, "foo", "bar1", "acme"), "bar1")},
machineSetToSync: "foo",
namespaceToSync: "acme",
deletionTimestamp: &now,
expectedActions: []string{"delete"},
},
}

reconcileMutexSleepSec = 0
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
stateConfirmationTimeout = 1 * time.Millisecond

// setup the test scenario
rObjects := []runtime.Object{}
machinesIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
Expand All @@ -154,21 +232,57 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) {
rObjects = append(rObjects, amachineset)
}
fakeClient := fake.NewSimpleClientset(rObjects...)
machineLister := v1alpha1listers.NewMachineLister(machinesIndexer)

machineLister := fakeMachineLister{indexer: machinesIndexer}
machineSetLister := v1alpha1listers.NewMachineSetLister(machineSetIndexer)
target := &MachineSetControllerImpl{}
target.clusterAPIClient = fakeClient
target.machineSetLister = machineSetLister
target.machineLister = machineLister
target.machineLister = &machineLister

fakeClient.PrependReactor("create", "machines", func(action core.Action) (bool, runtime.Object, error) {
if test.expectedError {
return true, nil, errors.NewNotFound(v1alpha1.Resource("machine"), "somemachine")
}
if test.expectedMachine != nil {
machineLister.indexer.Add(test.expectedMachine)
}
return true, test.expectedMachine, nil
})

fakeClient.PrependReactor("delete", "machines", func(action core.Action) (bool, runtime.Object, error) {
if test.deletionTimestamp != nil {
machineLister.indexer.Delete(test.startingMachines[0])
m := test.startingMachines[0].DeepCopy()
timestamp := metav1.NewTime(*test.deletionTimestamp)
m.ObjectMeta.DeletionTimestamp = &timestamp
machineLister.indexer.Add(m)
return true, nil, nil
}
if test.expectedError {
return false, nil, nil
}
for i, action := range test.expectedActions {
if action == "delete" {
machineLister.indexer.Delete(test.startingMachines[i])
}
}
return true, nil, nil
})

// act
machineSetToTest, err := target.Get(test.namespaceToSync, test.machineSetToSync)
if err != nil {
t.Fatal(err)
}
err = target.Reconcile(machineSetToTest)
if err != nil {
t.Fatal(err)

if test.expectedError != (err != nil) {
t.Fatalf("Unexpected reconcile err: got %v, expected %v. %v", (err != nil), test.expectedError, err)
return
}
if test.expectedError {
return
}

// validate
Expand Down

0 comments on commit 367b383

Please sign in to comment.