Skip to content

Commit

Permalink
fix: Move worker.Listen to better location.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed Jun 9, 2021
1 parent 3e71f11 commit a0bdc1e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
10 changes: 7 additions & 3 deletions adapters/grpc/faas_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ type FaasServer struct {
// A reference to this stream will be passed on to a new worker instance
// This represents a new server that is ready to begin processing
func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error {
if err := s.pool.AddWorker(srv); err != nil {
if worker, err := s.pool.AddWorker(srv); err != nil {
// return an error here...
// TODO: Return proper grpc error with status here...
return err
}
} else {
errchan := make(chan error)
go worker.Listen(errchan)

return nil
// block here instead
return <-errchan
}
}

func NewFaasServer(workerPool *worker.FaasWorkerPool) *FaasServer {
Expand Down
7 changes: 3 additions & 4 deletions worker/faas_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,22 @@ func (s *FaasWorkerPool) getWorkerCount() int {
}

// Add a New FaaS worker to this pool
func (s *FaasWorkerPool) AddWorker(stream pb.Faas_TriggerStreamServer) error {
func (s *FaasWorkerPool) AddWorker(stream pb.Faas_TriggerStreamServer) (*FaasWorker, error) {
s.workerLock.Lock()

workerCount := len(s.workers)

// Ensure we haven't reached the maximum number of workers
if workerCount > s.maxWorkers {
return fmt.Errorf("Max worker capacity reached! Cannot add more workers!")
return nil, fmt.Errorf("Max worker capacity reached! Cannot add more workers!")
}

// Add a new worker to this pool
worker := newFaasWorker(stream)
s.workers = append(s.workers, worker)
s.workerLock.Unlock()
worker.listen()

return nil
return worker, nil
}

func NewFaasWorkerPool() *FaasWorkerPool {
Expand Down
8 changes: 4 additions & 4 deletions worker/faas_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *FaasWorker) HandleEvent(trigger *triggers.Event) error {
}

// listen
func (s *FaasWorker) listen() {
func (s *FaasWorker) Listen(errchan chan error) {
// Listen for responses
for {
// var msg *pb.Message = &pb.Message{}
Expand All @@ -157,13 +157,13 @@ func (s *FaasWorker) listen() {
if err == io.EOF {
// return will close stream from server side
log.Println("exit")
break
}
if err != nil {
log.Printf("received error %v", err)
break
}

errchan <- err
break
}

if msg.GetInitRequest() != nil {
Expand All @@ -184,7 +184,7 @@ func (s *FaasWorker) listen() {
// Write the response the the waiting recipient
rChan <- response
} else {
log.Fatal(fmt.Errorf("Fatal: FaaS Worker in bad state exiting!!! %v", val))
errchan <- fmt.Errorf("Fatal: FaaS Worker in bad state exiting!!! %v", val)
break
}
}
Expand Down

0 comments on commit a0bdc1e

Please sign in to comment.