From a0bdc1eef1f63451ef546eda008a29a900c138d0 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Wed, 9 Jun 2021 22:10:37 +1000 Subject: [PATCH] fix: Move worker.Listen to better location. --- adapters/grpc/faas_grpc.go | 10 +++++++--- worker/faas_pool.go | 7 +++---- worker/faas_worker.go | 8 ++++---- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/adapters/grpc/faas_grpc.go b/adapters/grpc/faas_grpc.go index 0882da0dd..116d41564 100644 --- a/adapters/grpc/faas_grpc.go +++ b/adapters/grpc/faas_grpc.go @@ -15,13 +15,17 @@ type FaasServer struct { // A reference to this stream will be passed on to a new worker instance // This represents a new server that is ready to begin processing func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error { - if err := s.pool.AddWorker(srv); err != nil { + if worker, err := s.pool.AddWorker(srv); err != nil { // return an error here... // TODO: Return proper grpc error with status here... return err - } + } else { + errchan := make(chan error) + go worker.Listen(errchan) - return nil + // block here instead + return <-errchan + } } func NewFaasServer(workerPool *worker.FaasWorkerPool) *FaasServer { diff --git a/worker/faas_pool.go b/worker/faas_pool.go index 9aba2716a..f20d1cb05 100644 --- a/worker/faas_pool.go +++ b/worker/faas_pool.go @@ -63,23 +63,22 @@ func (s *FaasWorkerPool) getWorkerCount() int { } // Add a New FaaS worker to this pool -func (s *FaasWorkerPool) AddWorker(stream pb.Faas_TriggerStreamServer) error { +func (s *FaasWorkerPool) AddWorker(stream pb.Faas_TriggerStreamServer) (*FaasWorker, error) { s.workerLock.Lock() workerCount := len(s.workers) // Ensure we haven't reached the maximum number of workers if workerCount > s.maxWorkers { - return fmt.Errorf("Max worker capacity reached! Cannot add more workers!") + return nil, fmt.Errorf("Max worker capacity reached! Cannot add more workers!") } // Add a new worker to this pool worker := newFaasWorker(stream) s.workers = append(s.workers, worker) s.workerLock.Unlock() - worker.listen() - return nil + return worker, nil } func NewFaasWorkerPool() *FaasWorkerPool { diff --git a/worker/faas_worker.go b/worker/faas_worker.go index c135fedf9..43ce0adfa 100644 --- a/worker/faas_worker.go +++ b/worker/faas_worker.go @@ -143,7 +143,7 @@ func (s *FaasWorker) HandleEvent(trigger *triggers.Event) error { } // listen -func (s *FaasWorker) listen() { +func (s *FaasWorker) Listen(errchan chan error) { // Listen for responses for { // var msg *pb.Message = &pb.Message{} @@ -157,13 +157,13 @@ func (s *FaasWorker) listen() { if err == io.EOF { // return will close stream from server side log.Println("exit") - break } if err != nil { log.Printf("received error %v", err) - break } + errchan <- err + break } if msg.GetInitRequest() != nil { @@ -184,7 +184,7 @@ func (s *FaasWorker) listen() { // Write the response the the waiting recipient rChan <- response } else { - log.Fatal(fmt.Errorf("Fatal: FaaS Worker in bad state exiting!!! %v", val)) + errchan <- fmt.Errorf("Fatal: FaaS Worker in bad state exiting!!! %v", val) break } }