Centralise Controller Queue and Worker processing
There were multiple places where the queue and worker
goroutine pattern for controllers (and related code) was
repeated in almost exactly the same way.

Since there are more controllers coming, it made sense
to consolidate this logic into a centralised place that
could be reused to make authoring them easier.

This may also allow us to centralise some health checking in #98
markmandel committed Feb 25, 2018
1 parent 13839b2 commit 5c8fbd5
Showing 6 changed files with 206 additions and 182 deletions.
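
Only three of the six changed files are rendered below; the new pkg/util/workerqueue package itself is not shown. As a rough guide to the consolidated helper, here is a minimal sketch reconstructed purely from the call sites visible in this diff (NewWorkerQueue, Enqueue, Run, and the exported SyncHandler field the tests swap out) and from the per-controller code the diff removes. Field names and details beyond those call sites are assumptions, not the actual Agones implementation; usage is visible in the controller.go and health.go diffs that follow.

// Package workerqueue: a sketch of the shared queue-and-worker helper,
// inferred from how it is called in this commit. The real implementation
// is among the changed files not shown here.
package workerqueue

import (
	"time"

	"github.com/sirupsen/logrus"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// Handler is the sync function a controller registers for each queue key.
type Handler func(string) error

// WorkerQueue wraps a named, rate-limited workqueue together with the
// enqueue/worker loop that each controller previously carried its own copy of.
type WorkerQueue struct {
	logger *logrus.Entry
	queue  workqueue.RateLimitingInterface

	// SyncHandler is exported so tests can replace it, as the updated
	// controller tests in this commit do.
	SyncHandler Handler
}

// NewWorkerQueue returns a WorkerQueue bound to the given sync handler,
// logger and queue name.
func NewWorkerQueue(handler Handler, logger *logrus.Entry, name string) *WorkerQueue {
	return &WorkerQueue{
		logger:      logger,
		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
		SyncHandler: handler,
	}
}

// Enqueue converts obj into a namespace/name key and adds it to the
// rate-limited queue. cache.ExplicitKey values pass through unchanged.
func (wq *WorkerQueue) Enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		wq.logger.WithField("obj", obj).WithError(err).Error("error creating key for object, dropping")
		return
	}
	wq.queue.AddRateLimited(key)
}

// Run starts `workers` goroutines that drain the queue until stop is closed,
// then shuts the queue down. It blocks, which is why the controllers below
// call it as the final step of their own Run methods.
func (wq *WorkerQueue) Run(workers int, stop <-chan struct{}) {
	defer wq.queue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.Until(wq.runWorker, time.Second, stop)
	}
	<-stop
}

// runWorker loops until the queue is shut down.
func (wq *WorkerQueue) runWorker() {
	for wq.processNextWorkItem() {
	}
}

// processNextWorkItem pops one key, calls SyncHandler, and re-queues the key
// on error so it is retried with rate limiting.
func (wq *WorkerQueue) processNextWorkItem() bool {
	obj, quit := wq.queue.Get()
	if quit {
		return false
	}
	defer wq.queue.Done(obj)

	key, ok := obj.(string)
	if !ok {
		// Bad entry: forget it rather than retrying it forever.
		wq.queue.Forget(obj)
		return true
	}

	if err := wq.SyncHandler(key); err != nil {
		// Deliberately not forgotten, so the key is retried via the queue.
		wq.logger.WithField("key", key).WithError(err).Error("error syncing")
		wq.queue.AddRateLimited(key)
		return true
	}

	wq.queue.Forget(obj)
	return true
}
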
77 changes: 8 additions & 69 deletions pkg/gameservers/controller.go
@@ -28,6 +28,7 @@ import (
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/webhooks"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/mattbaird/jsonpatch"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -48,7 +49,6 @@ import (
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)

var (
@@ -67,13 +67,11 @@ type Controller struct {
gameServerLister listerv1alpha1.GameServerLister
gameServerSynced cache.InformerSynced
nodeLister corelisterv1.NodeLister
queue workqueue.RateLimitingInterface
portAllocator *PortAllocator
healthController *HealthController
workerqueue *workerqueue.WorkerQueue
server *http.Server
recorder record.EventRecorder
// this allows for overwriting for testing purposes
syncHandler func(string) error
}

// NewController returns a new gameserver crd controller
@@ -101,7 +99,6 @@ func NewController(
gameServerLister: gameServers.Lister(),
gameServerSynced: gsInformer.HasSynced,
nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".GameServerController"),
portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
}
@@ -113,16 +110,18 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})

c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController")

wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationHandler)

gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueGameServer,
AddFunc: c.workerqueue.Enqueue,
UpdateFunc: func(oldObj, newObj interface{}) {
// no point in processing unless there is a State change
oldGs := oldObj.(*stablev1alpha1.GameServer)
newGs := newObj.(*stablev1alpha1.GameServer)
if oldGs.Status.State != newGs.Status.State || oldGs.ObjectMeta.DeletionTimestamp != newGs.ObjectMeta.DeletionTimestamp {
c.enqueueGameServer(newGs)
c.workerqueue.Enqueue(newGs)
}
},
})
@@ -133,14 +132,12 @@ func NewController(
pod := obj.(*corev1.Pod)
if stablev1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.ObjectMeta.Labels)) {
if owner := metav1.GetControllerOf(pod); owner != nil {
c.enqueueGameServer(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name))
c.workerqueue.Enqueue(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name))
}
}
},
})

c.syncHandler = c.syncGameServer

mux := http.NewServeMux()
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("ok"))
@@ -221,8 +218,6 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be
// Run the GameServer controller. Will block until stop is closed.
// Runs threadiness number workers to process the rate limited queue
func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
defer c.queue.ShutDown()

c.logger.Info("Starting health check...")
go func() {
if err := c.server.ListenAndServe(); err != nil {
@@ -254,66 +249,10 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error {
// Run the Health Controller
go c.healthController.Run(stop)

c.logger.Info("Starting workers...")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stop)
}

<-stop
c.workerqueue.Run(threadiness, stop)
return nil
}

// enqueueGameServer puts the name of the GameServer in the
// queue to be processed. This should not be passed any object
// other than a GameServer.
func (c Controller) enqueueGameServer(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
err := errors.Wrap(err, "Error creating key for object")
runtime.HandleError(c.logger.WithField("obj", obj), err)
return
}
c.queue.AddRateLimited(key)
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *Controller) processNextWorkItem() bool {
obj, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(obj)

c.logger.WithField("obj", obj).Info("Processing obj")

var key string
var ok bool
if key, ok = obj.(string); !ok {
runtime.HandleError(c.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
// this is a bad entry, we don't want to reprocess
c.queue.Forget(obj)
return true
}

if err := c.syncHandler(key); err != nil {
// we don't forget here, because we want this to be retried via the queue
runtime.HandleError(c.logger.WithField("obj", obj), err)
c.queue.AddRateLimited(obj)
return true
}

c.queue.Forget(obj)
return true
}

// syncGameServer synchronises the Pods for the GameServers.
// and reacts to status changes that can occur through the client SDK
func (c *Controller) syncGameServer(key string) error {
6 changes: 3 additions & 3 deletions pkg/gameservers/controller_test.go
@@ -138,7 +138,7 @@ func TestControllerSyncGameServer(t *testing.T) {
err := c.portAllocator.Run(stop)
assert.Nil(t, err)

err = c.syncHandler("default/test")
err = c.syncGameServer("default/test")
assert.Nil(t, err)
assert.Equal(t, 2, updateCount, "update reactor should fire twice")
assert.True(t, podCreated, "pod should be created")
@@ -190,7 +190,7 @@ func TestControllerWatchGameServers(t *testing.T) {
received := make(chan string)
defer close(received)

c.syncHandler = func(name string) error {
c.workerqueue.SyncHandler = func(name string) error {
assert.Equal(t, "default/test", name)
received <- name
return nil
@@ -232,7 +232,7 @@ func TestControllerHealthCheck(t *testing.T) {
return true, newEstablishedCRD(), nil
})

c.syncHandler = func(name string) error {
c.workerqueue.SyncHandler = func(name string) error {
return nil
}

60 changes: 5 additions & 55 deletions pkg/gameservers/health.go
@@ -15,30 +15,27 @@
package gameservers

import (
"time"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/client/clientset/versioned"
getterv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corelisterv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)

// HealthController watches Pods, and applies
@@ -50,7 +47,7 @@ type HealthController struct {
podLister corelisterv1.PodLister
gameServerGetter getterv1alpha1.GameServersGetter
gameServerLister listerv1alpha1.GameServerLister
queue workqueue.RateLimitingInterface
workerqueue *workerqueue.WorkerQueue
recorder record.EventRecorder
}

@@ -64,10 +61,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned
podLister: kubeInformerFactory.Core().V1().Pods().Lister(),
gameServerGetter: agonesClient.StableV1alpha1(),
gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"),
}

hc.logger = runtime.NewLoggerWithType(hc)
hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.logger, stable.GroupName+".HealthController")

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(hc.logger.Infof)
@@ -81,7 +78,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned
if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) {
key := pod.ObjectMeta.Namespace + "/" + owner.Name
hc.logger.WithField("key", key).Info("GameServer container has terminated")
hc.enqueueGameServer(key)
hc.workerqueue.Enqueue(cache.ExplicitKey(key))
}
}
},
@@ -102,57 +99,10 @@ func (hc *HealthController) failedContainer(pod *corev1.Pod) bool {

}

// enqueue puts the name of the GameServer into the queue
func (hc *HealthController) enqueueGameServer(key string) {
hc.queue.AddRateLimited(key)
}

// Run processes the rate limited queue.
// Will block until stop is closed
func (hc *HealthController) Run(stop <-chan struct{}) {
defer hc.queue.ShutDown()

hc.logger.Info("Starting worker")
go wait.Until(hc.runWorker, time.Second, stop)
<-stop
hc.logger.Info("Shut down worker")
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (hc *HealthController) runWorker() {
for hc.processNextWorkItem() {
}
}

func (hc *HealthController) processNextWorkItem() bool {
obj, quit := hc.queue.Get()
if quit {
return false
}
defer hc.queue.Done(obj)

hc.logger.WithField("obj", obj).Info("Processing obj")

var key string
var ok bool
if key, ok = obj.(string); !ok {
runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj))
// this is a bad entry, we don't want to reprocess
hc.queue.Forget(obj)
return true
}

if err := hc.syncGameServer(key); err != nil {
// we don't forget here, because we want this to be retried via the queue
runtime.HandleError(hc.logger.WithField("obj", obj), err)
hc.queue.AddRateLimited(obj)
return true
}

hc.queue.Forget(obj)
return true
hc.workerqueue.Run(1, stop)
}

// syncGameServer sets the GameServer to Unhealthy, if its state is Ready