Skip to content

Commit

Permalink
Fix for race condition: Allocation of Deleting GameServers Possible
Browse files Browse the repository at this point in the history
If an allocation occurred during a Fleet scale down, or during an update of a
Fleet, it was entirely possible for those parallel delete operations to
be applied to a GameServer that was being allocated at the same time.

This is mainly because the client-go informer cache is lazily consistent, but
also because there was nothing preventing a `Delete()` of a `GameServer` from
happening while allocating a specific `GameServer`.

To solve this, there are two strategies implemented here:

1. Share the `allocationMutex` `sync.Mutex` between controllers such that
allocations cannot occur while `GameServer` Deletes for Fleet resizing/update
are happening and vice versa.
2. Use `cache.WaitForCacheSync` to bring the cluster informer up to date,
to remove the chance of stale information about `GameServers` being
acted upon.

The shared lock is quite a broad approach - down the line, we could refine this
to be per `Fleet`, or per `GameServerSet`, if we find this to be a
bottleneck, but the priority here was to get something working that resolves
the issue, and we can optimise as needed from here.

There are also e2e tests specifically designed to catch these race
conditions.
  • Loading branch information
markmandel authored and cyriltovena committed Sep 28, 2018
1 parent c1c5f41 commit e6818e3
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 18 deletions.
11 changes: 7 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"

"agones.dev/agones/pkg"
Expand Down Expand Up @@ -90,14 +91,16 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

