forked from googleforgames/agones
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allocated GameServers updated on Fleet update
Functional implementation and testing of applying labels and/or annotations to any `Allocated` `GameServers` that are overflowed past the configured replica count. Next ➡️ write some guides to close out the ticket. Work on googleforgames#2682
- Loading branch information
1 parent
2343471
commit f429ea5
Showing
12 changed files
with
745 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
// Copyright 2023 Google LLC All Rights Reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package gameserversets | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"agones.dev/agones/pkg/apis/agones" | ||
agonesv1 "agones.dev/agones/pkg/apis/agones/v1" | ||
"agones.dev/agones/pkg/client/clientset/versioned" | ||
getterv1 "agones.dev/agones/pkg/client/clientset/versioned/typed/agones/v1" | ||
"agones.dev/agones/pkg/client/informers/externalversions" | ||
listerv1 "agones.dev/agones/pkg/client/listers/agones/v1" | ||
"agones.dev/agones/pkg/gameservers" | ||
"agones.dev/agones/pkg/util/logfields" | ||
"agones.dev/agones/pkg/util/runtime" | ||
"agones.dev/agones/pkg/util/workerqueue" | ||
"github.com/heptiolabs/healthcheck" | ||
"github.com/pkg/errors" | ||
"github.com/sirupsen/logrus" | ||
k8serrors "k8s.io/apimachinery/pkg/api/errors" | ||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/tools/cache" | ||
) | ||
|
||
// AllocationOverflowController watches `GameServerSets`, and those with configured | ||
// AllocationOverflow settings, will the relevant labels and annotations to `GameServers` attached to the given | ||
// `GameServerSet` | ||
type AllocationOverflowController struct { | ||
baseLogger *logrus.Entry | ||
counter *gameservers.PerNodeCounter | ||
gameServerSynced cache.InformerSynced | ||
gameServerGetter getterv1.GameServersGetter | ||
gameServerLister listerv1.GameServerLister | ||
gameServerSetSynced cache.InformerSynced | ||
gameServerSetLister listerv1.GameServerSetLister | ||
workerqueue *workerqueue.WorkerQueue | ||
} | ||
|
||
// NewAllocatorOverflowController returns a new AllocationOverflowController | ||
func NewAllocatorOverflowController( | ||
health healthcheck.Handler, | ||
counter *gameservers.PerNodeCounter, | ||
agonesClient versioned.Interface, | ||
agonesInformerFactory externalversions.SharedInformerFactory) *AllocationOverflowController { | ||
gameServers := agonesInformerFactory.Agones().V1().GameServers() | ||
gameServerSet := agonesInformerFactory.Agones().V1().GameServerSets() | ||
gsSetInformer := gameServerSet.Informer() | ||
|
||
c := &AllocationOverflowController{ | ||
counter: counter, | ||
gameServerSynced: gameServers.Informer().HasSynced, | ||
gameServerGetter: agonesClient.AgonesV1(), | ||
gameServerLister: gameServers.Lister(), | ||
gameServerSetSynced: gsSetInformer.HasSynced, | ||
gameServerSetLister: gameServerSet.Lister(), | ||
} | ||
|
||
c.baseLogger = runtime.NewLoggerWithType(c) | ||
c.baseLogger.Debug("Created!") | ||
c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServerSet, c.baseLogger, logfields.GameServerSetKey, agones.GroupName+".GameServerSetController", workerqueue.FastRateLimiter(3*time.Second)) | ||
health.AddLivenessCheck("gameserverset-allocationoverflow-workerqueue", c.workerqueue.Healthy) | ||
|
||
gsSetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
UpdateFunc: func(oldObj, newObj interface{}) { | ||
newGss := newObj.(*agonesv1.GameServerSet) | ||
|
||
// Only process if there is an AllocationOverflow, and it has labels or annotations. | ||
if newGss.Spec.AllocationOverflow == nil { | ||
return | ||
} else if len(newGss.Spec.AllocationOverflow.Labels) == 0 && len(newGss.Spec.AllocationOverflow.Annotations) == 0 { | ||
return | ||
} | ||
if newGss.Status.AllocatedReplicas > newGss.Spec.Replicas { | ||
c.workerqueue.Enqueue(newGss) | ||
} | ||
}, | ||
}) | ||
|
||
return c | ||
} | ||
|
||
// Run this controller. Will block until stop is closed. | ||
func (c *AllocationOverflowController) Run(ctx context.Context) error { | ||
c.baseLogger.Debug("Wait for cache sync") | ||
if !cache.WaitForCacheSync(ctx.Done(), c.gameServerSynced, c.gameServerSetSynced) { | ||
return errors.New("failed to wait for caches to sync") | ||
} | ||
|
||
c.workerqueue.Run(ctx, 1) | ||
return nil | ||
} | ||
|
||
// syncGameServerSet checks to see if there are overflow Allocated GameServers and applied the labels and/or | ||
// annotations to the requisite number of GameServers needed to alert the underlying system. | ||
func (c *AllocationOverflowController) syncGameServerSet(ctx context.Context, key string) error { | ||
// Convert the namespace/name string into a distinct namespace and name | ||
namespace, name, err := cache.SplitMetaNamespaceKey(key) | ||
if err != nil { | ||
// don't return an error, as we don't want this retried | ||
runtime.HandleError(loggerForGameServerSetKey(c.baseLogger, key), errors.Wrapf(err, "invalid resource key")) | ||
return nil | ||
} | ||
|
||
gsSet, err := c.gameServerSetLister.GameServerSets(namespace).Get(name) | ||
if err != nil { | ||
if k8serrors.IsNotFound(err) { | ||
loggerForGameServerSetKey(c.baseLogger, key).Debug("GameServerSet is no longer available for syncing") | ||
return nil | ||
} | ||
return errors.Wrapf(err, "error retrieving GameServerSet %s from namespace %s", name, namespace) | ||
} | ||
|
||
// just in case something changed, double check to avoid panics and/or sending work to the K8s API that we don't | ||
// need to | ||
if gsSet.Spec.AllocationOverflow == nil { | ||
return nil | ||
} | ||
if gsSet.Status.AllocatedReplicas <= gsSet.Spec.Replicas { | ||
return nil | ||
} | ||
|
||
overflow := gsSet.Status.AllocatedReplicas - gsSet.Spec.Replicas | ||
|
||
list, err := ListGameServersByGameServerSetOwner(c.gameServerLister, gsSet) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
matches, rest := gsSet.Spec.AllocationOverflow.CountMatches(list) | ||
if matches >= overflow { | ||
return nil | ||
} | ||
|
||
rest = sortGameServersByStrategy(gsSet.Spec.Scheduling, rest, c.counter.Counts()) | ||
rest = rest[:(overflow - matches)] | ||
|
||
opts := v1.UpdateOptions{} | ||
for _, gs := range rest { | ||
gsCopy := gs.DeepCopy() | ||
gsSet.Spec.AllocationOverflow.Apply(gsCopy) | ||
|
||
if _, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(ctx, gsCopy, opts); err != nil { | ||
return errors.Wrapf(err, "error updating GameServer %s with overflow labels and/or annotations", gs.ObjectMeta.Name) | ||
} | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,203 @@ | ||
// Copyright 2023 Google LLC All Rights Reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package gameserversets | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
agonesv1 "agones.dev/agones/pkg/apis/agones/v1" | ||
"agones.dev/agones/pkg/gameservers" | ||
agtesting "agones.dev/agones/pkg/testing" | ||
"github.com/heptiolabs/healthcheck" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/apimachinery/pkg/watch" | ||
k8stesting "k8s.io/client-go/testing" | ||
) | ||
|
||
func TestAllocationOverflowControllerWatchGameServers(t *testing.T) { | ||
t.Parallel() | ||
|
||
gsSet := defaultFixture() | ||
gsSet.Status.Replicas = gsSet.Spec.Replicas | ||
gsSet.Status.ReadyReplicas = gsSet.Spec.Replicas | ||
c, m := newFakeAllocationOverflowController() | ||
|
||
received := make(chan string, 10) | ||
defer close(received) | ||
|
||
gsSetWatch := watch.NewFake() | ||
m.AgonesClient.AddWatchReactor("gameserversets", k8stesting.DefaultWatchReactor(gsSetWatch, nil)) | ||
|
||
c.workerqueue.SyncHandler = func(_ context.Context, name string) error { | ||
received <- name | ||
return nil | ||
} | ||
|
||
ctx, cancel := agtesting.StartInformers(m, c.gameServerSetSynced) | ||
defer cancel() | ||
|
||
go func() { | ||
err := c.Run(ctx) | ||
require.NoError(t, err) | ||
}() | ||
|
||
change := func() string { | ||
select { | ||
case result := <-received: | ||
return result | ||
case <-time.After(3 * time.Second): | ||
require.FailNow(t, "timeout occurred") | ||
} | ||
return "" | ||
} | ||
|
||
nochange := func() { | ||
select { | ||
case <-received: | ||
assert.Fail(t, "Should be no value") | ||
case <-time.After(time.Second): | ||
} | ||
} | ||
|
||
gsSetWatch.Add(gsSet.DeepCopy()) | ||
nochange() | ||
|
||
// update with no allocation overflow | ||
require.Nil(t, gsSet.Spec.AllocationOverflow) | ||
gsSet.Spec.Replicas++ | ||
gsSetWatch.Modify(gsSet.DeepCopy()) | ||
nochange() | ||
|
||
// update with no labels or annotations | ||
gsSet.Spec.AllocationOverflow = &agonesv1.AllocationOverflow{} | ||
gsSet.Spec.Replicas++ | ||
gsSetWatch.Modify(gsSet.DeepCopy()) | ||
nochange() | ||
|
||
// update with allocation <= replicas (and a label) | ||
gsSet.Spec.AllocationOverflow.Labels = map[string]string{"colour": "green"} | ||
gsSet.Status.AllocatedReplicas = 2 | ||
gsSetWatch.Modify(gsSet.DeepCopy()) | ||
nochange() | ||
|
||
// update with allocation > replicas | ||
gsSet.Status.AllocatedReplicas = 20 | ||
gsSetWatch.Modify(gsSet.DeepCopy()) | ||
require.Equal(t, fmt.Sprintf("%s/%s", gsSet.ObjectMeta.Namespace, gsSet.ObjectMeta.Name), change()) | ||
|
||
// delete | ||
gsSetWatch.Delete(gsSet.DeepCopy()) | ||
nochange() | ||
} | ||
|
||
func TestAllocationOverflowSyncGameServerSet(t *testing.T) { | ||
t.Parallel() | ||
|
||
// setup fictures. | ||
setup := func(gs func(server *agonesv1.GameServer)) (*agonesv1.GameServerSet, *AllocationOverflowController, agtesting.Mocks) { | ||
gsSet := defaultFixture() | ||
gsSet.Status.AllocatedReplicas = 5 | ||
gsSet.Status.Replicas = 3 | ||
gsSet.Spec.Replicas = 3 | ||
gsSet.Spec.AllocationOverflow = &agonesv1.AllocationOverflow{Labels: map[string]string{"colour": "green"}} | ||
list := createGameServers(gsSet, 5) | ||
for i := range list { | ||
list[i].Status.State = agonesv1.GameServerStateAllocated | ||
gs(&list[i]) | ||
} | ||
|
||
c, m := newFakeAllocationOverflowController() | ||
m.AgonesClient.AddReactor("list", "gameserversets", func(action k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, &agonesv1.GameServerSetList{Items: []agonesv1.GameServerSet{*gsSet}}, nil | ||
}) | ||
m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { | ||
return true, &agonesv1.GameServerList{Items: list}, nil | ||
}) | ||
return gsSet, c, m | ||
} | ||
|
||
// run the sync process | ||
run := func(c *AllocationOverflowController, m agtesting.Mocks, gsSet *agonesv1.GameServerSet, update func(action k8stesting.Action) (bool, runtime.Object, error)) func() { | ||
m.AgonesClient.AddReactor("update", "gameservers", update) | ||
ctx, cancel := agtesting.StartInformers(m, c.gameServerSetSynced, c.gameServerSynced) | ||
err := c.syncGameServerSet(ctx, gsSet.ObjectMeta.Namespace+"/"+gsSet.ObjectMeta.Name) | ||
require.NoError(t, err) | ||
return cancel | ||
} | ||
|
||
t.Run("labels are applied", func(t *testing.T) { | ||
gsSet, c, m := setup(func(_ *agonesv1.GameServer) {}) | ||
count := 0 | ||
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) { | ||
ua := action.(k8stesting.UpdateAction) | ||
gs := ua.GetObject().(*agonesv1.GameServer) | ||
require.Equal(t, gs.Status.State, agonesv1.GameServerStateAllocated) | ||
require.Equal(t, "green", gs.ObjectMeta.Labels["colour"]) | ||
|
||
count++ | ||
return true, nil, nil | ||
}) | ||
defer cancel() | ||
require.Equal(t, 2, count) | ||
}) | ||
|
||
t.Run("Labels are already set", func(t *testing.T) { | ||
gsSet, c, m := setup(func(gs *agonesv1.GameServer) { | ||
gs.ObjectMeta.Labels["colour"] = "green" | ||
}) | ||
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) { | ||
require.Fail(t, "should not update") | ||
return true, nil, nil | ||
}) | ||
defer cancel() | ||
}) | ||
|
||
t.Run("one label is set", func(t *testing.T) { | ||
set := false | ||
gsSet, c, m := setup(func(gs *agonesv1.GameServer) { | ||
// just make one as already set | ||
if !set { | ||
gs.ObjectMeta.Labels["colour"] = "green" | ||
set = true | ||
} | ||
}) | ||
|
||
count := 0 | ||
cancel := run(c, m, gsSet, func(action k8stesting.Action) (bool, runtime.Object, error) { | ||
ua := action.(k8stesting.UpdateAction) | ||
gs := ua.GetObject().(*agonesv1.GameServer) | ||
require.Equal(t, gs.Status.State, agonesv1.GameServerStateAllocated) | ||
require.Equal(t, "green", gs.ObjectMeta.Labels["colour"]) | ||
|
||
count++ | ||
return true, nil, nil | ||
}) | ||
defer cancel() | ||
require.Equal(t, 1, count) | ||
}) | ||
} | ||
|
||
// newFakeAllocationOverflowController returns a controller, backed by the fake Clientset | ||
func newFakeAllocationOverflowController() (*AllocationOverflowController, agtesting.Mocks) { | ||
m := agtesting.NewMocks() | ||
counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory) | ||
c := NewAllocatorOverflowController(healthcheck.NewHandler(), counter, m.AgonesClient, m.AgonesInformerFactory) | ||
return c, m | ||
} |
Oops, something went wrong.