From f47cc64a9b641a7a2a3d6dfc92218189d6e0f68a Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Sun, 17 Dec 2017 16:20:25 -0800 Subject: [PATCH] Dynamic Port Allocation for GameServers Implementation of a PortAllocator that ties into the GameServer controller: it allocates a port on GameServer creation (if the "dynamic" portPolicy is set) and releases the port when the GameServer is deleted. This also includes experimental support for node addition, removal and unscheduling within the cluster. Closes #14 --- build/build-image/Dockerfile | 2 +- build/build-image/gen-crd-client.sh | 3 +- build/install.yaml | 4 + examples/cpp-simple/gameserver.yaml | 6 +- examples/gameserver.yaml | 16 +- gameservers/controller/controller.go | 42 ++- gameservers/controller/controller_test.go | 165 +++++---- gameservers/controller/helper_test.go | 68 ++++ gameservers/controller/main.go | 18 +- gameservers/controller/portallocator.go | 255 +++++++++++++ gameservers/controller/portallocator_test.go | 358 +++++++++++++++++++ install.yaml | 6 + pkg/apis/stable/v1alpha1/types.go | 14 +- pkg/apis/stable/v1alpha1/types_test.go | 4 + 14 files changed, 864 insertions(+), 97 deletions(-) create mode 100644 gameservers/controller/helper_test.go create mode 100644 gameservers/controller/portallocator.go create mode 100644 gameservers/controller/portallocator_test.go diff --git a/build/build-image/Dockerfile b/build/build-image/Dockerfile index 867582d671..7ea79c9c1b 100644 --- a/build/build-image/Dockerfile +++ b/build/build-image/Dockerfile @@ -34,7 +34,7 @@ RUN wget -q https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && un /opt/google-cloud-sdk/install.sh --usage-reporting=true --path-update=true --bash-completion=true --rc-path=/root/.bashrc # update the path for both go and gcloud -ENV PATH /usr/local/go/bin:/opt/google-cloud-sdk/bin:$PATH +ENV PATH /usr/local/go/bin:/go/bin:/opt/google-cloud-sdk/bin:$PATH # RUN gcloud components update RUN gcloud components update && gcloud components install kubectl diff --git a/build/build-image/gen-crd-client.sh b/build/build-image/gen-crd-client.sh index eb9b333ce5..eec74f2726 100644 --- a/build/build-image/gen-crd-client.sh +++ b/build/build-image/gen-crd-client.sh @@ -15,7 +15,8 @@ # limitations under the License.
rsync -r /go/src/github.com/agonio/agon/vendor/k8s.io/ /go/src/k8s.io/ -/go/src/k8s.io/code-generator/generate-groups.sh "all" \ +cd /go/src/k8s.io/code-generator +./generate-groups.sh "all" \ github.com/agonio/agon/pkg/client \ github.com/agonio/agon/pkg/apis stable:v1alpha1 \ --go-header-file=/go/src/github.com/agonio/agon/build/boilerplate.go.txt diff --git a/build/install.yaml b/build/install.yaml index 14a3e935db..885a6113c5 100644 --- a/build/install.yaml +++ b/build/install.yaml @@ -51,6 +51,10 @@ spec: value: "true" - name: SIDECAR # overwrite the GameServer sidecar image that is used value: ${REGISTRY}/gameservers-sidecar:${VERSION} + - name: MIN_PORT + value: "7000" + - name: MAX_PORT + value: "8000" livenessProbe: httpGet: path: /healthz diff --git a/examples/cpp-simple/gameserver.yaml b/examples/cpp-simple/gameserver.yaml index ab3de3ff75..a75d0dc974 100644 --- a/examples/cpp-simple/gameserver.yaml +++ b/examples/cpp-simple/gameserver.yaml @@ -15,11 +15,11 @@ apiVersion: "stable.agon.io/v1alpha1" kind: GameServer metadata: - name: cpp-simple + # generate a unique name + # will need to be created with `kubectl create` + generateName: cpp-simple- spec: - portPolicy: "static" containerPort: 7654 - hostPort: 7778 template: spec: containers: diff --git a/examples/gameserver.yaml b/examples/gameserver.yaml index 7158b24e87..83816181f0 100644 --- a/examples/gameserver.yaml +++ b/examples/gameserver.yaml @@ -25,21 +25,27 @@ apiVersion: "stable.agon.io/v1alpha1" kind: GameServer +# GameServer Metadata +# https://v1-8.docs.kubernetes.io/docs/api-reference/v1.8/#objectmeta-v1-meta metadata: - name: "gds-example" + # generateName: "gds-example" # generate a unique name, with the given prefix + name: "gds-example" # set a fixed name spec: # if there is more than one container, specify which one is the game server container: example-server - # `static` is the only current option. Dynamic port allocated will come in future releases. - # When `static` is the policy specified, `hostPort` is required, to specify the port that game clients will connect to + # portPolicy has two options: + # - "dynamic" (default) the system allocates a free hostPort for the gameserver, for game clients to connect to + # - "static", the user defines the hostPort that the game client will connect to. The onus is then on the user to ensure that the + # port is available. When "static" is the specified policy, `hostPort` is required to be populated portPolicy: "static" # the port that is being opened on the game server process containerPort: 7654 # the port exposed on the host, only required when `portPolicy` is "static". Overwritten when portPolicy is "dynamic". hostPort: 7777 # protocol being used. Defaults to UDP. TCP is the only other option protocol: UDP - # Pod configuration + # Pod template configuration + # https://v1-8.docs.kubernetes.io/docs/api-reference/v1.8/#podtemplate-v1-core template: # pod metadata.
Name & Namespace is overwritten metadata: diff --git a/gameservers/controller/controller.go b/gameservers/controller/controller.go index bbaa929559..6692100628 100644 --- a/gameservers/controller/controller.go +++ b/gameservers/controller/controller.go @@ -15,9 +15,8 @@ package main import ( - "time" - "net/http" + "time" "github.com/agonio/agon/pkg/apis/stable" stablev1alpha1 "github.com/agonio/agon/pkg/apis/stable/v1alpha1" @@ -60,6 +59,7 @@ type Controller struct { gameServerSynced cache.InformerSynced nodeLister corelisterv1.NodeLister queue workqueue.RateLimitingInterface + portAllocator *PortAllocator server *http.Server // this allows for overwriting for testing purposes @@ -67,7 +67,8 @@ type Controller struct { } // NewController returns a new gameserver crd controller -func NewController(sidecarImage string, +func NewController(minPort, maxPort int32, + sidecarImage string, alwaysPullSidecarImage bool, kubeClient kubernetes.Interface, kubeInformerFactory informers.SharedInformerFactory, @@ -89,6 +90,7 @@ func NewController(sidecarImage string, gameServerSynced: gsInformer.HasSynced, nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName), + portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonInformerFactory), } gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -118,7 +120,11 @@ func NewController(sidecarImage string, mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("ok")) + _, err := w.Write([]byte("ok")) + if err != nil { + logrus.WithError(err).Error("could not send ok response on healthz") + w.WriteHeader(http.StatusInternalServerError) + } }) c.server = &http.Server{ @@ -137,10 +143,15 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error { logrus.Info("Starting health check...") go func() { if err := c.server.ListenAndServe(); err != nil { - logrus.WithError(err).Error("Could not listen on :8080") + if err == http.ErrServerClosed { + logrus.WithError(err).Info("health check: http server closed") + } else { + err := errors.Wrap(err, "Could not listen on :8080") + runtime.HandleError(logrus.WithError(err), err) + } } }() - defer c.server.Close() + defer c.server.Close() // nolint: errcheck err := c.waitForEstablishedCRD() if err != nil { @@ -152,6 +163,10 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error { return errors.New("failed to wait for caches to sync") } + if err := c.portAllocator.Run(stop); err != nil { + return err + } + logrus.Info("Starting workers...") for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stop) @@ -298,6 +313,16 @@ func (c *Controller) syncGameServerBlankState(gs *stablev1alpha1.GameServer) (*s if gs.Status.State == "" && gs.ObjectMeta.DeletionTimestamp.IsZero() { gsCopy := gs.DeepCopy() gsCopy.ApplyDefaults() + + // manage dynamic ports + if gsCopy.Spec.PortPolicy == stablev1alpha1.Dynamic { + port, err := c.portAllocator.Allocate() + if err != nil { + return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name) + } + gsCopy.Spec.HostPort = port + } + logrus.WithField("gs", gsCopy).Info("Syncing Blank State") gs, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy) return gs, errors.Wrapf(err, "error updating GameServer %s to default values", gs.Name) @@ -384,6 +409,7 @@ func (c *Controller) 
syncGameServerRequestReadyState(gs *stablev1alpha1.GameServ gsCopy := gs.DeepCopy() gsCopy.Status.State = stablev1alpha1.Ready gsCopy.Status.Address = addr + gsCopy.Status.NodeName = pod.Spec.NodeName // HostPort is always going to be populated, even when dynamic // This will be a duplication of information, but it will be easier to read gsCopy.Status.Port = gs.Spec.HostPort @@ -398,8 +424,8 @@ func (c *Controller) syncGameServerRequestReadyState(gs *stablev1alpha1.GameServ func (c *Controller) syncGameServerShutdownState(gs *stablev1alpha1.GameServer) (*stablev1alpha1.GameServer, error) { if gs.Status.State == stablev1alpha1.Shutdown && gs.ObjectMeta.DeletionTimestamp.IsZero() { logrus.WithField("gs", gs).Info("Syncing Shutdown State") - // let's be explicit about how we want to shut things down - p := metav1.DeletePropagationBackground + // Do it in the foreground, so the gameserver gets killed last + p := metav1.DeletePropagationForeground err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Delete(gs.ObjectMeta.Name, &metav1.DeleteOptions{PropagationPolicy: &p}) return nil, errors.Wrapf(err, "error deleting Game Server %s", gs.ObjectMeta.Name) } diff --git a/gameservers/controller/controller_test.go b/gameservers/controller/controller_test.go index 335e5b4ce2..31d9f5c6a2 100644 --- a/gameservers/controller/controller_test.go +++ b/gameservers/controller/controller_test.go @@ -15,32 +15,25 @@ package main import ( - "sync" - "testing" - "time" - "fmt" - "io/ioutil" "net/http" + "sync" + "testing" + "time" "github.com/agonio/agon/pkg/apis/stable" "github.com/agonio/agon/pkg/apis/stable/v1alpha1" - agonfake "github.com/agonio/agon/pkg/client/clientset/versioned/fake" - "github.com/agonio/agon/pkg/client/informers/externalversions" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" - extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/informers" - kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" ) @@ -124,8 +117,8 @@ func TestSyncGameServer(t *testing.T) { return true, gs, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() err := c.syncHandler("default/test") assert.Nil(t, err) @@ -149,8 +142,8 @@ func TestSyncGameServer(t *testing.T) { return false, nil, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() agonWatch.Delete(fixture) @@ -185,8 +178,8 @@ func TestWatchGameServers(t *testing.T) { return nil } - stop := startInformers(c, mocks) - defer close(stop) + stop, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() go func() { err := c.Run(1, stop) @@ -225,8 +218,8 @@ func TestHealthCheck(t *testing.T) { return nil } - stop := startInformers(c, mocks) - defer close(stop) + stop, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() go func() { err := c.Run(1, stop) }() resp, err := http.Get("http://localhost:8080/healthz") - assert.Nil(t, err, "health check error
should be nil") - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - assert.Nil(t, err, "read response error should be nil") - assert.Equal(t, []byte("ok"), body, "response body should be 'ok'") + assert.Nil(t, err, "health check error should be nil: %s", err) + assert.NotNil(t, resp) + if resp != nil { + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + assert.Nil(t, err, "read response error should be nil") + assert.Equal(t, []byte("ok"), body, "response body should be 'ok'") + } } func TestSyncGameServerDeletionTimestamp(t *testing.T) { @@ -265,8 +261,8 @@ func TestSyncGameServerDeletionTimestamp(t *testing.T) { return true, nil, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() result, err := c.syncGameServerDeletionTimestamp(fixture) assert.Nil(t, err) @@ -292,8 +288,8 @@ func TestSyncGameServerDeletionTimestamp(t *testing.T) { return true, gs, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() result, err := c.syncGameServerDeletionTimestamp(fixture) assert.Nil(t, err) @@ -304,6 +300,7 @@ func TestSyncGameServerDeletionTimestamp(t *testing.T) { } func TestSyncGameServerBlankState(t *testing.T) { + t.Parallel() t.Run("GameServer with a blank initial state", func(t *testing.T) { c, mocks := newFakeController() @@ -327,6 +324,50 @@ func TestSyncGameServerBlankState(t *testing.T) { assert.Equal(t, v1alpha1.Creating, result.Status.State) }) + t.Run("Gameserver with dynamic port state", func(t *testing.T) { + t.Parallel() + c, mocks := newFakeController() + fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: v1alpha1.GameServerSpec{ + ContainerPort: 7777, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "container", Image: "container/image"}}, + }, + }, + }, + } + mocks.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, &corev1.NodeList{Items: []corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}}}, nil + }) + + updated := false + + mocks.agonClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + updated = true + ua := action.(k8stesting.UpdateAction) + gs := ua.GetObject().(*v1alpha1.GameServer) + assert.Equal(t, fixture.ObjectMeta.Name, gs.ObjectMeta.Name) + assert.Equal(t, v1alpha1.Dynamic, gs.Spec.PortPolicy) + assert.NotEqual(t, fixture.Spec.HostPort, gs.Spec.HostPort) + assert.True(t, 10 <= gs.Spec.HostPort && gs.Spec.HostPort <= 20, "%s not in range", gs.Spec.HostPort) + + return true, gs, nil + }) + + stop, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() + err := c.portAllocator.Run(stop) + assert.Nil(t, err) + + result, err := c.syncGameServerBlankState(fixture) + assert.Nil(t, err, "sync should not error") + assert.True(t, updated, "update should occur") + assert.Equal(t, v1alpha1.Dynamic, result.Spec.PortPolicy) + assert.NotEqual(t, fixture.Spec.HostPort, result.Spec.HostPort) + assert.True(t, 10 <= result.Spec.HostPort && result.Spec.HostPort <= 20, "%s not in range", result.Spec.HostPort) + }) + t.Run("Gameserver with unknown state", func(t *testing.T) { testWithUnknownState(t, func(c *Controller, fixture *v1alpha1.GameServer) (*v1alpha1.GameServer, error) { return c.syncGameServerBlankState(fixture) @@ -416,8 +457,8 @@ func 
TestSyncGameServerCreatingState(t *testing.T) { return true, gs, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() gs, err := c.syncGameServerCreatingState(fixture) assert.Equal(t, v1alpha1.Starting, gs.Status.State) @@ -444,8 +485,8 @@ func TestSyncGameServerCreatingState(t *testing.T) { return true, gs, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() gs, err := c.syncGameServerCreatingState(fixture) assert.Nil(t, err) @@ -497,11 +538,12 @@ func TestSyncGameServerRequestReadyState(t *testing.T) { assert.Equal(t, v1alpha1.Ready, gs.Status.State) assert.Equal(t, gs.Spec.HostPort, gs.Status.Port) assert.Equal(t, ipFixture, gs.Status.Address) + assert.Equal(t, node.ObjectMeta.Name, gs.Status.NodeName) return true, gs, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() gs, err := c.syncGameServerRequestReadyState(gsFixture) assert.Nil(t, err, "should not error") @@ -509,7 +551,7 @@ func TestSyncGameServerRequestReadyState(t *testing.T) { assert.Equal(t, v1alpha1.Ready, gs.Status.State) assert.Equal(t, gs.Spec.HostPort, gs.Status.Port) assert.Equal(t, ipFixture, gs.Status.Address) - + assert.Equal(t, node.ObjectMeta.Name, gs.Status.NodeName) }) t.Run("GameServer with unknown state", func(t *testing.T) { @@ -526,6 +568,8 @@ func TestSyncGameServerRequestReadyState(t *testing.T) { } func TestSyncGameServerShutdownState(t *testing.T) { + t.Parallel() + t.Run("GameServer with a Shutdown state", func(t *testing.T) { c, mocks := newFakeController() gsFixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, @@ -542,8 +586,8 @@ func TestSyncGameServerShutdownState(t *testing.T) { return true, nil, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() gs, err := c.syncGameServerShutdownState(gsFixture) assert.Nil(t, gs) @@ -566,6 +610,7 @@ func TestSyncGameServerShutdownState(t *testing.T) { func TestControllerExternalIP(t *testing.T) { t.Parallel() + c, mocks := newFakeController() ipfixture := "12.12.12.12" node := corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: corev1.NodeStatus{Addresses: []corev1.NodeAddress{{Address: ipfixture, Type: corev1.NodeExternalIP}}}} @@ -579,8 +624,8 @@ func TestControllerExternalIP(t *testing.T) { return true, &corev1.NodeList{Items: []corev1.Node{node}}, nil }) - stop := startInformers(c, mocks) - defer close(stop) + _, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() addr, err := c.externalIP(&pod) assert.Nil(t, err) @@ -589,6 +634,7 @@ func TestControllerExternalIP(t *testing.T) { func TestControllerGameServerPod(t *testing.T) { t.Parallel() + c, mocks := newFakeController() fakeWatch := watch.NewFake() mocks.kubeClient.AddWatchReactor("pods", k8stesting.DefaultWatchReactor(fakeWatch, nil)) @@ -596,8 +642,8 @@ func TestControllerGameServerPod(t *testing.T) { gs.ApplyDefaults() pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{v1alpha1.GameServerPodLabel: gs.ObjectMeta.Name}}} - stop := startInformers(c, mocks) - defer close(stop) + stop, cancel := startInformers(mocks, c.gameServerSynced) + defer cancel() _, err := c.gameServerPod(gs) assert.Equal(t, errPodNotFound, err) @@ -678,30 +724,12 @@ func 
testWithNonZeroDeletionTimestamp(t *testing.T, state v1alpha1.State, f func assert.Equal(t, fixture, result) } -// holder for all my fakes and mocks -type mocks struct { - kubeClient *kubefake.Clientset - kubeInformationFactory informers.SharedInformerFactory - extClient *extfake.Clientset - agonClient *agonfake.Clientset - agonInformerFactory externalversions.SharedInformerFactory -} - // newFakeController returns a controller, backed by the fake Clientset func newFakeController() (*Controller, mocks) { - kubeClient := &kubefake.Clientset{} - kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) - extClient := &extfake.Clientset{} - agonClient := &agonfake.Clientset{} - agonInformerFactory := externalversions.NewSharedInformerFactory(agonClient, 30*time.Second) - - return NewController("sidecar:dev", false, kubeClient, kubeInformationFactory, extClient, agonClient, agonInformerFactory), - mocks{ - kubeClient: kubeClient, - kubeInformationFactory: kubeInformationFactory, - extClient: extClient, - agonClient: agonClient, - agonInformerFactory: agonInformerFactory} + m := newMocks() + c := NewController(10, 20, "sidecar:dev", false, + m.kubeClient, m.kubeInformationFactory, m.extClient, m.agonClient, m.agonInformerFactory) + return c, m } func newSingeContainerSpec() v1alpha1.GameServerSpec { @@ -727,16 +755,3 @@ func newEstablishedCRD() *v1beta1.CustomResourceDefinition { }, } } - -func startInformers(c *Controller, mocks mocks) chan struct{} { - stop := make(chan struct{}) - mocks.kubeInformationFactory.Start(stop) - mocks.agonInformerFactory.Start(stop) - - logrus.Info("Wait for cache sync") - if !cache.WaitForCacheSync(stop, c.gameServerSynced) { - panic("Cache never synced") - } - - return stop -} diff --git a/gameservers/controller/helper_test.go b/gameservers/controller/helper_test.go new file mode 100644 index 0000000000..bf9ecc4f80 --- /dev/null +++ b/gameservers/controller/helper_test.go @@ -0,0 +1,68 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "time" + + "context" + + agonfake "github.com/agonio/agon/pkg/client/clientset/versioned/fake" + "github.com/agonio/agon/pkg/client/informers/externalversions" + "github.com/sirupsen/logrus" + extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +// holder for all my fakes and mocks +type mocks struct { + kubeClient *kubefake.Clientset + kubeInformationFactory informers.SharedInformerFactory + extClient *extfake.Clientset + agonClient *agonfake.Clientset + agonInformerFactory externalversions.SharedInformerFactory +} + +func newMocks() mocks { + kubeClient := &kubefake.Clientset{} + kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) + extClient := &extfake.Clientset{} + agonClient := &agonfake.Clientset{} + agonInformerFactory := externalversions.NewSharedInformerFactory(agonClient, 30*time.Second) + m := mocks{ + kubeClient: kubeClient, + kubeInformationFactory: kubeInformationFactory, + extClient: extClient, + agonClient: agonClient, + agonInformerFactory: agonInformerFactory} + return m +} + +func startInformers(mocks mocks, sync ...cache.InformerSynced) (<-chan struct{}, context.CancelFunc) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + stop := ctx.Done() + + mocks.kubeInformationFactory.Start(stop) + mocks.agonInformerFactory.Start(stop) + + logrus.Info("Wait for cache sync") + if !cache.WaitForCacheSync(stop, sync...) { + panic("Cache never synced") + } + + return stop, cancel +} diff --git a/gameservers/controller/main.go b/gameservers/controller/main.go index 451a88f607..c3ccfcaf3a 100644 --- a/gameservers/controller/main.go +++ b/gameservers/controller/main.go @@ -36,6 +36,8 @@ import ( const ( sidecarFlag = "sidecar" pullSidecarFlag = "always-pull-sidecar" + minPortFlag = "min-port" + maxPortFlag = "max-port" ) func init() { @@ -49,20 +51,34 @@ func main() { pflag.String(sidecarFlag, viper.GetString(sidecarFlag), "Flag to overwrite the GameServer sidecar image that is used. Can also use SIDECAR env variable") pflag.Bool(pullSidecarFlag, viper.GetBool(pullSidecarFlag), "For development purposes, set the sidecar image to have an ImagePullPolicy of Always. Can also use ALWAYS_PULL_SIDECAR env variable") + pflag.Int32(minPortFlag, 0, "Required. The minimum port that a GameServer can be allocated to. Can also use MIN_PORT env variable.") + pflag.Int32(maxPortFlag, 0, "Required. The maximum port that a GameServer can be allocated to. Can also use MAX_PORT env variable") pflag.Parse() viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) runtime.Must(viper.BindEnv(sidecarFlag)) runtime.Must(viper.BindEnv(pullSidecarFlag)) + runtime.Must(viper.BindEnv(minPortFlag)) + runtime.Must(viper.BindEnv(maxPortFlag)) runtime.Must(viper.BindPFlags(pflag.CommandLine)) + minPort := int32(viper.GetInt64(minPortFlag)) + maxPort := int32(viper.GetInt64(maxPortFlag)) sidecarImage := viper.GetString(sidecarFlag) alwaysPullSidecar := viper.GetBool(pullSidecarFlag) logrus.WithField(sidecarFlag, sidecarImage). + WithField("minPort", minPort). + WithField("maxPort", maxPort). WithField("alwaysPullSidecarImage", alwaysPullSidecar).
WithField("Version", pkg.Version).Info("starting gameServer operator...") + if minPort <= 0 || maxPort <= 0 { + logrus.Fatal("Min Port and Max Port values are required.") + } else if maxPort < minPort { + logrus.Fatal("Max Port cannot be set less that the Min Port") + } + config, err := rest.InClusterConfig() if err != nil { logrus.WithError(err).Fatal("Could not create in cluster config") @@ -85,7 +101,7 @@ func main() { agonInformerFactory := externalversions.NewSharedInformerFactory(agonClient, 30*time.Second) kubeInformationFactory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second) - c := NewController(sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonClient, agonInformerFactory) + c := NewController(minPort, maxPort, sidecarImage, alwaysPullSidecar, kubeClient, kubeInformationFactory, extClient, agonClient, agonInformerFactory) stop := signals.NewStopChannel() diff --git a/gameservers/controller/portallocator.go b/gameservers/controller/portallocator.go new file mode 100644 index 0000000000..2fdb73891e --- /dev/null +++ b/gameservers/controller/portallocator.go @@ -0,0 +1,255 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "sync" + + "github.com/agonio/agon/pkg/apis/stable/v1alpha1" + "github.com/agonio/agon/pkg/client/informers/externalversions" + listerv1alpha1 "github.com/agonio/agon/pkg/client/listers/stable/v1alpha1" + "github.com/agonio/agon/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/informers" + corelisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +// ErrPortNotFound is returns when a port is unable to be allocated +var ErrPortNotFound = errors.New("Unable to allocate a port") + +// A set of port allocations for a node +type portAllocation map[int32]bool + +// PortAllocator manages the dynamic port +// allocation strategy. Only use exposed methods to ensure +// appropriate locking is taken. +// The PortAllocator does not currently support mixing static portAllocations (or any pods with defined HostPort) +// within the dynamic port range other than the ones it coordinates. +type PortAllocator struct { + mutex sync.RWMutex + portAllocations []portAllocation + minPort int32 + maxPort int32 + gameServerSynced cache.InformerSynced + gameServerLister listerv1alpha1.GameServerLister + gameServerInformer cache.SharedIndexInformer + nodeSynced cache.InformerSynced + nodeLister corelisterv1.NodeLister + nodeInformer cache.SharedIndexInformer +} + +// NewPortAllocator returns a new dynamic port +// allocator. 
minPort and maxPort are the bottom and top of the range of ports that can be allocated to +// the game servers +func NewPortAllocator(minPort, maxPort int32, + kubeInformerFactory informers.SharedInformerFactory, + agonInformerFactory externalversions.SharedInformerFactory) *PortAllocator { + logrus.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting port allocator") + + v1 := kubeInformerFactory.Core().V1() + nodes := v1.Nodes() + gameServers := agonInformerFactory.Stable().V1alpha1().GameServers() + + pa := &PortAllocator{ + mutex: sync.RWMutex{}, + minPort: minPort, + maxPort: maxPort, + gameServerSynced: gameServers.Informer().HasSynced, + gameServerLister: gameServers.Lister(), + gameServerInformer: gameServers.Informer(), + nodeLister: nodes.Lister(), + nodeInformer: nodes.Informer(), + nodeSynced: nodes.Informer().HasSynced, + } + + return pa +} + +// Run sets up the current state of port allocations and +// starts tracking GameServer and Node changes +func (pa *PortAllocator) Run(stop <-chan struct{}) error { + logrus.Info("Running Port Allocator") + pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: pa.syncDeleteGameServer, + }) + + // Experimental support for node addition/removal + pa.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: pa.syncAddNode, + UpdateFunc: func(oldObj, newObj interface{}) { + oldNode := oldObj.(*corev1.Node) + newNode := newObj.(*corev1.Node) + if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable { + err := pa.syncPortAllocations(stop) + if err != nil { + err := errors.Wrap(err, "error resetting ports on node update") + runtime.HandleError(logrus.WithField("node", newNode), err) + } + } + }, + DeleteFunc: func(obj interface{}) { + err := pa.syncPortAllocations(stop) + if err != nil { + err := errors.Wrap(err, "error on node deletion") + runtime.HandleError(logrus.WithField("node", obj), err) + } + }, + }) + + logrus.Info("Flush cache sync, before syncing gameserver and node state") + if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { + return nil + } + + return pa.syncPortAllocations(stop) +} + +// Allocate allocates a port.
Returns ErrPortNotFound if no port is +// allocatable +func (pa *PortAllocator) Allocate() (int32, error) { + pa.mutex.Lock() + defer pa.mutex.Unlock() + for _, n := range pa.portAllocations { + for p, taken := range n { + if !taken { + n[p] = true + return p, nil + } + } + } + return -1, ErrPortNotFound +} + +// syncAddNode adds another node port section +// to the available ports +func (pa *PortAllocator) syncAddNode(obj interface{}) { + pa.mutex.Lock() + defer pa.mutex.Unlock() + + node := obj.(*corev1.Node) + logrus.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations") + + ports := portAllocation{} + for i := pa.minPort; i <= pa.maxPort; i++ { + ports[i] = false + } + + pa.portAllocations = append(pa.portAllocations, ports) +} + +// syncDeleteGameServer is called when a GameServer is deleted, +// and makes its HostPort available again +func (pa *PortAllocator) syncDeleteGameServer(object interface{}) { + pa.mutex.Lock() + defer pa.mutex.Unlock() + + gs := object.(*v1alpha1.GameServer) + logrus.WithField("gs", gs).Info("syncing deleted GameServer") + pa.portAllocations = setPortAllocation(gs.Spec.HostPort, pa.portAllocations, false) +} + +// syncPortAllocations syncs the node and gameserver caches, then +// traverses all Nodes in the cluster and all GameServers, and makes +// sure the ports they hold are marked as taken in the portAllocations. +// Locks the mutex while doing this. +// This is basically a stop-the-world Garbage Collection on port allocations. +func (pa *PortAllocator) syncPortAllocations(stop <-chan struct{}) error { + pa.mutex.Lock() + defer pa.mutex.Unlock() + + logrus.Info("Resetting Port Allocation") + + if !cache.WaitForCacheSync(stop, pa.gameServerSynced, pa.nodeSynced) { + return nil + } + + nodes, err := pa.nodeLister.List(labels.Everything()) + if err != nil { + return errors.Wrap(err, "error listing all nodes") + } + + // setup blank port values + nodePorts := pa.nodePortAllocation(nodes) + + gameservers, err := pa.gameServerLister.List(labels.Everything()) + if err != nil { + return errors.Wrapf(err, "error listing all GameServers") + } + + // a place to collect the HostPorts of GameServers that are not yet scheduled to a node + var nonReadyNodesPorts []int32 + // check the GameServers; some may not have been assigned a node yet + for _, gs := range gameservers { + // if the node doesn't exist, it's likely unscheduled + _, ok := nodePorts[gs.Status.NodeName] + if gs.Status.NodeName != "" && ok { + nodePorts[gs.Status.NodeName][gs.Status.Port] = true + } else if gs.Spec.HostPort != 0 { + nonReadyNodesPorts = append(nonReadyNodesPorts, gs.Spec.HostPort) + } + } + + // flatten the per-node map into a list of port allocations
+ allocations := make([]portAllocation, len(nodePorts)) + i := 0 + for _, np := range nodePorts { + allocations[i] = np + i++ + } + + // close off the port on the first node you find + // we don't actually mind which node it is, since we only care + // that there is a port open *somewhere*, as the default scheduler + // will re-route for us based on HostPort allocation + for _, p := range nonReadyNodesPorts { + allocations = setPortAllocation(p, allocations, true) + } + + pa.portAllocations = allocations + + return nil +} + +// nodePortAllocation returns a map of port allocations, all set to available, +// keyed by node name +func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation { + nodePorts := map[string]portAllocation{} + for _, n := range nodes { + // ignore unschedulable nodes + if !n.Spec.Unschedulable { + nodePorts[n.Name] = portAllocation{} + for i := pa.minPort; i <= pa.maxPort; i++ { + nodePorts[n.Name][i] = false + } + } + } + return nodePorts +} + +// setPortAllocation sets the taken flag for the given port on the first +// node allocation where the current value differs +func setPortAllocation(port int32, allocations []portAllocation, taken bool) []portAllocation { + for _, np := range allocations { + if np[port] != taken { + np[port] = taken + break + } + } + return allocations } diff --git a/gameservers/controller/portallocator_test.go b/gameservers/controller/portallocator_test.go new file mode 100644 index 0000000000..bff83e2a73 --- /dev/null +++ b/gameservers/controller/portallocator_test.go @@ -0,0 +1,358 @@ +// Copyright 2017 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package main + +import ( + "fmt" + "testing" + + "sync" + + "github.com/agonio/agon/pkg/apis/stable/v1alpha1" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +var ( + n1 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}} + n2 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node2"}} + n3 = corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node3"}} +) + +func TestPortAllocatorAllocate(t *testing.T) { + t.Parallel() + + t.Run("ports are all allocated", func(t *testing.T) { + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + nodeWatch := watch.NewFake() + m.kubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) + + stop, cancel := startInformers(m) + defer cancel() + + // Make sure the adds don't corrupt the sync + nodeWatch.Add(&n1) + nodeWatch.Add(&n2) + + err := pa.Run(stop) + assert.Nil(t, err) + + // two nodes + for x := 0; x < 2; x++ { + // ports between 10 and 20 + for i := 10; i <= 20; i++ { + var p int32 + p, err = pa.Allocate() + assert.True(t, 10 <= p && p <= 20, "%v is not between 10 and 20", p) + assert.Nil(t, err) + } + } + + // now we should have none left + _, err = pa.Allocate() + assert.Equal(t, ErrPortNotFound, err) + }) + + t.Run("ports are unique in a node", func(t *testing.T) { + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: []corev1.Node{n1}} + return true, nl, nil + }) + stop, cancel := startInformers(m) + defer cancel() + err := pa.Run(stop) + assert.Nil(t, err) + var ports []int32 + for i := 10; i <= 20; i++ { + p, err := pa.Allocate() + assert.Nil(t, err) + assert.NotContains(t, ports, p) + ports = append(ports, p) + } + }) +} + +func TestPortAllocatorMultithreadAllocate(t *testing.T) { + m := newMocks() + pa := NewPortAllocator(10, 110, m.kubeInformationFactory, m.agonInformerFactory) + + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: []corev1.Node{n1, n2}} + return true, nl, nil + }) + stop, cancel := startInformers(m) + defer cancel() + err := pa.Run(stop) + assert.Nil(t, err) + wg := sync.WaitGroup{} + + for i := 0; i < 3; i++ { + wg.Add(1) + go func(i int) { + for x := 0; x < 10; x++ { + logrus.WithField("x", x).WithField("i", i).Info("allocating!") + _, err := pa.Allocate() + assert.Nil(t, err) + } + wg.Done() + }(i) + } + + wg.Wait() +} + +func TestPortAllocatorSyncPortAllocations(t *testing.T) { + t.Parallel() + + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: []corev1.Node{n1, n2, n3}} + return true, nl, nil + }) + + m.agonClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + gs1 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n1.ObjectMeta.Name}} + gs2 =
v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} + gs3 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3"}, Spec: v1alpha1.GameServerSpec{HostPort: 11}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 11, NodeName: n3.ObjectMeta.Name}} + gs4 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4"}, Spec: v1alpha1.GameServerSpec{HostPort: 12}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Creating}} + gs5 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5"}, Spec: v1alpha1.GameServerSpec{HostPort: 12}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Creating}} + gsl := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs1, gs2, gs3, gs4, gs5}} + return true, gsl, nil + }) + + stop, cancel := startInformers(m) + defer cancel() + err := pa.syncPortAllocations(stop) + assert.Nil(t, err) + assert.Len(t, pa.portAllocations, 3) + + // count the number of allocated ports + assert.Equal(t, 2, countAllocatedPorts(pa, 10)) + assert.Equal(t, 1, countAllocatedPorts(pa, 11)) + assert.Equal(t, 2, countAllocatedPorts(pa, 12)) + + count := 0 + for i := int32(10); i <= 20; i++ { + count += countAllocatedPorts(pa, i) + } + assert.Equal(t, 5, count) +} + +func TestPortAllocatorSyncDeleteGameServer(t *testing.T) { + t.Parallel() + + m := newMocks() + fakeWatch := watch.NewFake() + m.agonClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(fakeWatch, nil)) + + gs1Fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}} + gs2Fixture := gs1Fixture.DeepCopy() + gs2Fixture.ObjectMeta.Name = "gs5" + + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: []corev1.Node{n1, n2, n3}} + return true, nl, nil + }) + + m.agonClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + gs1 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n1.ObjectMeta.Name}} + gs2 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2"}, Spec: v1alpha1.GameServerSpec{HostPort: 11}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 11, NodeName: n1.ObjectMeta.Name}} + gs3 := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3"}, Spec: v1alpha1.GameServerSpec{HostPort: 10}, + Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Port: 10, NodeName: n2.ObjectMeta.Name}} + + gsl := &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs1, gs2, gs3}} + return true, gsl, nil + }) + + stop, cancel := startInformers(m) + defer cancel() + + // this should do nothing, as it happens before pa.Run is called + fakeWatch.Add(gs2Fixture.DeepCopy()) + fakeWatch.Delete(gs2Fixture.DeepCopy()) + + err := pa.Run(stop) + assert.Nil(t, err) + + nonGSPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "notagameserver"}} + fakeWatch.Add(gs1Fixture.DeepCopy()) + fakeWatch.Add(nonGSPod.DeepCopy()) + assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) + + // gate: check the allocations are as expected before the delete + pa.mutex.RLock() // reading mutable state, so read lock + assert.Equal(t, 2, countAllocatedPorts(pa,
10)) + assert.Equal(t, 1, countAllocatedPorts(pa, 11)) + pa.mutex.RUnlock() + + fakeWatch.Delete(gs1Fixture.DeepCopy()) + assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) + + pa.mutex.RLock() // reading mutable state, so read lock + assert.Equal(t, 1, countAllocatedPorts(pa, 10)) + assert.Equal(t, 1, countAllocatedPorts(pa, 11)) + pa.mutex.RUnlock() + + // delete the non gameserver pod, all should be the same + fakeWatch.Delete(nonGSPod.DeepCopy()) + assert.True(t, cache.WaitForCacheSync(stop, pa.gameServerSynced)) + pa.mutex.RLock() // reading mutable state, so read lock + assert.Equal(t, 1, countAllocatedPorts(pa, 10)) + assert.Equal(t, 1, countAllocatedPorts(pa, 11)) + pa.mutex.RUnlock() +} + +func TestPortAllocatorNodeEvents(t *testing.T) { + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + nodeWatch := watch.NewFake() + gsWatch := watch.NewFake() + m.kubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil)) + m.agonClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil)) + + stop, cancel := startInformers(m) + defer cancel() + + // Make sure the adds don't corrupt the sync + nodeWatch.Add(&n1) + nodeWatch.Add(&n2) + + err := pa.Run(stop) + assert.Nil(t, err) + + // add a game server + port, err := pa.Allocate() + assert.Nil(t, err) + gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, Spec: v1alpha1.GameServerSpec{HostPort: port}} + gsWatch.Add(&gs) + + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 2) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() + + // add the n3 node + logrus.Info("adding n3") + nodeWatch.Add(&n3) + assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced)) + + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 3) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() + + // mark the node as unschedulable + logrus.Info("unscheduling n3") + copy := n3.DeepCopy() + copy.Spec.Unschedulable = true + assert.True(t, copy.Spec.Unschedulable) + nodeWatch.Modify(copy) + assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced)) + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 2) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() + + // schedule the n3 node again + logrus.Info("scheduling n3") + copy = n3.DeepCopy() + copy.Spec.Unschedulable = false + nodeWatch.Modify(copy) + assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced)) + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 3) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() + + // delete the n3 node + logrus.Info("deleting n3") + nodeWatch.Delete(n3.DeepCopy()) + assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced)) + pa.mutex.RLock() + assert.Len(t, pa.portAllocations, 2) + assert.Equal(t, 1, countAllocatedPorts(pa, port)) + pa.mutex.RUnlock() +} + +func TestNodePortAllocation(t *testing.T) { + t.Parallel() + + m := newMocks() + pa := NewPortAllocator(10, 20, m.kubeInformationFactory, m.agonInformerFactory) + nodes := []corev1.Node{n1, n2, n3} + m.kubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) { + nl := &corev1.NodeList{Items: nodes} + return true, nl, nil + }) + result := pa.nodePortAllocation([]*corev1.Node{&n1, &n2, &n3}) + assert.Len(t, result, 3) + for _, n := range nodes { + ports, ok := result[n.ObjectMeta.Name] + assert.True(t, ok, "Should have a port allocation for %s", n.ObjectMeta.Name) +
assert.Len(t, ports, 11) + for _, v := range ports { + assert.False(t, v) + } + } +} + +func TestTakePortAllocation(t *testing.T) { + t.Parallel() + + fixture := []portAllocation{{1: false, 2: false}, {1: false, 2: false}, {1: false, 3: false}} + result := setPortAllocation(2, fixture, true) + assert.True(t, result[0][2]) + + for i, row := range fixture { + for p, taken := range row { + if !(i == 0 && p == 2) { // everything except the taken port should still be false + assert.False(t, taken, fmt.Sprintf("row %d and port %d should be false", i, p)) + } + } + } +} + +// countAllocatedPorts counts how many of a given port have been +// allocated across nodes +func countAllocatedPorts(pa *PortAllocator, p int32) int { + count := 0 + for _, node := range pa.portAllocations { + if node[p] { + count++ + } + } + return count +} diff --git a/install.yaml b/install.yaml index c2bca80f6e..6b7efaddf6 100644 --- a/install.yaml +++ b/install.yaml @@ -44,6 +44,12 @@ spec: - name: gameservers-controller image: gcr.io/agon-images/gameservers-controller:0.1 env: + # minimum port that can be exposed to GameServer traffic + - name: MIN_PORT + value: "7000" + # maximum port that can be exposed to GameServer traffic + - name: MAX_PORT + value: "8000" # - name: SIDECAR # overwrite the GameServer sidecar image that is used # value: gcr.io/agon-images/gameservers-sidecar:0.1 livenessProbe: diff --git a/pkg/apis/stable/v1alpha1/types.go b/pkg/apis/stable/v1alpha1/types.go index 63a4f9837b..40c322cf16 100644 --- a/pkg/apis/stable/v1alpha1/types.go +++ b/pkg/apis/stable/v1alpha1/types.go @@ -44,6 +44,9 @@ const ( // Static PortPolicy means that the user defines the hostPort to be used // in the configuration. Static PortPolicy = "static" + // Dynamic PortPolicy means that the system will choose an open + // port for the GameServer in question + Dynamic PortPolicy = "dynamic" // RoleLabel is the label in which the Agon role is specified. // Pods from a GameServer will have the value "gameserver" @@ -102,9 +105,10 @@ type PortPolicy string // GameServerStatus is the status for a GameServer resource type GameServerStatus struct { // The current state of a GameServer, e.g.
Creating, Starting, Ready, etc - State State `json:"state"` - Port int32 `json:"port"` - Address string `json:"address"` + State State `json:"state"` + Port int32 `json:"port"` + Address string `json:"address"` + NodeName string `json:"nodeName"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -125,6 +129,10 @@ func (gs *GameServer) ApplyDefaults() { gs.Spec.Container = gs.Spec.Template.Spec.Containers[0].Name } + if gs.Spec.PortPolicy == "" { + gs.Spec.PortPolicy = Dynamic + } + if gs.Spec.Protocol == "" { gs.Spec.Protocol = "UDP" } diff --git a/pkg/apis/stable/v1alpha1/types_test.go b/pkg/apis/stable/v1alpha1/types_test.go index 2e7eadfaba..84466f3811 100644 --- a/pkg/apis/stable/v1alpha1/types_test.go +++ b/pkg/apis/stable/v1alpha1/types_test.go @@ -58,6 +58,7 @@ func TestGameServerApplyDefaults(t *testing.T) { expectedContainer string expectedProtocol corev1.Protocol expectedState State + expectedPolicy PortPolicy }{ "set basic defaults on a very simple gameserver": { gameServer: GameServer{ @@ -68,11 +69,13 @@ func TestGameServerApplyDefaults(t *testing.T) { expectedContainer: "testing", expectedProtocol: "UDP", expectedState: Creating, + expectedPolicy: Dynamic, }, "defaults are already set": { gameServer: GameServer{ Spec: GameServerSpec{ Container: "testing2", Protocol: "TCP", + PortPolicy: Static, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{Containers: []corev1.Container{ {Name: "testing", Image: "testing/image"}, @@ -83,6 +86,7 @@ func TestGameServerApplyDefaults(t *testing.T) { expectedContainer: "testing2", expectedProtocol: "TCP", expectedState: "TestState", + expectedPolicy: Static, }, }
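
A note on usage: with this patch, "dynamic" becomes the default portPolicy (ApplyDefaults above sets it whenever the field is empty), so a manifest no longer needs to pin a hostPort. Below is a minimal sketch of such a GameServer, modelled on the examples/gameserver.yaml in this patch; the name prefix and image are hypothetical, not files from this change.

apiVersion: "stable.agon.io/v1alpha1"
kind: GameServer
metadata:
  # hypothetical prefix; needs `kubectl create`, since generateName
  # produces a unique name on each creation
  generateName: "dynamic-example-"
spec:
  # could be omitted entirely; ApplyDefaults sets "dynamic"
  portPolicy: "dynamic"
  # the port the game server process listens on
  containerPort: 7654
  # no hostPort: the PortAllocator assigns one from the MIN_PORT/MAX_PORT
  # range, and it is surfaced on status.port once the GameServer is Ready
  template:
    spec:
      containers:
        - name: example-server
          image: gcr.io/agon-images/example:0.1  # hypothetical image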
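
The bookkeeping behind portallocator.go is one port->taken map per node, a linear scan to allocate, and a reverse flip to release. Below is a self-contained sketch of that idea, simplified from the patch: no informers or locking, and the helper names are illustrative rather than the actual implementation (which guards this state with a mutex and rebuilds it from the node and GameServer listers in syncPortAllocations).

package main

import (
	"errors"
	"fmt"
)

// portAllocation mirrors the patch's per-node map of port -> taken.
type portAllocation map[int32]bool

var errNoPort = errors.New("unable to allocate a port")

// newNodeAllocation builds a blank allocation for one node, covering
// [minPort, maxPort] inclusive, as nodePortAllocation does per node.
func newNodeAllocation(minPort, maxPort int32) portAllocation {
	pa := portAllocation{}
	for p := minPort; p <= maxPort; p++ {
		pa[p] = false
	}
	return pa
}

// allocate scans each node for a free port and takes the first one found,
// the same linear scan Allocate performs under its mutex.
func allocate(nodes []portAllocation) (int32, error) {
	for _, n := range nodes {
		for p, taken := range n {
			if !taken {
				n[p] = true
				return p, nil
			}
		}
	}
	return -1, errNoPort
}

// release frees the port on the first node currently holding it,
// mirroring setPortAllocation(port, allocations, false).
func release(port int32, nodes []portAllocation) {
	for _, n := range nodes {
		if n[port] {
			n[port] = false
			return
		}
	}
}

func main() {
	// two nodes with ports 7000-7002 each: six allocatable slots in total
	nodes := []portAllocation{
		newNodeAllocation(7000, 7002),
		newNodeAllocation(7000, 7002),
	}
	for i := 0; i < 6; i++ {
		p, err := allocate(nodes)
		fmt.Println(p, err) // six successful allocations
	}
	if _, err := allocate(nodes); err != nil {
		fmt.Println(err) // every port on every node is now taken
	}
	release(7001, nodes) // e.g. a GameServer was deleted
	p, _ := allocate(nodes)
	fmt.Println("re-allocated", p) // 7001 becomes available again
}

Go randomizes map iteration order, so the port picked is arbitrary; that is fine here, since any free port *somewhere* in the cluster will do, and the default scheduler routes pods based on HostPort availability, as the comment in syncPortAllocations notes.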