gsController := gameservers.NewController(wh, health,
allocationMutex := &sync.Mutex{}

gsController := gameservers.NewController(wh, health, allocationMutex,
ctlConf.minPort, ctlConf.maxPort, ctlConf.sidecarImage, ctlConf.alwaysPullSidecar,
kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory)
gsSetController := gameserversets.NewController(wh, health,
gsSetController := gameserversets.NewController(wh, health, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health,
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory)

stop := signals.NewStopChannel()

Expand Down
5 changes: 3 additions & 2 deletions pkg/fleetallocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ type Controller struct {
fleetAllocationGetter getterv1alpha1.FleetAllocationsGetter
fleetAllocationLister listerv1alpha1.FleetAllocationLister
stop <-chan struct{}
allocationMutex sync.Mutex
allocationMutex *sync.Mutex
recorder record.EventRecorder
}

// NewController returns a controller for a FleetAllocation
func NewController(
wh *webhooks.WebHook,
allocationMutex *sync.Mutex,
kubeClient kubernetes.Interface,
extClient extclientset.Interface,
agonesClient versioned.Interface,
Expand All @@ -85,7 +86,7 @@ func NewController(
fleetLister: agonesInformer.Fleets().Lister(),
fleetAllocationGetter: agonesClient.StableV1alpha1(),
fleetAllocationLister: agonesInformer.FleetAllocations().Lister(),
allocationMutex: sync.Mutex{},
allocationMutex: allocationMutex,
}
c.logger = runtime.NewLoggerWithType(c)

Expand Down
2 changes: 1 addition & 1 deletion pkg/fleetallocation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func defaultFixtures(gsLen int) (*v1alpha1.Fleet, *v1alpha1.GameServerSet, []v1a
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c := NewController(wh, &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gameservers
import (
"encoding/json"
"fmt"
"sync"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -69,6 +70,7 @@ type Controller struct {
portAllocator *PortAllocator
healthController *HealthController
workerqueue *workerqueue.WorkerQueue
allocationMutex *sync.Mutex
stop <-chan struct{}
recorder record.EventRecorder
}
Expand All @@ -77,6 +79,7 @@ type Controller struct {
func NewController(
wh *webhooks.WebHook,
health healthcheck.Handler,
allocationMutex *sync.Mutex,
minPort, maxPort int32,
sidecarImage string,
alwaysPullSidecarImage bool,
Expand All @@ -93,6 +96,7 @@ func NewController(
c := &Controller{
sidecarImage: sidecarImage,
alwaysPullSidecarImage: alwaysPullSidecarImage,
allocationMutex: allocationMutex,
crdGetter: extClient.ApiextensionsV1beta1().CustomResourceDefinitions(),
podGetter: kubeClient.CoreV1(),
podLister: pods.Lister(),
Expand Down Expand Up @@ -608,7 +612,9 @@ func (c *Controller) syncGameServerShutdownState(gs *v1alpha1.GameServer) error
// be explicit about where to delete. We only need to wait for the Pod to be removed, which we handle with our
// own finalizer.
p := metav1.DeletePropagationBackground
c.allocationMutex.Lock()
err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p})
c.allocationMutex.Unlock()
if err != nil {
return errors.Wrapf(err, "error deleting Game Server %s", gs.ObjectMeta.Name)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"

"agones.dev/agones/pkg/apis/stable"
Expand Down Expand Up @@ -1093,7 +1094,8 @@ func testWithNonZeroDeletionTimestamp(t *testing.T, f func(*Controller, *v1alpha
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, healthcheck.NewHandler(), 10, 20, "sidecar:dev", false,
c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{},
10, 20, "sidecar:dev", false,
m.KubeClient, m.KubeInformationFactory, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
Expand Down
29 changes: 27 additions & 2 deletions pkg/gameserversets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gameserversets

import (
"encoding/json"
"sync"

"agones.dev/agones/pkg/apis/stable"
stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -60,13 +61,16 @@ type Controller struct {
gameServerSetLister listerv1alpha1.GameServerSetLister
gameServerSetSynced cache.InformerSynced
workerqueue *workerqueue.WorkerQueue
allocationMutex *sync.Mutex
stop <-chan struct{}
recorder record.EventRecorder
}

// NewController returns a new gameserverset crd controller
func NewController(
wh *webhooks.WebHook,
health healthcheck.Handler,
allocationMutex *sync.Mutex,
kubeClient kubernetes.Interface,
extClient extclientset.Interface,
agonesClient versioned.Interface,
Expand All @@ -85,6 +89,7 @@ func NewController(
gameServerSetGetter: agonesClient.StableV1alpha1(),
gameServerSetLister: gameServerSets.Lister(),
gameServerSetSynced: gsSetInformer.HasSynced,
allocationMutex: allocationMutex,
}

c.logger = runtime.NewLoggerWithType(c)
Expand Down Expand Up @@ -127,6 +132,8 @@ func NewController(
// Run the GameServerSet controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
c.stop = stop

err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +243,7 @@ func (c *Controller) syncGameServerSet(key string) error {
if err := c.syncMoreGameServers(gsSet, diff); err != nil {
return err
}
if err := c.syncLessGameSevers(gsSet, list, diff); err != nil {
if err := c.syncLessGameSevers(gsSet, diff); err != nil {
return err
}
if err := c.syncGameServerSetState(gsSet, list); err != nil {
Expand All @@ -250,7 +257,9 @@ func (c *Controller) syncGameServerSet(key string) error {
func (c *Controller) syncUnhealthyGameServers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer) error {
for _, gs := range list {
if gs.Status.State == stablev1alpha1.Unhealthy && gs.ObjectMeta.DeletionTimestamp.IsZero() {
c.allocationMutex.Lock()
err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, nil)
c.allocationMutex.Unlock()
if err != nil {
return errors.Wrapf(err, "error deleting gameserver %s", gs.ObjectMeta.Name)
}
Expand Down Expand Up @@ -280,7 +289,7 @@ func (c *Controller) syncMoreGameServers(gsSet *stablev1alpha1.GameServerSet, di
}

// syncLessGameSevers removes Ready GameServers from the set of GameServers
func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer, diff int32) error {
func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, diff int32) error {
if diff >= 0 {
return nil
}
Expand All @@ -289,6 +298,22 @@ func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, lis
c.logger.WithField("diff", diff).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("Deleting gameservers")
count := int32(0)

// don't allow allocation state for GameServers to change
c.allocationMutex.Lock()
defer c.allocationMutex.Unlock()

// make sure we are up to date with GameServer state
if !cache.WaitForCacheSync(c.stop, c.gameServerSynced) {
// if we can't sync the cache, then exit, and try and scale down
// again, and then we aren't blocking allocation at this time.
return errors.New("could not sync gameservers cache")
}

list, err := ListGameServersByGameServerSetOwner(c.gameServerLister, gsSet)
if err != nil {
return err
}

// count anything that is already being deleted
for _, gs := range list {
if !gs.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/gameserversets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gameserversets
import (
"encoding/json"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -300,7 +301,7 @@ func TestSyncLessGameServers(t *testing.T) {
assert.Nil(t, err)
assert.Len(t, list2, 11)

err = c.syncLessGameSevers(gsSet, list2, int32(-expected))
err = c.syncLessGameSevers(gsSet, int32(-expected))
assert.Nil(t, err)

// subtract one, because one is already deleted
Expand Down Expand Up @@ -481,7 +482,7 @@ func createGameServers(gsSet *v1alpha1.GameServerSet, size int) []v1alpha1.GameS
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, healthcheck.NewHandler(), m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
}
Loading

0 comments on commit e6818e3

Please sign in to comment.