From e424295993627f4f2b6123e7a7ccb6ea3b575105 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Sun, 23 Sep 2018 21:52:45 -0700 Subject: [PATCH] Fix for race condition: Allocation of Deleting GameServers Possible If an allocation occurred during a Fleet scale down, or during a update of a Fleet, it was entirely possible for those parallel delete operations to be applied to a GameServer that was being allocated at the same time. This is mainly because the client-go informer cache is lazily consistent, but also because there was nothing preventing a `Delete()` of a `GameServer` from happening while allocating a specific `GameServer`. To solve this, there are two strategies implemented here: 1. Share the `allocationLock` `sync.Mutex` between controllers such that allocations cannot occur while `GameServer` Deletes for Fleet resizing/update are happening and vice versa. 2. use `cache.WaitForCacheSync` to bring the cluster informer up to date, to remove the chance for non-updated information about `GameServers` to be acted upon. The shared lock is a quite broad approach - down the line, we could refine this to being per `Fleet`, or per `GameServerSet`, if we find this to be a bottleneck, but the priority here was to get something working that resolves the issue, and we can optimise as needed from here. There are also e2e tests specifically designed for catching these race conditions as well. --- cmd/controller/main.go | 11 +- pkg/fleetallocation/controller.go | 5 +- pkg/fleetallocation/controller_test.go | 2 +- pkg/gameservers/controller.go | 6 ++ pkg/gameservers/controller_test.go | 4 +- pkg/gameserversets/controller.go | 29 +++++- pkg/gameserversets/controller_test.go | 5 +- test/e2e/fleet_test.go | 139 ++++++++++++++++++++++++- test/e2e/gameserver_test.go | 2 +- 9 files changed, 185 insertions(+), 18 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 11b16c55a0..c0291160ca 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -21,6 +21,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "time" "agones.dev/agones/pkg" @@ -90,14 +91,16 @@ func main() { agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) - gsController := gameservers.NewController(wh, health, + allocationMutex := &sync.Mutex{} + + gsController := gameservers.NewController(wh, health, allocationMutex, ctlConf.minPort, ctlConf.maxPort, ctlConf.sidecarImage, ctlConf.alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory) - gsSetController := gameserversets.NewController(wh, health, + gsSetController := gameserversets.NewController(wh, health, allocationMutex, kubeClient, extClient, agonesClient, agonesInformerFactory) - fleetController := fleets.NewController(wh, health, + fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory) + faController := fleetallocation.NewController(wh, allocationMutex, kubeClient, extClient, agonesClient, agonesInformerFactory) - faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory) stop := signals.NewStopChannel() diff --git a/pkg/fleetallocation/controller.go b/pkg/fleetallocation/controller.go index 87648edbd4..2666011b4f 100644 --- a/pkg/fleetallocation/controller.go +++ b/pkg/fleetallocation/controller.go @@ -63,13 +63,14 @@ type Controller struct { fleetAllocationGetter getterv1alpha1.FleetAllocationsGetter fleetAllocationLister listerv1alpha1.FleetAllocationLister stop <-chan struct{} - allocationMutex sync.Mutex + allocationMutex *sync.Mutex recorder record.EventRecorder } // NewController returns a controller for a FleetAllocation func NewController( wh *webhooks.WebHook, + allocationMutex *sync.Mutex, kubeClient kubernetes.Interface, extClient extclientset.Interface, agonesClient versioned.Interface, @@ -85,7 +86,7 @@ func NewController( fleetLister: agonesInformer.Fleets().Lister(), fleetAllocationGetter: agonesClient.StableV1alpha1(), fleetAllocationLister: agonesInformer.FleetAllocations().Lister(), - allocationMutex: sync.Mutex{}, + allocationMutex: allocationMutex, } c.logger = runtime.NewLoggerWithType(c) diff --git a/pkg/fleetallocation/controller_test.go b/pkg/fleetallocation/controller_test.go index ebcb1cae13..51e2a37e37 100644 --- a/pkg/fleetallocation/controller_test.go +++ b/pkg/fleetallocation/controller_test.go @@ -286,7 +286,7 @@ func defaultFixtures(gsLen int) (*v1alpha1.Fleet, *v1alpha1.GameServerSet, []v1a func newFakeController() (*Controller, agtesting.Mocks) { m := agtesting.NewMocks() wh := webhooks.NewWebHook("", "") - c := NewController(wh, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory) + c := NewController(wh, &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory) c.recorder = m.FakeRecorder return c, m } diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 23812855d4..c634378c13 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -17,6 +17,7 @@ package gameservers import ( "encoding/json" "fmt" + "sync" "agones.dev/agones/pkg/apis/stable" "agones.dev/agones/pkg/apis/stable/v1alpha1" @@ -69,6 +70,7 @@ type Controller struct { portAllocator *PortAllocator healthController *HealthController workerqueue *workerqueue.WorkerQueue + allocationMutex *sync.Mutex stop <-chan struct{} recorder record.EventRecorder } @@ -77,6 +79,7 @@ type Controller struct { func NewController( wh *webhooks.WebHook, health healthcheck.Handler, + allocationMutex *sync.Mutex, minPort, maxPort int32, sidecarImage string, alwaysPullSidecarImage bool, @@ -93,6 +96,7 @@ func NewController( c := &Controller{ sidecarImage: sidecarImage, alwaysPullSidecarImage: alwaysPullSidecarImage, + allocationMutex: allocationMutex, crdGetter: extClient.ApiextensionsV1beta1().CustomResourceDefinitions(), podGetter: kubeClient.CoreV1(), podLister: pods.Lister(), @@ -608,7 +612,9 @@ func (c *Controller) syncGameServerShutdownState(gs *v1alpha1.GameServer) error // be explicit about where to delete. We only need to wait for the Pod to be removed, which we handle with our // own finalizer. p := metav1.DeletePropagationBackground + c.allocationMutex.Lock() err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p}) + c.allocationMutex.Unlock() if err != nil { return errors.Wrapf(err, "error deleting Game Server %s", gs.ObjectMeta.Name) } diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index bd0d21e182..09cd98a52b 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "testing" "agones.dev/agones/pkg/apis/stable" @@ -1093,7 +1094,8 @@ func testWithNonZeroDeletionTimestamp(t *testing.T, f func(*Controller, *v1alpha func newFakeController() (*Controller, agtesting.Mocks) { m := agtesting.NewMocks() wh := webhooks.NewWebHook("", "") - c := NewController(wh, healthcheck.NewHandler(), 10, 20, "sidecar:dev", false, + c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{}, + 10, 20, "sidecar:dev", false, m.KubeClient, m.KubeInformationFactory, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory) c.recorder = m.FakeRecorder return c, m diff --git a/pkg/gameserversets/controller.go b/pkg/gameserversets/controller.go index 574d42f463..1c919c98f5 100644 --- a/pkg/gameserversets/controller.go +++ b/pkg/gameserversets/controller.go @@ -16,6 +16,7 @@ package gameserversets import ( "encoding/json" + "sync" "agones.dev/agones/pkg/apis/stable" stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" @@ -60,6 +61,8 @@ type Controller struct { gameServerSetLister listerv1alpha1.GameServerSetLister gameServerSetSynced cache.InformerSynced workerqueue *workerqueue.WorkerQueue + allocationMutex *sync.Mutex + stop <-chan struct{} recorder record.EventRecorder } @@ -67,6 +70,7 @@ type Controller struct { func NewController( wh *webhooks.WebHook, health healthcheck.Handler, + allocationMutex *sync.Mutex, kubeClient kubernetes.Interface, extClient extclientset.Interface, agonesClient versioned.Interface, @@ -85,6 +89,7 @@ func NewController( gameServerSetGetter: agonesClient.StableV1alpha1(), gameServerSetLister: gameServerSets.Lister(), gameServerSetSynced: gsSetInformer.HasSynced, + allocationMutex: allocationMutex, } c.logger = runtime.NewLoggerWithType(c) @@ -127,6 +132,8 @@ func NewController( // Run the GameServerSet controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue func (c *Controller) Run(workers int, stop <-chan struct{}) error { + c.stop = stop + err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger) if err != nil { return err @@ -236,7 +243,7 @@ func (c *Controller) syncGameServerSet(key string) error { if err := c.syncMoreGameServers(gsSet, diff); err != nil { return err } - if err := c.syncLessGameSevers(gsSet, list, diff); err != nil { + if err := c.syncLessGameSevers(gsSet, diff); err != nil { return err } if err := c.syncGameServerSetState(gsSet, list); err != nil { @@ -250,7 +257,9 @@ func (c *Controller) syncGameServerSet(key string) error { func (c *Controller) syncUnhealthyGameServers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer) error { for _, gs := range list { if gs.Status.State == stablev1alpha1.Unhealthy && gs.ObjectMeta.DeletionTimestamp.IsZero() { + c.allocationMutex.Lock() err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, nil) + c.allocationMutex.Unlock() if err != nil { return errors.Wrapf(err, "error deleting gameserver %s", gs.ObjectMeta.Name) } @@ -280,7 +289,7 @@ func (c *Controller) syncMoreGameServers(gsSet *stablev1alpha1.GameServerSet, di } // syncLessGameSevers removes Ready GameServers from the set of GameServers -func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer, diff int32) error { +func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, diff int32) error { if diff >= 0 { return nil } @@ -289,6 +298,22 @@ func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, lis c.logger.WithField("diff", diff).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("Deleting gameservers") count := int32(0) + // don't allow allocation state for GameServers to change + c.allocationMutex.Lock() + defer c.allocationMutex.Unlock() + + // make sure we are up to date with GameServer state + if !cache.WaitForCacheSync(c.stop, c.gameServerSynced) { + // if we can't sync the cache, then exit, and try and scale down + // again, and then we aren't blocking allocation at this time. + return errors.New("could not sync gameservers cache") + } + + list, err := ListGameServersByGameServerSetOwner(c.gameServerLister, gsSet) + if err != nil { + return err + } + // count anything that is already being deleted for _, gs := range list { if !gs.ObjectMeta.DeletionTimestamp.IsZero() { diff --git a/pkg/gameserversets/controller_test.go b/pkg/gameserversets/controller_test.go index 82ca461591..d7e8852ecb 100644 --- a/pkg/gameserversets/controller_test.go +++ b/pkg/gameserversets/controller_test.go @@ -17,6 +17,7 @@ package gameserversets import ( "encoding/json" "strconv" + "sync" "testing" "time" @@ -300,7 +301,7 @@ func TestSyncLessGameServers(t *testing.T) { assert.Nil(t, err) assert.Len(t, list2, 11) - err = c.syncLessGameSevers(gsSet, list2, int32(-expected)) + err = c.syncLessGameSevers(gsSet, int32(-expected)) assert.Nil(t, err) // subtract one, because one is already deleted @@ -481,7 +482,7 @@ func createGameServers(gsSet *v1alpha1.GameServerSet, size int) []v1alpha1.GameS func newFakeController() (*Controller, agtesting.Mocks) { m := agtesting.NewMocks() wh := webhooks.NewWebHook("", "") - c := NewController(wh, healthcheck.NewHandler(), m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory) + c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory) c.recorder = m.FakeRecorder return c, m } diff --git a/test/e2e/fleet_test.go b/test/e2e/fleet_test.go index 7ca240233a..f655756108 100644 --- a/test/e2e/fleet_test.go +++ b/test/e2e/fleet_test.go @@ -16,6 +16,7 @@ package e2e import ( "fmt" + "sync" "testing" "time" @@ -29,6 +30,12 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) +const ( + key = "test-state" + red = "red" + green = "green" +) + func TestCreateFleetAndAllocate(t *testing.T) { t.Parallel() @@ -115,7 +122,6 @@ func TestScaleFleetUpAndDownWithAllocation(t *testing.T) { func TestFleetUpdates(t *testing.T) { t.Parallel() - key := "test-state" fixtures := map[string]func() *v1alpha1.Fleet{ "recreate": func() *v1alpha1.Fleet { flt := defaultFleet() @@ -134,14 +140,14 @@ func TestFleetUpdates(t *testing.T) { alpha1 := framework.AgonesClient.StableV1alpha1() flt := v() - flt.Spec.Template.ObjectMeta.Annotations = map[string]string{key: "red"} + flt.Spec.Template.ObjectMeta.Annotations = map[string]string{key: red} flt, err := alpha1.Fleets(defaultNs).Create(flt) if assert.Nil(t, err) { defer alpha1.Fleets(defaultNs).Delete(flt.ObjectMeta.Name, nil) // nolint:errcheck } err = framework.WaitForFleetGameServersCondition(flt, func(gs v1alpha1.GameServer) bool { - return gs.ObjectMeta.Annotations[key] == "red" + return gs.ObjectMeta.Annotations[key] == red }) assert.Nil(t, err) @@ -152,7 +158,7 @@ func TestFleetUpdates(t *testing.T) { return false, err } fltCopy := flt.DeepCopy() - fltCopy.Spec.Template.ObjectMeta.Annotations[key] = "green" + fltCopy.Spec.Template.ObjectMeta.Annotations[key] = green _, err = framework.AgonesClient.StableV1alpha1().Fleets(defaultNs).Update(fltCopy) if err != nil { logrus.WithError(err).Warn("Could not update fleet, trying again") @@ -164,13 +170,136 @@ func TestFleetUpdates(t *testing.T) { assert.Nil(t, err) err = framework.WaitForFleetGameServersCondition(flt, func(gs v1alpha1.GameServer) bool { - return gs.ObjectMeta.Annotations[key] == "green" + return gs.ObjectMeta.Annotations[key] == green }) assert.Nil(t, err) }) } } +// TestFleetAllocationDuringGameServerDeletion is built to specifically +// test for race conditions of allocations when doing scale up/down, +// rolling updates, etc. Failures my not happen ALL the time -- as that is the +// nature of race conditions. +func TestFleetAllocationDuringGameServerDeletion(t *testing.T) { + t.Parallel() + + testAllocationRaceCondition := func(t *testing.T, fleet func() *v1alpha1.Fleet, deltaSleep time.Duration, delta func(t *testing.T, flt *v1alpha1.Fleet)) { + alpha1 := framework.AgonesClient.StableV1alpha1() + + flt := fleet() + flt.ApplyDefaults() + size := int32(10) + flt.Spec.Replicas = size + flt, err := alpha1.Fleets(defaultNs).Create(flt) + if assert.Nil(t, err) { + defer alpha1.Fleets(defaultNs).Delete(flt.ObjectMeta.Name, nil) // nolint:errcheck + } + + assert.Equal(t, size, flt.Spec.Replicas) + + err = framework.WaitForFleetCondition(flt, e2e.FleetReadyCount(flt.Spec.Replicas)) + assert.Nil(t, err, "fleet not ready") + + var allocs []*v1alpha1.GameServer + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + for { + // this gives room for fleet scaling to go down - makes it more likely for the race condition to fire + time.Sleep(100 * time.Millisecond) + fa := &v1alpha1.FleetAllocation{ + ObjectMeta: metav1.ObjectMeta{GenerateName: "allocation-", Namespace: defaultNs}, + Spec: v1alpha1.FleetAllocationSpec{ + FleetName: flt.ObjectMeta.Name, + }, + } + fa, err = framework.AgonesClient.StableV1alpha1().FleetAllocations(defaultNs).Create(fa) + if err != nil { + logrus.WithError(err).Info("Allocation ended") + break + } + logrus.WithField("gs", fa.Status.GameServer.ObjectMeta.Name).Info("Allocated") + allocs = append(allocs, fa.Status.GameServer) + } + wg.Done() + }() + go func() { + // this tends to force the scaling to happen as we are fleet allocating + time.Sleep(deltaSleep) + // call the function that makes the change to the fleet + logrus.Info("Applying delta function") + delta(t, flt) + wg.Done() + }() + + wg.Wait() + assert.NotEmpty(t, allocs) + + for _, gs := range allocs { + gsCheck, err := alpha1.GameServers(defaultNs).Get(gs.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.True(t, gsCheck.ObjectMeta.DeletionTimestamp.IsZero()) + } + } + + t.Run("scale down", func(t *testing.T) { + t.Parallel() + + testAllocationRaceCondition(t, defaultFleet, time.Second, + func(t *testing.T, flt *v1alpha1.Fleet) { + fltResult, err := scaleFleet(flt, 0) + assert.Nil(t, err) + assert.Equal(t, int32(0), fltResult.Spec.Replicas) + }) + }) + + t.Run("recreate update", func(t *testing.T) { + t.Parallel() + + fleet := func() *v1alpha1.Fleet { + flt := defaultFleet() + flt.Spec.Strategy.Type = v1.RecreateDeploymentStrategyType + flt.Spec.Template.ObjectMeta.Annotations = map[string]string{key: red} + + return flt + } + + testAllocationRaceCondition(t, fleet, time.Second, + func(t *testing.T, flt *v1alpha1.Fleet) { + flt, err := framework.AgonesClient.StableV1alpha1().Fleets(defaultNs).Get(flt.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + fltCopy := flt.DeepCopy() + fltCopy.Spec.Template.ObjectMeta.Annotations[key] = green + _, err = framework.AgonesClient.StableV1alpha1().Fleets(defaultNs).Update(fltCopy) + assert.Nil(t, err) + }) + }) + + t.Run("rolling update", func(t *testing.T) { + t.Parallel() + + fleet := func() *v1alpha1.Fleet { + flt := defaultFleet() + flt.Spec.Strategy.Type = v1.RollingUpdateDeploymentStrategyType + flt.Spec.Template.ObjectMeta.Annotations = map[string]string{key: red} + + return flt + } + + testAllocationRaceCondition(t, fleet, time.Duration(0), + func(t *testing.T, flt *v1alpha1.Fleet) { + flt, err := framework.AgonesClient.StableV1alpha1().Fleets(defaultNs).Get(flt.ObjectMeta.Name, metav1.GetOptions{}) + assert.Nil(t, err) + fltCopy := flt.DeepCopy() + fltCopy.Spec.Template.ObjectMeta.Annotations[key] = green + _, err = framework.AgonesClient.StableV1alpha1().Fleets(defaultNs).Update(fltCopy) + assert.Nil(t, err) + }) + }) +} + // scaleFleet creates a patch to apply to a Fleet. // easier for testing, as it removes object generational issues. func scaleFleet(f *v1alpha1.Fleet, scale int32) (*v1alpha1.Fleet, error) { diff --git a/test/e2e/gameserver_test.go b/test/e2e/gameserver_test.go index 84a3b10efe..38dcd1ea5a 100644 --- a/test/e2e/gameserver_test.go +++ b/test/e2e/gameserver_test.go @@ -130,7 +130,7 @@ func defaultGameServer() *v1alpha1.GameServer { Containers: []corev1.Container{{ Name: "udp-server", Image: framework.GameServerImage, - ImagePullPolicy: corev1.PullAlways}}, + ImagePullPolicy: corev1.PullIfNotPresent}}, }, }, },