diff --git a/pkg/apis/agones/v1/gameserver.go b/pkg/apis/agones/v1/gameserver.go index bdc38d9a0a..865d2c2ce9 100644 --- a/pkg/apis/agones/v1/gameserver.go +++ b/pkg/apis/agones/v1/gameserver.go @@ -644,3 +644,9 @@ func (gs *GameServer) Patch(delta *GameServer) ([]byte, error) { result, err = json.Marshal(patch) return result, errors.Wrapf(err, "error creating json for patch for GameServer %s", gs.ObjectMeta.Name) } + +// IsUnhealthy returns true if the GameServer is Unhealthy or in Error state +func (gs *GameServer) IsUnhealthy() bool { + state := gs.Status.State + return state == GameServerStateUnhealthy || state == GameServerStateError +} diff --git a/pkg/apis/agones/v1/gameserver_test.go b/pkg/apis/agones/v1/gameserver_test.go index ffe6bc13ee..bcb2cd7b96 100644 --- a/pkg/apis/agones/v1/gameserver_test.go +++ b/pkg/apis/agones/v1/gameserver_test.go @@ -688,7 +688,34 @@ func TestGameServerIsBeforeReady(t *testing.T) { for _, test := range fixtures { t.Run(string(test.state), func(t *testing.T) { gs := &GameServer{Status: GameServerStatus{State: test.state}} - assert.Equal(t, test.expected, gs.IsBeforeReady()) + assert.Equal(t, test.expected, gs.IsBeforeReady(), test.state) + }) + } + +} + +func TestGameServerIsUnhealthy(t *testing.T) { + fixtures := []struct { + state GameServerState + expected bool + }{ + {GameServerStatePortAllocation, false}, + {GameServerStateCreating, false}, + {GameServerStateStarting, false}, + {GameServerStateScheduled, false}, + {GameServerStateRequestReady, false}, + {GameServerStateReady, false}, + {GameServerStateShutdown, false}, + {GameServerStateError, true}, + {GameServerStateUnhealthy, true}, + {GameServerStateReserved, false}, + {GameServerStateAllocated, false}, + } + + for _, test := range fixtures { + t.Run(string(test.state), func(t *testing.T) { + gs := &GameServer{Status: GameServerStatus{State: test.state}} + assert.Equal(t, test.expected, gs.IsUnhealthy(), test.state) }) } diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index dc2855991d..e627a3dd41 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -80,6 +80,7 @@ type Controller struct { portAllocator *PortAllocator healthController *HealthController migrationController *MigrationController + missingPodController *MissingPodController workerqueue *workerqueue.WorkerQueue creationWorkerQueue *workerqueue.WorkerQueue // handles creation only deletionWorkerQueue *workerqueue.WorkerQueue // handles deletion only @@ -125,6 +126,7 @@ func NewController( portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), healthController: NewHealthController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), migrationController: NewMigrationController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), + missingPodController: NewMissingPodController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), } c.baseLogger = runtime.NewLoggerWithType(c) @@ -328,6 +330,13 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { } }() + // Run the Missing Pod Controller + go func() { + if err := c.missingPodController.Run(stop); err != nil { + c.baseLogger.WithError(err).Error("error running missing pod controller") + } + }() + // start work queues var wg sync.WaitGroup diff --git a/pkg/gameservers/gameservers.go b/pkg/gameservers/gameservers.go index b82faa20b9..a6094805ac 100644 --- a/pkg/gameservers/gameservers.go +++ b/pkg/gameservers/gameservers.go @@ -75,3 +75,11 @@ func applyGameServerAddressAndPort(gs *agonesv1.GameServer, node *corev1.Node, p return gs, nil } + +// isBeforePodCreated checks to see if the GameServer is in a state in which the pod could not have been +// created yet. This includes "Starting" in which a pod MAY exist, but may not yet be available, depending on when the +// informer cache updates +func isBeforePodCreated(gs *agonesv1.GameServer) bool { + state := gs.Status.State + return state == agonesv1.GameServerStatePortAllocation || state == agonesv1.GameServerStateCreating || state == agonesv1.GameServerStateStarting +} diff --git a/pkg/gameservers/missing.go b/pkg/gameservers/missing.go new file mode 100644 index 0000000000..31f00e345e --- /dev/null +++ b/pkg/gameservers/missing.go @@ -0,0 +1,158 @@ +// Copyright 2020 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gameservers + +import ( + "agones.dev/agones/pkg/apis/agones" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/client/clientset/versioned/scheme" + getterv1 "agones.dev/agones/pkg/client/clientset/versioned/typed/agones/v1" + "agones.dev/agones/pkg/client/informers/externalversions" + listerv1 "agones.dev/agones/pkg/client/listers/agones/v1" + "agones.dev/agones/pkg/util/logfields" + "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/workerqueue" + "github.com/heptiolabs/healthcheck" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corelisterv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" +) + +// MissingPodController watches makes sure that any GameServer +// that isn't in Scheduled, or Unhealthy/Error state, that is missing a Pod +// moved to Unhealthy. This can sometimes happen if the controller has downtime +// for a unexpected reason, or if there is no Delete event for a Pod for any reason. +// Since resync is every 30 seconds, even if there is some time in which a GameServer +// is in a broken state, it will eventually move to Unhealthy, and get replaced (if in a Fleet). +type MissingPodController struct { + baseLogger *logrus.Entry + podSynced cache.InformerSynced + podLister corelisterv1.PodLister + gameServerSynced cache.InformerSynced + gameServerGetter getterv1.GameServersGetter + gameServerLister listerv1.GameServerLister + workerqueue *workerqueue.WorkerQueue + recorder record.EventRecorder +} + +// NewMissingPodController returns a MissingPodController +func NewMissingPodController(health healthcheck.Handler, + kubeClient kubernetes.Interface, + agonesClient versioned.Interface, + kubeInformerFactory informers.SharedInformerFactory, + agonesInformerFactory externalversions.SharedInformerFactory) *MissingPodController { + + podInformer := kubeInformerFactory.Core().V1().Pods().Informer() + gameServers := agonesInformerFactory.Agones().V1().GameServers() + c := &MissingPodController{ + podSynced: podInformer.HasSynced, + podLister: kubeInformerFactory.Core().V1().Pods().Lister(), + gameServerSynced: gameServers.Informer().HasSynced, + gameServerGetter: agonesClient.AgonesV1(), + gameServerLister: gameServers.Lister(), + } + + c.baseLogger = runtime.NewLoggerWithType(c) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.baseLogger, logfields.GameServerKey, agones.GroupName+".MissingPodController") + health.AddLivenessCheck("gameserver-missing-pod-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(c.baseLogger.Debugf) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "missing-pod-controller"}) + + gameServers.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(_, newObj interface{}) { + gs := newObj.(*agonesv1.GameServer) + _, isDev := gs.GetDevAddress() + // TOXO: test for dev gameserver, which won't have a pod + if !isDev && !isBeforePodCreated(gs) && !gs.IsBeingDeleted() && !gs.IsUnhealthy() { + c.workerqueue.Enqueue(gs) + } + }, + }) + + return c +} + +// Run processes the rate limited queue. +// Will block until stop is closed +func (c *MissingPodController) Run(stop <-chan struct{}) error { + c.baseLogger.Debug("Wait for cache sync") + if !cache.WaitForCacheSync(stop, c.gameServerSynced, c.podSynced) { + return errors.New("failed to wait for caches to sync") + } + + c.workerqueue.Run(1, stop) + return nil +} + +func (c *MissingPodController) loggerForGameServerKey(key string) *logrus.Entry { + return logfields.AugmentLogEntry(c.baseLogger, logfields.GameServerKey, key) +} + +// syncGameServer checks if a GameServer has a backing Pod, and if not, +// moves it to Unhealthy +func (c *MissingPodController) syncGameServer(key string) error { + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + // don't return an error, as we don't want this retried + runtime.HandleError(c.loggerForGameServerKey(key), errors.Wrapf(err, "invalid resource key")) + return nil + } + + // check if the pod exists + if pod, err := c.podLister.Pods(namespace).Get(name); err != nil { + if !k8serrors.IsNotFound(err) { + return errors.Wrapf(err, "error retrieving Pod %s from namespace %s", name, namespace) + } + c.loggerForGameServerKey(key).Debug("Pod is missing. Moving GameServer to Unhealthy.") + } else if isGameServerPod(pod) { + // if the pod exists, all is well, and we can continue on our merry way. + return nil + } + + gs, err := c.gameServerLister.GameServers(namespace).Get(name) + if err != nil { + if k8serrors.IsNotFound(err) { + c.loggerForGameServerKey(key).Debug("GameServer is no longer available for syncing") + return nil + } + return errors.Wrapf(err, "error retrieving GameServer %s from namespace %s", name, namespace) + } + + // already on the way out, so no need to do anything. + if gs.IsBeingDeleted() || gs.IsUnhealthy() { + return nil + } + + gsCopy := gs.DeepCopy() + gsCopy.Status.State = agonesv1.GameServerStateUnhealthy + gs, err = c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy) + if err != nil { + return errors.Wrap(err, "error updating GameServer to Unhealthy") + } + + c.recorder.Event(gs, corev1.EventTypeWarning, string(gs.Status.State), "Pod is missing") + return nil +} diff --git a/pkg/gameservers/missing_test.go b/pkg/gameservers/missing_test.go new file mode 100644 index 0000000000..22c8e169ea --- /dev/null +++ b/pkg/gameservers/missing_test.go @@ -0,0 +1,263 @@ +// Copyright 2020 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gameservers + +import ( + "testing" + "time" + + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + agtesting "agones.dev/agones/pkg/testing" + "github.com/heptiolabs/healthcheck" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" +) + +func TestIsBeforePodCreated(t *testing.T) { + fixture := map[string]struct { + state agonesv1.GameServerState + expected bool + }{ + "port": {state: agonesv1.GameServerStatePortAllocation, expected: true}, + "creating": {state: agonesv1.GameServerStateCreating, expected: true}, + "starting": {state: agonesv1.GameServerStateStarting, expected: true}, + "allocated": {state: agonesv1.GameServerStateAllocated, expected: false}, + "ready": {state: agonesv1.GameServerStateReady, expected: false}, + } + + for k, v := range fixture { + t.Run(k, func(t *testing.T) { + gs := &agonesv1.GameServer{Status: agonesv1.GameServerStatus{State: v.state}} + + assert.Equal(t, v.expected, isBeforePodCreated(gs)) + }) + } +} + +func TestMissingPodControllerSyncGameServer(t *testing.T) { + type expected struct { + updated bool + updateTests func(t *testing.T, gs *agonesv1.GameServer) + postTests func(t *testing.T, mocks agtesting.Mocks) + } + fixtures := map[string]struct { + setup func(*agonesv1.GameServer, *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) + expected expected + }{ + "pod exists": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + return gs, pod + }, + expected: expected{ + updated: false, + updateTests: func(_ *testing.T, _ *agonesv1.GameServer) {}, + postTests: func(_ *testing.T, _ agtesting.Mocks) {}, + }, + }, + "pod doesn't exist: game server is fine": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + return gs, nil + }, + expected: expected{ + updated: true, + updateTests: func(t *testing.T, gs *agonesv1.GameServer) { + assert.Equal(t, agonesv1.GameServerStateUnhealthy, gs.Status.State) + }, + postTests: func(t *testing.T, m agtesting.Mocks) { + agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Warning Unhealthy Pod is missing") + }, + }, + }, + "pod doesn't exist: game server not found": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + return nil, nil + }, + expected: expected{ + updated: false, + updateTests: func(_ *testing.T, _ *agonesv1.GameServer) {}, + postTests: func(_ *testing.T, _ agtesting.Mocks) {}, + }, + }, + "pod doesn't exist: game server is being deleted": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + now := metav1.Now() + gs.ObjectMeta.DeletionTimestamp = &now + return gs, nil + }, + expected: expected{ + updated: false, + updateTests: func(_ *testing.T, _ *agonesv1.GameServer) {}, + postTests: func(_ *testing.T, _ agtesting.Mocks) {}, + }, + }, + "pod doesn't exist: game server is already Unhealthy": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + gs.Status.State = agonesv1.GameServerStateUnhealthy + return gs, nil + }, + expected: expected{ + updated: false, + updateTests: func(_ *testing.T, _ *agonesv1.GameServer) {}, + postTests: func(_ *testing.T, _ agtesting.Mocks) {}, + }, + }, + "pod is not a gameserver pod": { + setup: func(gs *agonesv1.GameServer, pod *corev1.Pod) (*agonesv1.GameServer, *corev1.Pod) { + return gs, &corev1.Pod{ObjectMeta: gs.ObjectMeta} + }, + expected: expected{ + updated: true, + updateTests: func(t *testing.T, gs *agonesv1.GameServer) { + assert.Equal(t, agonesv1.GameServerStateUnhealthy, gs.Status.State) + }, + postTests: func(t *testing.T, m agtesting.Mocks) { + agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Warning Unhealthy Pod is missing") + }, + }, + }, + } + + for k, v := range fixtures { + t.Run(k, func(t *testing.T) { + m := agtesting.NewMocks() + c := NewMissingPodController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + c.recorder = m.FakeRecorder + + gs := &agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: newSingleContainerSpec(), Status: agonesv1.GameServerStatus{}} + gs.ApplyDefaults() + + pod, err := gs.Pod() + assert.NoError(t, err) + + gs, pod = v.setup(gs, pod) + m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + if gs != nil { + return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{*gs}}, nil + } + return true, &agonesv1.GameServerList{Items: []agonesv1.GameServer{}}, nil + }) + m.KubeClient.AddReactor("list", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + if pod != nil { + return true, &corev1.PodList{Items: []corev1.Pod{*pod}}, nil + } + return true, &corev1.PodList{Items: []corev1.Pod{}}, nil + }) + + updated := false + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + updated = true + ua := action.(k8stesting.UpdateAction) + gs := ua.GetObject().(*agonesv1.GameServer) + v.expected.updateTests(t, gs) + return true, gs, nil + }) + _, cancel := agtesting.StartInformers(m, c.gameServerSynced, c.podSynced) + defer cancel() + + err = c.syncGameServer("default/test") + assert.NoError(t, err) + assert.Equal(t, v.expected.updated, updated, "updated state") + v.expected.postTests(t, m) + }) + } +} + +func TestMissingPodControllerRun(t *testing.T) { + m := agtesting.NewMocks() + c := NewMissingPodController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + + gs := &agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: newSingleContainerSpec(), Status: agonesv1.GameServerStatus{}} + gs.ApplyDefaults() + + received := make(chan string) + h := func(name string) error { + assert.Equal(t, "default/test", name) + received <- name + return nil + } + + gsWatch := watch.NewFake() + m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil)) + + c.workerqueue.SyncHandler = h + + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + defer cancel() + + go func() { + err := c.Run(stop) + assert.Nil(t, err, "Run should not error") + }() + + noChange := func() { + assert.True(t, cache.WaitForCacheSync(stop, c.gameServerSynced)) + select { + case <-received: + assert.FailNow(t, "should not run sync") + default: + } + } + + result := func() { + select { + case res := <-received: + assert.Equal(t, "default/test", res) + case <-time.After(2 * time.Second): + assert.FailNow(t, "did not run sync") + } + } + + // initial population + gsWatch.Add(gs.DeepCopy()) + noChange() + + // gs before pod + gs.Status.State = agonesv1.GameServerStatePortAllocation + gsWatch.Modify(gs.DeepCopy()) + noChange() + + // ready gs + gs.Status.State = agonesv1.GameServerStateReady + gsWatch.Modify(gs.DeepCopy()) + result() + + // allocated + gs.Status.State = agonesv1.GameServerStateAllocated + gsWatch.Modify(gs.DeepCopy()) + result() + + // unhealthy gs + gs.Status.State = agonesv1.GameServerStateUnhealthy + gsWatch.Modify(gs.DeepCopy()) + noChange() + + // shutdown gs + gs.Status.State = agonesv1.GameServerStateShutdown + gsWatch.Modify(gs.DeepCopy()) + noChange() + + // dev gameservers + gs.Status.State = agonesv1.GameServerStateReady + gs.ObjectMeta.Annotations[agonesv1.DevAddressAnnotation] = ipFixture + gsWatch.Modify(gs.DeepCopy()) + noChange() +}