Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for race condition: Allocation of Deleting GameServers Possible #367

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"reflect"
"strings"
"sync"
"time"

"agones.dev/agones/pkg"
Expand Down Expand Up @@ -90,14 +91,16 @@ func main() {
agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync)
kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync)

gsController := gameservers.NewController(wh, health,
allocationMutex := &sync.Mutex{}

gsController := gameservers.NewController(wh, health, allocationMutex,
ctlConf.minPort, ctlConf.maxPort, ctlConf.sidecarImage, ctlConf.alwaysPullSidecar,
kubeClient, kubeInformationFactory, extClient, agonesClient, agonesInformerFactory)
gsSetController := gameserversets.NewController(wh, health,
gsSetController := gameserversets.NewController(wh, health, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fleetController := fleets.NewController(wh, health,
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, kubeClient, extClient, agonesClient, agonesInformerFactory)

stop := signals.NewStopChannel()

Expand Down
5 changes: 3 additions & 2 deletions pkg/fleetallocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,14 @@ type Controller struct {
fleetAllocationGetter getterv1alpha1.FleetAllocationsGetter
fleetAllocationLister listerv1alpha1.FleetAllocationLister
stop <-chan struct{}
allocationMutex sync.Mutex
allocationMutex *sync.Mutex
recorder record.EventRecorder
}

// NewController returns a controller for a FleetAllocation
func NewController(
wh *webhooks.WebHook,
allocationMutex *sync.Mutex,
kubeClient kubernetes.Interface,
extClient extclientset.Interface,
agonesClient versioned.Interface,
Expand All @@ -85,7 +86,7 @@ func NewController(
fleetLister: agonesInformer.Fleets().Lister(),
fleetAllocationGetter: agonesClient.StableV1alpha1(),
fleetAllocationLister: agonesInformer.FleetAllocations().Lister(),
allocationMutex: sync.Mutex{},
allocationMutex: allocationMutex,
}
c.logger = runtime.NewLoggerWithType(c)

Expand Down
2 changes: 1 addition & 1 deletion pkg/fleetallocation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func defaultFixtures(gsLen int) (*v1alpha1.Fleet, *v1alpha1.GameServerSet, []v1a
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c := NewController(wh, &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gameservers
import (
"encoding/json"
"fmt"
"sync"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -69,6 +70,7 @@ type Controller struct {
portAllocator *PortAllocator
healthController *HealthController
workerqueue *workerqueue.WorkerQueue
allocationMutex *sync.Mutex
stop <-chan struct{}
recorder record.EventRecorder
}
Expand All @@ -77,6 +79,7 @@ type Controller struct {
func NewController(
wh *webhooks.WebHook,
health healthcheck.Handler,
allocationMutex *sync.Mutex,
minPort, maxPort int32,
sidecarImage string,
alwaysPullSidecarImage bool,
Expand All @@ -93,6 +96,7 @@ func NewController(
c := &Controller{
sidecarImage: sidecarImage,
alwaysPullSidecarImage: alwaysPullSidecarImage,
allocationMutex: allocationMutex,
crdGetter: extClient.ApiextensionsV1beta1().CustomResourceDefinitions(),
podGetter: kubeClient.CoreV1(),
podLister: pods.Lister(),
Expand Down Expand Up @@ -608,7 +612,9 @@ func (c *Controller) syncGameServerShutdownState(gs *v1alpha1.GameServer) error
// be explicit about where to delete. We only need to wait for the Pod to be removed, which we handle with our
// own finalizer.
p := metav1.DeletePropagationBackground
c.allocationMutex.Lock()
err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p})
c.allocationMutex.Unlock()
if err != nil {
return errors.Wrapf(err, "error deleting Game Server %s", gs.ObjectMeta.Name)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"

"agones.dev/agones/pkg/apis/stable"
Expand Down Expand Up @@ -1093,7 +1094,8 @@ func testWithNonZeroDeletionTimestamp(t *testing.T, f func(*Controller, *v1alpha
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, healthcheck.NewHandler(), 10, 20, "sidecar:dev", false,
c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{},
10, 20, "sidecar:dev", false,
m.KubeClient, m.KubeInformationFactory, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
Expand Down
29 changes: 27 additions & 2 deletions pkg/gameserversets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gameserversets

import (
"encoding/json"
"sync"

"agones.dev/agones/pkg/apis/stable"
stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -60,13 +61,16 @@ type Controller struct {
gameServerSetLister listerv1alpha1.GameServerSetLister
gameServerSetSynced cache.InformerSynced
workerqueue *workerqueue.WorkerQueue
allocationMutex *sync.Mutex
stop <-chan struct{}
recorder record.EventRecorder
}

// NewController returns a new gameserverset crd controller
func NewController(
wh *webhooks.WebHook,
health healthcheck.Handler,
allocationMutex *sync.Mutex,
kubeClient kubernetes.Interface,
extClient extclientset.Interface,
agonesClient versioned.Interface,
Expand All @@ -85,6 +89,7 @@ func NewController(
gameServerSetGetter: agonesClient.StableV1alpha1(),
gameServerSetLister: gameServerSets.Lister(),
gameServerSetSynced: gsSetInformer.HasSynced,
allocationMutex: allocationMutex,
}

c.logger = runtime.NewLoggerWithType(c)
Expand Down Expand Up @@ -127,6 +132,8 @@ func NewController(
// Run the GameServerSet controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c *Controller) Run(workers int, stop <-chan struct{}) error {
c.stop = stop

err := crd.WaitForEstablishedCRD(c.crdGetter, "gameserversets."+stable.GroupName, c.logger)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +243,7 @@ func (c *Controller) syncGameServerSet(key string) error {
if err := c.syncMoreGameServers(gsSet, diff); err != nil {
return err
}
if err := c.syncLessGameSevers(gsSet, list, diff); err != nil {
if err := c.syncLessGameSevers(gsSet, diff); err != nil {
return err
}
if err := c.syncGameServerSetState(gsSet, list); err != nil {
Expand All @@ -250,7 +257,9 @@ func (c *Controller) syncGameServerSet(key string) error {
func (c *Controller) syncUnhealthyGameServers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer) error {
for _, gs := range list {
if gs.Status.State == stablev1alpha1.Unhealthy && gs.ObjectMeta.DeletionTimestamp.IsZero() {
c.allocationMutex.Lock()
err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, nil)
c.allocationMutex.Unlock()
if err != nil {
return errors.Wrapf(err, "error deleting gameserver %s", gs.ObjectMeta.Name)
}
Expand Down Expand Up @@ -280,7 +289,7 @@ func (c *Controller) syncMoreGameServers(gsSet *stablev1alpha1.GameServerSet, di
}

// syncLessGameSevers removes Ready GameServers from the set of GameServers
func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, list []*stablev1alpha1.GameServer, diff int32) error {
func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, diff int32) error {
if diff >= 0 {
return nil
}
Expand All @@ -289,6 +298,22 @@ func (c *Controller) syncLessGameSevers(gsSet *stablev1alpha1.GameServerSet, lis
c.logger.WithField("diff", diff).WithField("gameserverset", gsSet.ObjectMeta.Name).Info("Deleting gameservers")
count := int32(0)

// don't allow allocation state for GameServers to change
c.allocationMutex.Lock()
defer c.allocationMutex.Unlock()

// make sure we are up to date with GameServer state
if !cache.WaitForCacheSync(c.stop, c.gameServerSynced) {
// if we can't sync the cache, then exit, and try and scale down
// again, and then we aren't blocking allocation at this time.
return errors.New("could not sync gameservers cache")
}

list, err := ListGameServersByGameServerSetOwner(c.gameServerLister, gsSet)
if err != nil {
return err
}

// count anything that is already being deleted
for _, gs := range list {
if !gs.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/gameserversets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gameserversets
import (
"encoding/json"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -300,7 +301,7 @@ func TestSyncLessGameServers(t *testing.T) {
assert.Nil(t, err)
assert.Len(t, list2, 11)

err = c.syncLessGameSevers(gsSet, list2, int32(-expected))
err = c.syncLessGameSevers(gsSet, int32(-expected))
assert.Nil(t, err)

// subtract one, because one is already deleted
Expand Down Expand Up @@ -481,7 +482,7 @@ func createGameServers(gsSet *v1alpha1.GameServerSet, size int) []v1alpha1.GameS
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook("", "")
c := NewController(wh, healthcheck.NewHandler(), m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c := NewController(wh, healthcheck.NewHandler(), &sync.Mutex{}, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
}
Loading