From 5c8fbd57caf339f6281158fd305b01fc5f9907ce Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Sat, 24 Feb 2018 12:49:53 -0800 Subject: [PATCH] Centralise Controller Queue and Worker processing There were multiple places where the queue and worker goroutine pattern for controllers (and related components) was repeated in almost exactly the same way. Since there are more controllers coming, it made sense to consolidate this logic into a centralised place that could be reused, to make authoring them easier. This may also allow us to centralise some health checking in #98. --- pkg/gameservers/controller.go | 77 ++------------- pkg/gameservers/controller_test.go | 6 +- pkg/gameservers/health.go | 60 +----------- pkg/gameservers/sdkserver.go | 69 +++---------- pkg/util/workerqueue/workerqueue.go | 120 +++++++++++++++++++++++ pkg/util/workerqueue/workerqueue_test.go | 56 +++++++++++ 6 files changed, 206 insertions(+), 182 deletions(-) create mode 100644 pkg/util/workerqueue/workerqueue.go create mode 100644 pkg/util/workerqueue/workerqueue_test.go diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 8e291e2cd1..033568e6e4 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -28,6 +28,7 @@ import ( listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/webhooks" + "agones.dev/agones/pkg/util/workerqueue" "github.com/mattbaird/jsonpatch" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -48,7 +49,6 @@ import ( corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) var ( @@ -67,13 +67,11 @@ type Controller struct { gameServerLister listerv1alpha1.GameServerLister gameServerSynced cache.InformerSynced nodeLister corelisterv1.NodeLister - queue workqueue.RateLimitingInterface portAllocator *PortAllocator healthController *HealthController + workerqueue *workerqueue.WorkerQueue server *http.Server recorder record.EventRecorder - // this allows for overwriting for testing purposes - syncHandler func(string) error } // NewController returns a new gameserver crd controller @@ -101,7 +99,6 @@ func NewController( gameServerLister: gameServers.Lister(), gameServerSynced: gsInformer.HasSynced, nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".GameServerController"), portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), } @@ -113,16 +110,18 @@ func NewController( eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"}) + c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController") + wh.AddHandler("/mutate", stablev1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationHandler) gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueueGameServer, + AddFunc: c.workerqueue.Enqueue, UpdateFunc: func(oldObj, newObj interface{}) { // no point in processing unless there is a State change oldGs := oldObj.(*stablev1alpha1.GameServer) newGs := newObj.(*stablev1alpha1.GameServer) if oldGs.Status.State
!= newGs.Status.State || oldGs.ObjectMeta.DeletionTimestamp != newGs.ObjectMeta.DeletionTimestamp { - c.enqueueGameServer(newGs) + c.workerqueue.Enqueue(newGs) } }, }) @@ -133,14 +132,12 @@ func NewController( pod := obj.(*corev1.Pod) if stablev1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.ObjectMeta.Labels)) { if owner := metav1.GetControllerOf(pod); owner != nil { - c.enqueueGameServer(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name)) + c.workerqueue.Enqueue(cache.ExplicitKey(pod.ObjectMeta.Namespace + "/" + owner.Name)) } } }, }) - c.syncHandler = c.syncGameServer - mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte("ok")) @@ -221,8 +218,6 @@ func (c *Controller) creationHandler(review admv1beta1.AdmissionReview) (admv1be // Run the GameServer controller. Will block until stop is closed. // Runs threadiness number workers to process the rate limited queue func (c Controller) Run(threadiness int, stop <-chan struct{}) error { - defer c.queue.ShutDown() - c.logger.Info("Starting health check...") go func() { if err := c.server.ListenAndServe(); err != nil { @@ -254,66 +249,10 @@ func (c Controller) Run(threadiness int, stop <-chan struct{}) error { // Run the Health Controller go c.healthController.Run(stop) - c.logger.Info("Starting workers...") - for i := 0; i < threadiness; i++ { - go wait.Until(c.runWorker, time.Second, stop) - } - - <-stop + c.workerqueue.Run(threadiness, stop) return nil } -// enqueueGameServer puts the name of the GameServer in the -// queue to be processed. This should not be passed any object -// other than a GameServer. -func (c Controller) enqueueGameServer(obj interface{}) { - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - err := errors.Wrap(err, "Error creating key for object") - runtime.HandleError(c.logger.WithField("obj", obj), err) - return - } - c.queue.AddRateLimited(key) -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (c *Controller) runWorker() { - for c.processNextWorkItem() { - } -} - -func (c *Controller) processNextWorkItem() bool { - obj, quit := c.queue.Get() - if quit { - return false - } - defer c.queue.Done(obj) - - c.logger.WithField("obj", obj).Info("Processing obj") - - var key string - var ok bool - if key, ok = obj.(string); !ok { - runtime.HandleError(c.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - c.queue.Forget(obj) - return true - } - - if err := c.syncHandler(key); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(c.logger.WithField("obj", obj), err) - c.queue.AddRateLimited(obj) - return true - } - - c.queue.Forget(obj) - return true -} - // syncGameServer synchronises the Pods for the GameServers. 
// and reacts to status changes that can occur through the client SDK func (c *Controller) syncGameServer(key string) error { diff --git a/pkg/gameservers/controller_test.go b/pkg/gameservers/controller_test.go index 3313f55fc1..8582df4aef 100644 --- a/pkg/gameservers/controller_test.go +++ b/pkg/gameservers/controller_test.go @@ -138,7 +138,7 @@ func TestControllerSyncGameServer(t *testing.T) { err := c.portAllocator.Run(stop) assert.Nil(t, err) - err = c.syncHandler("default/test") + err = c.syncGameServer("default/test") assert.Nil(t, err) assert.Equal(t, 2, updateCount, "update reactor should twice") assert.True(t, podCreated, "pod should be created") @@ -190,7 +190,7 @@ func TestControllerWatchGameServers(t *testing.T) { received := make(chan string) defer close(received) - c.syncHandler = func(name string) error { + c.workerqueue.SyncHandler = func(name string) error { assert.Equal(t, "default/test", name) received <- name return nil @@ -232,7 +232,7 @@ func TestControllerHealthCheck(t *testing.T) { return true, newEstablishedCRD(), nil }) - c.syncHandler = func(name string) error { + c.workerqueue.SyncHandler = func(name string) error { return nil } diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 14bf46f9d3..451ffc4ad8 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -15,8 +15,6 @@ package gameservers import ( - "time" - "agones.dev/agones/pkg/apis/stable" "agones.dev/agones/pkg/apis/stable/v1alpha1" "agones.dev/agones/pkg/client/clientset/versioned" @@ -24,13 +22,13 @@ import ( "agones.dev/agones/pkg/client/informers/externalversions" listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/workerqueue" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -38,7 +36,6 @@ import ( corelisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) // HealthController watches Pods, and applies @@ -50,7 +47,7 @@ type HealthController struct { podLister corelisterv1.PodLister gameServerGetter getterv1alpha1.GameServersGetter gameServerLister listerv1alpha1.GameServerLister - queue workqueue.RateLimitingInterface + workerqueue *workerqueue.WorkerQueue recorder record.EventRecorder } @@ -64,10 +61,10 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned podLister: kubeInformerFactory.Core().V1().Pods().Lister(), gameServerGetter: agonesClient.StableV1alpha1(), gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), stable.GroupName+".HealthController"), } hc.logger = runtime.NewLoggerWithType(hc) + hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.logger, stable.GroupName+".HealthController") eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(hc.logger.Infof) @@ -81,7 +78,7 @@ func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned if v1alpha1.GameServerRolePodSelector.Matches(labels.Set(pod.Labels)) && hc.failedContainer(pod) { key := pod.ObjectMeta.Namespace + "/" + owner.Name 
hc.logger.WithField("key", key).Info("GameServer container has terminated") - hc.enqueueGameServer(key) + hc.workerqueue.Enqueue(cache.ExplicitKey(key)) } } }, @@ -102,57 +99,10 @@ func (hc *HealthController) failedContainer(pod *corev1.Pod) bool { } -// enqueue puts the name of the GameServer into the queue -func (hc *HealthController) enqueueGameServer(key string) { - hc.queue.AddRateLimited(key) -} - // Run processes the rate limited queue. // Will block until stop is closed func (hc *HealthController) Run(stop <-chan struct{}) { - defer hc.queue.ShutDown() - - hc.logger.Info("Starting worker") - go wait.Until(hc.runWorker, time.Second, stop) - <-stop - hc.logger.Info("Shut down worker") -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (hc *HealthController) runWorker() { - for hc.processNextWorkItem() { - } -} - -func (hc *HealthController) processNextWorkItem() bool { - obj, quit := hc.queue.Get() - if quit { - return false - } - defer hc.queue.Done(obj) - - hc.logger.WithField("obj", obj).Info("Processing obj") - - var key string - var ok bool - if key, ok = obj.(string); !ok { - runtime.HandleError(hc.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - hc.queue.Forget(obj) - return true - } - - if err := hc.syncGameServer(key); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(hc.logger.WithField("obj", obj), err) - hc.queue.AddRateLimited(obj) - return true - } - - hc.queue.Forget(obj) - return true + hc.workerqueue.Run(1, stop) } // syncGameServer sets the GameSerer to Unhealthy, if its state is Ready diff --git a/pkg/gameservers/sdkserver.go b/pkg/gameservers/sdkserver.go index 698066256c..7b24b10cbf 100644 --- a/pkg/gameservers/sdkserver.go +++ b/pkg/gameservers/sdkserver.go @@ -15,11 +15,11 @@ package gameservers import ( - "fmt" "io" "net/http" "sync" "time" + "strings" "agones.dev/agones/pkg/apis/stable" stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" @@ -27,6 +27,7 @@ import ( typedv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1" "agones.dev/agones/pkg/sdk" "agones.dev/agones/pkg/util/runtime" + "agones.dev/agones/pkg/util/workerqueue" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -37,8 +38,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/workqueue" ) var _ sdk.SDKServer = &SDKServer{} @@ -50,7 +51,6 @@ type SDKServer struct { gameServerName string namespace string gameServerGetter typedv1alpha1.GameServersGetter - queue workqueue.RateLimitingInterface server *http.Server clock clock.Clock healthDisabled bool @@ -59,6 +59,7 @@ type SDKServer struct { healthMutex sync.RWMutex healthLastUpdated time.Time healthFailureCount int64 + workerqueue *workerqueue.WorkerQueue recorder record.EventRecorder } @@ -113,7 +114,12 @@ func NewSDKServer(gameServerName, namespace string, }) s.initHealthLastUpdated(healthInitialDelay) - s.queue = s.newWorkQueue() + s.workerqueue = workerqueue.NewWorkerQueue( + func(key string) error { + return s.updateState(stablev1alpha1.State(key)) + }, + s.logger, + strings.Join([]string{stable.GroupName, s.namespace, 
s.gameServerName}, ".")) s.logger.WithField("gameServerName", s.gameServerName).WithField("namespace", s.namespace).Info("created GameServer sidecar") @@ -126,16 +132,9 @@ func (s *SDKServer) initHealthLastUpdated(healthInitialDelay time.Duration) { s.healthLastUpdated = s.clock.Now().UTC().Add(healthInitialDelay) } -func (s *SDKServer) newWorkQueue() workqueue.RateLimitingInterface { - return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), - fmt.Sprintf("%s/%s/%s", stable.GroupName, s.namespace, s.gameServerName)) -} - // Run processes the rate limited queue. // Will block until stop is closed func (s *SDKServer) Run(stop <-chan struct{}) { - defer s.queue.ShutDown() - s.logger.Info("Starting SDKServer http health check...") go func() { if err := s.server.ListenAndServe(); err != nil { @@ -154,47 +153,7 @@ func (s *SDKServer) Run(stop <-chan struct{}) { go wait.Until(s.runHealth, s.healthTimeout, stop) } - s.logger.Info("Starting worker") - go wait.Until(s.runWorker, time.Second, stop) - <-stop - s.logger.Info("Shut down workers and health checking") -} - -// runWorker is a long-running function that will continually call the -// processNextWorkItem function in order to read and process a message on the -// workqueue. -func (s *SDKServer) runWorker() { - for s.processNextWorkItem() { - } -} - -func (s *SDKServer) processNextWorkItem() bool { - obj, quit := s.queue.Get() - if quit { - return false - } - defer s.queue.Done(obj) - - s.logger.WithField("obj", obj).Info("Processing obj") - - var state stablev1alpha1.State - var ok bool - if state, ok = obj.(stablev1alpha1.State); !ok { - runtime.HandleError(s.logger.WithField("obj", obj), errors.Errorf("expected State in queue, but got %T", obj)) - // this is a bad entry, we don't want to reprocess - s.queue.Forget(obj) - return true - } - - if err := s.updateState(state); err != nil { - // we don't forget here, because we want this to be retried via the queue - runtime.HandleError(s.logger.WithField("obj", obj), err) - s.queue.AddRateLimited(obj) - return true - } - - s.queue.Forget(obj) - return true + s.workerqueue.Run(1, stop) } // updateState sets the GameServer Status's state to the state @@ -228,7 +187,7 @@ func (s *SDKServer) updateState(state stablev1alpha1.State) error { // the workqueue so it can be updated func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { s.logger.Info("Received Ready request, adding to queue") - s.queue.AddRateLimited(stablev1alpha1.RequestReady) + s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.RequestReady)) return e, nil } @@ -236,7 +195,7 @@ func (s *SDKServer) Ready(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) // the workqueue so it can be updated func (s *SDKServer) Shutdown(ctx context.Context, e *sdk.Empty) (*sdk.Empty, error) { s.logger.Info("Received Shutdown request, adding to queue") - s.queue.AddRateLimited(stablev1alpha1.Shutdown) + s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.Shutdown)) return e, nil } @@ -264,7 +223,7 @@ func (s *SDKServer) runHealth() { s.checkHealth() if !s.healthy() { s.logger.WithField("gameServerName", s.gameServerName).Info("being marked as not healthy") - s.queue.AddRateLimited(stablev1alpha1.Unhealthy) + s.workerqueue.Enqueue(cache.ExplicitKey(stablev1alpha1.Unhealthy)) } } diff --git a/pkg/util/workerqueue/workerqueue.go b/pkg/util/workerqueue/workerqueue.go new file mode 100644 index 0000000000..626903d715 --- /dev/null +++ b/pkg/util/workerqueue/workerqueue.go @@ -0,0 +1,120 @@ +// 
Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package workerqueue extends client-go's workqueue +// functionality into an opinionated queue + worker model that +// is reusable +package workerqueue + +import ( + "time" + + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +// Handler is the handler for processing the work queue. +// This is usually a synchronisation handler for a controller or related component +type Handler func(string) error + +// WorkerQueue is an opinionated queue + worker for use +// with controllers and related components, for processing Kubernetes +// watch events and synchronising resources +type WorkerQueue struct { + logger *logrus.Entry + queue workqueue.RateLimitingInterface + // SyncHandler is exported to make testing easier (hack) + SyncHandler Handler +} + +// NewWorkerQueue returns a new worker queue for a given name +func NewWorkerQueue(handler Handler, logger *logrus.Entry, name string) *WorkerQueue { + return &WorkerQueue{ + logger: logger.WithField("queue", name), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name), + SyncHandler: handler, + } +} + +// Enqueue puts the name of the runtime.Object in the +// queue to be processed. If you need to send through an +// explicit key, use a cache.ExplicitKey +func (wq *WorkerQueue) Enqueue(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + err := errors.Wrap(err, "Error creating key for object") + runtime.HandleError(wq.logger.WithField("obj", obj), err) + return + } + wq.logger.WithField("key", key).Info("Enqueuing key") + wq.queue.AddRateLimited(key) +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (wq *WorkerQueue) runWorker() { + for wq.processNextWorkItem() { + } +} + +// processNextWorkItem processes the next work item. +// pretty self explanatory :) +func (wq *WorkerQueue) processNextWorkItem() bool { + obj, quit := wq.queue.Get() + if quit { + return false + } + defer wq.queue.Done(obj) + + wq.logger.WithField("obj", obj).Info("Processing obj") + + var key string + var ok bool + if key, ok = obj.(string); !ok { + runtime.HandleError(wq.logger.WithField("obj", obj), errors.Errorf("expected string in queue, but got %T", obj)) + // this is a bad entry, we don't want to reprocess + wq.queue.Forget(obj) + return true + } + + if err := wq.SyncHandler(key); err != nil { + // we don't forget here, because we want this to be retried via the queue + runtime.HandleError(wq.logger.WithField("obj", obj), err) + wq.queue.AddRateLimited(obj) + return true + } + + wq.queue.Forget(obj) + return true +} + +// Run the WorkerQueue processing via the Handler. Will block until stop is closed.
+// Runs threadiness number workers to process the rate limited queue +func (wq *WorkerQueue) Run(threadiness int, stop <-chan struct{}) { + defer wq.queue.ShutDown() + + wq.logger.WithField("threadiness", threadiness).Info("Starting workers...") + for i := 0; i < threadiness; i++ { + go wait.Until(wq.runWorker, time.Second, stop) + } + + <-stop + wq.logger.Info("...shutting down workers") +} diff --git a/pkg/util/workerqueue/workerqueue_test.go b/pkg/util/workerqueue/workerqueue_test.go new file mode 100644 index 0000000000..9792b38774 --- /dev/null +++ b/pkg/util/workerqueue/workerqueue_test.go @@ -0,0 +1,56 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workerqueue + +import ( + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/tools/cache" +) + +func TestWorkerQueueRun(t *testing.T) { + received := make(chan string) + defer close(received) + + syncHandler := func(name string) error { + assert.Equal(t, "default/test", name) + received <- name + return nil + } + + wq := NewWorkerQueue(syncHandler, logrus.WithField("source", "test"), "test") + stop := make(chan struct{}) + defer close(stop) + + go wq.Run(1, stop) + + // no change, should be no value + select { + case <-received: + assert.Fail(t, "should not have received value") + case <-time.After(1 * time.Second): + } + + wq.Enqueue(cache.ExplicitKey("default/test")) + + select { + case <-received: + case <-time.After(5 * time.Second): + assert.Fail(t, "should have received value") + } +}