From 52d2e809568eb29fd9912875c3520be3a5b42016 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Wed, 9 Jun 2021 20:11:27 +1000 Subject: [PATCH] feat: Update membrane to use new worker pools. --- adapters/grpc/faas_grpc.go | 90 +++++++------------------------------- membrane/membrane.go | 61 ++++++++++++-------------- worker/faas_pool.go | 4 +- worker/http_pool.go | 8 ++-- worker/pool.go | 9 ++++ 5 files changed, 58 insertions(+), 114 deletions(-) create mode 100644 worker/pool.go diff --git a/adapters/grpc/faas_grpc.go b/adapters/grpc/faas_grpc.go index b4c84bea7..0882da0dd 100644 --- a/adapters/grpc/faas_grpc.go +++ b/adapters/grpc/faas_grpc.go @@ -1,91 +1,31 @@ package grpc import ( - "fmt" - "sync" - - "github.com/google/uuid" pb "github.com/nitric-dev/membrane/interfaces/nitric/v1" - "github.com/nitric-dev/membrane/sdk" + "github.com/nitric-dev/membrane/worker" ) type FaasServer struct { pb.UnimplementedFaasServer - eventPlugin sdk.EventService - - srv pb.Faas_TriggerStreamServer - - // The function is ready to go - FunctionReady chan bool - - // Each trigger will get a channel back to wait for its response from the function - // triggerQueue map[string]*pb.TriggerRequest - // Add a write lock for triggers - responseQueue sync.Map - // Add a read lock for responses + // srv pb.Faas_TriggerStreamServer + pool *worker.FaasWorkerPool } -// Push a trigger onto the queue -func (s *FaasServer) PushTrigger(request *pb.TriggerRequest) (chan *pb.TriggerResponse, error) { - - // Make a channel for this trigger and push it onto the response queue under its id - // Generate an ID for this request response pair - ID := uuid.New().String() - message := &pb.Message{ - Id: ID, - Content: &pb.Message_TriggerRequest{ - TriggerRequest: request, - }, - } - - err := s.srv.Send(message) - - if err != nil { - // There was an error enqueuing the message - return nil, err +// Starts a new stream +// A reference to this stream will be passed on to a new worker instance +// This represents a new server that is ready to begin processing +func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error { + if err := s.pool.AddWorker(srv); err != nil { + // return an error here... + // TODO: Return proper grpc error with status here... + return err } - // Get a lock on the response queue - returnChan := make(chan *pb.TriggerResponse) - s.responseQueue.Store(ID, returnChan) - - return returnChan, nil + return nil } -// Recieve messages from the function -func (s *FaasServer) recieveMessages(errch chan error) { - for { - var msg *pb.Message - - err := s.srv.RecvMsg(msg) - - if err != nil { - // exit - errch <- err - break - } else { - - } - - // Load the the response channel and delete its map key reference - if val, ok := s.responseQueue.LoadAndDelete(msg.GetId()); ok { - // For now assume this is a trigger response... - response := msg.GetTriggerResponse() - rChan := val.(chan *pb.TriggerResponse) - // Write the response the the waiting recipient - rChan <- response - } else { - errch <- fmt.Errorf("Fatal: FaaS server in base state exiting!!!") - } +func NewFaasServer(workerPool *worker.FaasWorkerPool) *FaasServer { + return &FaasServer{ + pool: workerPool, } } - -// Start the stream -func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error { - s.srv = srv - - errch := make(chan error) - go s.recieveMessages(errch) - - return <-errch -} diff --git a/membrane/membrane.go b/membrane/membrane.go index fb0eee84c..04b24f7dd 100644 --- a/membrane/membrane.go +++ b/membrane/membrane.go @@ -21,12 +21,11 @@ import ( "os/exec" "strconv" "strings" - "time" "github.com/nitric-dev/membrane/utils" + "github.com/nitric-dev/membrane/worker" grpc2 "github.com/nitric-dev/membrane/adapters/grpc" - "github.com/nitric-dev/membrane/handler" v1 "github.com/nitric-dev/membrane/interfaces/nitric/v1" "github.com/nitric-dev/membrane/sdk" @@ -141,27 +140,6 @@ func (s *Membrane) startChildProcess() error { return fmt.Errorf("There was an error starting the child process: %v", applicationError) } - // Dial the child port to see if it's open and ready... - maxWaitTime := time.Duration(s.childTimeoutSeconds) * time.Second - // Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+) - pollInterval := time.Duration(15) * time.Millisecond - - var waitedTime = time.Duration(0) - for { - conn, _ := net.Dial("tcp", s.childAddress) - if conn != nil { - conn.Close() - break - } else { - if waitedTime < maxWaitTime { - time.Sleep(pollInterval) - waitedTime += pollInterval - } else { - return fmt.Errorf("Unable to dial child server, does it expose a http server at: %s?", s.childAddress) - } - } - } - return nil } @@ -199,6 +177,16 @@ func (s *Membrane) Start() error { authServer := s.createUserServer() v1.RegisterUserServer(s.grpcServer, authServer) + var pool worker.WorkerPool + // FaaS server MUST start before the child process + if s.mode == Mode_Faas { + faasPool := worker.NewFaasWorkerPool() + // Register the faas server + faasServer := grpc2.NewFaasServer(faasPool) + v1.RegisterFaasServer(s.grpcServer, faasServer) + pool = faasPool + } + lis, err := net.Listen("tcp", s.serviceAddress) if err != nil { return fmt.Errorf("Could not listen on configured service address: %v", err) @@ -223,6 +211,14 @@ func (s *Membrane) Start() error { s.log("No Child Command Specified, Skipping...") } + if s.mode == Mode_HttpProxy { + httpPool := worker.NewHttpWorkerPool() + if err := httpPool.AddWorker(s.childAddress); err != nil { + return err + } + pool = httpPool + } + // FIXME: Only do this in Gateway mode... // Otherwise always pass through to the provided child address // Start the Gateway Server @@ -231,19 +227,16 @@ func (s *Membrane) Start() error { // data ingress/egress to our userland code // The gateway should block the main thread but will // use this callback as a control mechanism - s.log("Starting Gateway") + s.log("Waiting for active workers") + pool.WaitForActiveWorkers(5) - var hndlr handler.TriggerHandler - switch s.mode { - case Mode_Faas: - hndlr = handler.NewFaasHandler(s.childAddress) - break - case Mode_HttpProxy: - hndlr = handler.NewHttpHandler(s.childAddress) - break - } + hndler, err := pool.GetTriggerHandler() - return s.gatewayPlugin.Start(hndlr) + if err != nil { + return err + } + s.log("Starting Gateway") + return s.gatewayPlugin.Start(hndler) } func (s *Membrane) Stop() { diff --git a/worker/faas_pool.go b/worker/faas_pool.go index b8cbd3031..646437023 100644 --- a/worker/faas_pool.go +++ b/worker/faas_pool.go @@ -30,7 +30,7 @@ func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) { } // Synchronously wait for at least one active worker -func (s *FaasWorkerPool) WaitForActiveWorker(timeout int) error { +func (s *FaasWorkerPool) WaitForActiveWorkers(timeout int) error { // Dial the child port to see if it's open and ready... maxWaitTime := time.Duration(timeout) * time.Second // Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+) @@ -38,7 +38,7 @@ func (s *FaasWorkerPool) WaitForActiveWorker(timeout int) error { var waitedTime = time.Duration(0) for { - if s.GetWorkerCount() >= 1 { + if s.getWorkerCount() >= 1 { break } else { if waitedTime < maxWaitTime { diff --git a/worker/http_pool.go b/worker/http_pool.go index 26e3e3db4..c8e5051f9 100644 --- a/worker/http_pool.go +++ b/worker/http_pool.go @@ -3,6 +3,8 @@ package worker import ( "fmt" "sync" + + "github.com/nitric-dev/membrane/handler" ) type HttpWorkerPool struct { @@ -12,7 +14,7 @@ type HttpWorkerPool struct { } // Ensure workers implement the trigger handler interface -func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) { +func (s *HttpWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) { s.workerLock.Lock() defer s.workerLock.Unlock() @@ -25,7 +27,7 @@ func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) { return s } -func WaitForActiveWorker(timeout int) { +func (s *HttpWorkerPool) WaitForActiveWorkers(timeout int) error { // Dial the child port to see if it's open and ready... maxWaitTime := time.Duration(timeout) * time.Second // Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+) @@ -48,7 +50,7 @@ func WaitForActiveWorker(timeout int) { return nil } -func getActiveWorkers() int { +func (s *HttpWorkerPool) getActiveWorkers() int { s.workerLock.Lock() defer s.workerLock.Unlock() diff --git a/worker/pool.go b/worker/pool.go new file mode 100644 index 000000000..b7f23ddcf --- /dev/null +++ b/worker/pool.go @@ -0,0 +1,9 @@ +package worker + +import "github.com/nitric-dev/membrane/handler" + +type WorkerPool interface { + // WaitForActiveWorkers - A blocking method + WaitForActiveWorkers(timeout int) error + GetTriggerHandler() (handler.TriggerHandler, error) +}