diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 8e291e2cd1..033568e6e4 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -28,6 +28,7 @@ import ( listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" + "agones.dev/agones/pkg/util/workerqueue" "github.com/mattbaird/jsonpatch" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -48,7 +49,6 @@ import ( corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) var ( @@ -67,13 +67,11 @@ type Controller struct { gameServerLister listerv1alpha1.GameServerLister gameServerSynced cache.InformerSynced nodeLister corelisterv1.NodeLister - queue workqueue.RateLimitingInterface portAllocator *PortAllocator healthController *HealthController + workerqueue *workerqueue.WorkerQueue server *http.Server recorder record.EventRecorder - // this allows for overwriting for testing purposes - syncHandler func(string) error } // NewController returns a new gameserver crd controller @@ -101,7 +99,6 @@ func NewController( gameServerLister: gameServers.Lister(), gameServerSynced: gsInformer.HasSynced, nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".GameServerController"), portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), } @@ -113,16 +110,18 @@ func NewController( eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"}) + c.workerqueue = 
workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController") + wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationHandler) gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueGameServer, + AddFunc: c.workerqueue.Enqueue, UpdateFunc: func(oldObj, newObj interface{}) { // no point in processing unless there is a State change oldGs := oldObj.(*stablev1alpha1.GameServer) newGs := newObj.(*stablev1alpha1.GameServer) if oldGs.Status.State != newGs.Status.State || oldGs.ObjectMeta.DeletionTimestamp != newGs.ObjectMeta.DeletionTimestamp { - c.enqueueGameServer(newGs) + c.workerqueue.Enqueue(newGs) } }, }) @@ -133,14 +132,12 @@ func NewController( pod := obj.(*corev1.Pod) if stablev1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.ObjectMeta.Labels)) { if owner := metav1.GetControllerOf(pod); owner != nil { - c.enqueueGameServer(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name)) + c.workerqueue.Enqueue(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name)) } } }, }) - c.syncHandler = c.syncGameServer - mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte("ok")) @@ -221,8 +218,6 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be // Run the GameServer controller. Will block until stop is closed. 
// Runs threadiness number workers to process the rate limited queue func (c Controller) Run(threadiness int, stop <-chan struct{}) error { - defer c.queue.ShutDown() - c.logger.Info("Starting health check...") go func() { if err := c.server.ListenAndServe(); err != nil { @@ -254,66 +249,10 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error { // Run the Health Controller go c.healthController.Run(stop) - c.logger.Info("Starting workers...") - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stop) - } - - <-stop + c.workerqueue.Run(threadiness, stop) return nil } -// enqueueGameServer puts the name of the GameServer in the -// queue to be processed. This should not be passed any object -// other than a GameServer. -func (c Controller) enqueueGameServer(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - err := errors.Wrap(err, "Error creating key for object") - runtime.HandleError(c.logger.WithField("obj", obj), err) - return - } - c.queue.AddRateLimited(key) -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. 
-func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -func (c *Controller) processNextWorkItem() bool { - obj, quit := c.queue.Get() - if quit { - return false - } - defer c.queue.Done(obj) - - c.logger.WithField("obj", obj).Info("Processing obj") - - var key string - var ok bool - if key, ok = obj.(string); !ok { - runtime.HandleError(c.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - c.queue.Forget(obj) - return true - } - - if err := c.syncHandler(key); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(c.logger.WithField("obj", obj), err) - c.queue.AddRateLimited(obj) - return true - } - - c.queue.Forget(obj) - return true -} - // syncGameServer synchronises the Pods for the GameServers. // and reacts to status changes that can occur through the client SDK func (c *Controller) syncGameServer(key string) error { diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index 3313f55fc1..8582df4aef 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -138,7 +138,7 @@ func TestControllerSyncGameServer(t *testing.T) { err := c.portAllocator.Run(stop) assert.Nil(t, err) - err = c.syncHandler("default/test") + err = c.syncGameServer("default/test") assert.Nil(t, err) assert.Equal(t, 2, updateCount, "update reactor should twice") assert.True(t, podCreated, "pod should be created") @@ -190,7 +190,7 @@ func TestControllerWatchGameServers(t *testing.T) { received := make(chan string) defer close(received) - c.syncHandler = func(name string) error { + c.workerqueue.SyncHandler = func(name string) error { assert.Equal(t, "default/test", name) received <- name return nil @@ -232,7 +232,7 @@ func TestControllerHealthCheck(t *testing.T) { return true, newEstablishedCRD(), nil }) - c.syncHandler = func(name string) error { + 
c.workerqueue.SyncHandler = func(name string) error { return nil } diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 14bf46f9d3..451ffc4ad8 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -15,8 +15,6 @@ package gameservers import ( - "time" - "agones.dev/agones/pkg/apis/stable" "agones.dev/agones/pkg/apis/stable/v1alpha1" "agones.dev/agones/pkg/client/clientset/versioned" @@ -24,13 +22,13 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/workerqueue" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -38,7 +36,6 @@ import ( corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) // HealthController watches Pods, and applies @@ -50,7 +47,7 @@ type HealthController struct { podLister corelisterv1.PodLister gameServerGetter getterv1alpha1.GameServersGetter gameServerLister listerv1alpha1.GameServerLister - queue workqueue.RateLimitingInterface + workerqueue *workerqueue.WorkerQueue recorder record.EventRecorder } @@ -64,10 +61,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned podLister: kubeInformerFactory.Core().V1().Pods().Lister(), gameServerGetter: agonesClient.StableV1alpha1(), gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"), } hc.logger = runtime.NewLoggerWithType(hc) + hc.workerqueue = 
workerqueue.NewWorkerQueue(hc.syncGameServer, hc.logger, stable.GroupName+".HealthController") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(hc.logger.Infof) @@ -81,7 +78,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) { key := pod.ObjectMeta.Namespace + "/" + owner.Name hc.logger.WithField("key", key).Info("GameServer container has terminated") - hc.enqueueGameServer(key) + hc.workerqueue.Enqueue(cache.ExplicitKey(key)) } } }, @@ -102,57 +99,10 @@ func (hc *HealthController) failedContainer(pod *corev1.Pod) bool { } -// enqueue puts the name of the GameServer into the queue -func (hc *HealthController) enqueueGameServer(key string) { - hc.queue.AddRateLimited(key) -} - // Run processes the rate limited queue. // Will block until stop is closed func (hc *HealthController) Run(stop <-chan struct{}) { - defer hc.queue.ShutDown() - - hc.logger.Info("Starting worker") - go wait.Until(hc.runWorker, time.Second, stop) - <-stop - hc.logger.Info("Shut down worker") -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. 
-func (hc *HealthController) runWorker() { - for hc.processNextWorkItem() { - } -} - -func (hc *HealthController) processNextWorkItem() bool { - obj, quit := hc.queue.Get() - if quit { - return false - } - defer hc.queue.Done(obj) - - hc.logger.WithField("obj", obj).Info("Processing obj") - - var key string - var ok bool - if key, ok = obj.(string); !ok { - runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - hc.queue.Forget(obj) - return true - } - - if err := hc.syncGameServer(key); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(hc.logger.WithField("obj", obj), err) - hc.queue.AddRateLimited(obj) - return true - } - - hc.queue.Forget(obj) - return true + hc.workerqueue.Run(1, stop) } // syncGameServer sets the GameSerer to Unhealthy, if its state is Ready diff --git a/pkg/gameservers/sdkserver.go b/pkg/gameservers/sdkserver.go index 698066256c..b8e2614ec8 100644 --- a/pkg/gameservers/sdkserver.go +++ b/pkg/gameservers/sdkserver.go @@ -15,9 +15,9 @@ package gameservers import ( - "fmt" "io" "net/http" + "strings" "sync" "time" @@ -27,6 +27,7 @@ import ( typedv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1" "agones.dev/agones/pkg/sdk" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/workerqueue" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -37,8 +38,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) var _ sdk.SDKServer = &SDKServer{} @@ -50,7 +51,6 @@ type SDKServer struct { gameServerName string namespace string gameServerGetter typedv1alpha1.GameServersGetter - queue workqueue.RateLimitingInterface server 
*http.Server clock clock.Clock healthDisabled bool @@ -59,6 +59,7 @@ type SDKServer struct { healthMutex sync.RWMutex healthLastUpdated time.Time healthFailureCount int64 + workerqueue *workerqueue.WorkerQueue recorder record.EventRecorder } @@ -113,7 +114,12 @@ func NewSDKServer(gameServerName, namespace string, }) s.initHealthLastUpdated(healthInitialDelay) - s.queue = s.newWorkQueue() + s.workerqueue = workerqueue.NewWorkerQueue( + func(key string) error { + return s.updateState(stablev1alpha1.State(key)) + }, + s.logger, + strings.Join([]string{stable.GroupName, s.namespace, s.gameServerName}, ".")) s.logger.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar") @@ -126,16 +132,9 @@ func (s *SDKServer) initHealthLastUpdated(healthInitialDelay time.Duration) { s.healthLastUpdated = s.clock.Now().UTC().Add(healthInitialDelay) } -func (s *SDKServer) newWorkQueue() workqueue.RateLimitingInterface { - return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), - fmt.Sprintf("%s/%s/%s", stable.GroupName, s.namespace, s.gameServerName)) -} - // Run processes the rate limited queue. // Will block until stop is closed func (s *SDKServer) Run(stop <-chan struct{}) { - defer s.queue.ShutDown() - s.logger.Info("Starting SDKServer http health check...") go func() { if err := s.server.ListenAndServe(); err != nil { @@ -154,47 +153,7 @@ func (s *SDKServer) Run(stop <-chan struct{}) { go wait.Until(s.runHealth, s.healthTimeout, stop) } - s.logger.Info("Starting worker") - go wait.Until(s.runWorker, time.Second, stop) - <-stop - s.logger.Info("Shut down workers and health checking") -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. 
-func (s *SDKServer) runWorker() { - for s.processNextWorkItem() { - } -} - -func (s *SDKServer) processNextWorkItem() bool { - obj, quit := s.queue.Get() - if quit { - return false - } - defer s.queue.Done(obj) - - s.logger.WithField("obj", obj).Info("Processing obj") - - var state stablev1alpha1.State - var ok bool - if state, ok = obj.(stablev1alpha1.State); !ok { - runtime.HandleError(s.logger.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - s.queue.Forget(obj) - return true - } - - if err := s.updateState(state); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(s.logger.WithField("obj", obj), err) - s.queue.AddRateLimited(obj) - return true - } - - s.queue.Forget(obj) - return true + s.workerqueue.Run(1, stop) } // updateState sets the GameServer Status's state to the state @@ -228,7 +187,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error { // the workqueue so it can be updated func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { s.logger.Info("Received Ready request, adding to queue") - s.queue.AddRateLimited(stablev1alpha1.RequestReady) + s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.RequestReady)) return e, nil } @@ -236,7 +195,7 @@ func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) // the workqueue so it can be updated func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { s.logger.Info("Received Shutdown request, adding to queue") - s.queue.AddRateLimited(stablev1alpha1.Shutdown) + s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.Shutdown)) return e, nil } @@ -264,7 +223,7 @@ func (s *SDKServer) runHealth() { s.checkHealth() if !s.healthy() { s.logger.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") - s.queue.AddRateLimited(stablev1alpha1.Unhealthy) + 
s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.Unhealthy)) + } } diff --git a/pkg/util/workerqueue/workerqueue.go b/pkg/util/workerqueue/workerqueue.go new file mode 100644 index 0000000000..626903d715 --- /dev/null +++ b/pkg/util/workerqueue/workerqueue.go @@ -0,0 +1,120 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package workerqueue extends client-go's workqueue +// functionality into an opinionated queue + worker model that +// is reusable +package workerqueue + +import ( + "time" + + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// Handler is the handler for processing the work queue +// This is usually a synchronisation handler for a controller or related +type Handler func(string) error + +// WorkerQueue is an opinionated queue + worker for use +// with controllers and related and processing Kubernetes watched +// events and synchronising resources +type WorkerQueue struct { + logger *logrus.Entry + queue workqueue.RateLimitingInterface + // SyncHandler is exported to make testing easier (hack) + SyncHandler Handler +} + +// NewWorkerQueue returns a new worker queue for a given name +func NewWorkerQueue(handler Handler, logger *logrus.Entry, name string) *WorkerQueue { + return &WorkerQueue{ + logger: 
logger.WithField("queue", name), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + SyncHandler: handler, + } +} + +// Enqueue puts the name of the runtime.Object in the +// queue to be processed. If you need to send through an +// explicit key, use a cache.ExplicitKey +func (wq *WorkerQueue) Enqueue(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + err := errors.Wrap(err, "Error creating key for object") + runtime.HandleError(wq.logger.WithField("obj", obj), err) + return + } + wq.logger.WithField("key", key).Info("Enqueuing key") + wq.queue.AddRateLimited(key) +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (wq *WorkerQueue) runWorker() { + for wq.processNextWorkItem() { + } +} + +// processNextWorkItem processes the next work item. +// pretty self explanatory :) +func (wq *WorkerQueue) processNextWorkItem() bool { + obj, quit := wq.queue.Get() + if quit { + return false + } + defer wq.queue.Done(obj) + + wq.logger.WithField("obj", obj).Info("Processing obj") + + var key string + var ok bool + if key, ok = obj.(string); !ok { + runtime.HandleError(wq.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) + // this is a bad entry, we don't want to reprocess + wq.queue.Forget(obj) + return true + } + + if err := wq.SyncHandler(key); err != nil { + // we don't forget here, because we want this to be retried via the queue + runtime.HandleError(wq.logger.WithField("obj", obj), err) + wq.queue.AddRateLimited(obj) + return true + } + + wq.queue.Forget(obj) + return true +} + +// Run the WorkerQueue processing via the Handler. Will block until stop is closed. 
+// Runs threadiness number workers to process the rate limited queue +func (wq *WorkerQueue) Run(threadiness int, stop <-chan struct{}) { + defer wq.queue.ShutDown() + + wq.logger.WithField("threadiness", threadiness).Info("Starting workers...") + for i := 0; i < threadiness; i++ { + go wait.Until(wq.runWorker, time.Second, stop) + } + + <-stop + wq.logger.Info("...shutting down workers") +} diff --git a/pkg/util/workerqueue/workerqueue_test.go b/pkg/util/workerqueue/workerqueue_test.go new file mode 100644 index 0000000000..9792b38774 --- /dev/null +++ b/pkg/util/workerqueue/workerqueue_test.go @@ -0,0 +1,56 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package workerqueue + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/tools/cache" +) + +func TestWorkerQueueRun(t *testing.T) { + received := make(chan string) + defer close(received) + + syncHandler := func(name string) error { + assert.Equal(t, "default/test", name) + received <- name + return nil + } + + wq := NewWorkerQueue(syncHandler, logrus.WithField("source", "test"), "test") + stop := make(chan struct{}) + defer close(stop) + + go wq.Run(1, stop) + + // no change, should be no value + select { + case <-received: + assert.Fail(t, "should not have received value") + case <-time.After(1 * time.Second): + } + + wq.Enqueue(cache.ExplicitKey("default/test")) + + select { + case <-received: + case <-time.After(5 * time.Second): + assert.Fail(t, "should have received value") + } +}