Skip to content

Commit

Permalink
feat: Update membrane to use new worker pools.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed Jun 9, 2021
1 parent 3ef726b commit 52d2e80
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 114 deletions.
90 changes: 15 additions & 75 deletions adapters/grpc/faas_grpc.go
Original file line number Diff line number Diff line change
@@ -1,91 +1,31 @@
package grpc

import (
"fmt"
"sync"

"github.com/google/uuid"
pb "github.com/nitric-dev/membrane/interfaces/nitric/v1"
"github.com/nitric-dev/membrane/sdk"
"github.com/nitric-dev/membrane/worker"
)

type FaasServer struct {
pb.UnimplementedFaasServer
eventPlugin sdk.EventService

srv pb.Faas_TriggerStreamServer

// The function is ready to go
FunctionReady chan bool

// Each trigger will get a channel back to wait for its response from the function
// triggerQueue map[string]*pb.TriggerRequest
// Add a write lock for triggers
responseQueue sync.Map
// Add a read lock for responses
// srv pb.Faas_TriggerStreamServer
pool *worker.FaasWorkerPool
}

// Push a trigger onto the queue
func (s *FaasServer) PushTrigger(request *pb.TriggerRequest) (chan *pb.TriggerResponse, error) {

// Make a channel for this trigger and push it onto the response queue under its id
// Generate an ID for this request response pair
ID := uuid.New().String()
message := &pb.Message{
Id: ID,
Content: &pb.Message_TriggerRequest{
TriggerRequest: request,
},
}

err := s.srv.Send(message)

if err != nil {
// There was an error enqueuing the message
return nil, err
// Starts a new stream
// A reference to this stream will be passed on to a new worker instance
// This represents a new server that is ready to begin processing
func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error {
if err := s.pool.AddWorker(srv); err != nil {
// return an error here...
// TODO: Return proper grpc error with status here...
return err
}

// Get a lock on the response queue
returnChan := make(chan *pb.TriggerResponse)
s.responseQueue.Store(ID, returnChan)

return returnChan, nil
return nil
}

// Recieve messages from the function
func (s *FaasServer) recieveMessages(errch chan error) {
for {
var msg *pb.Message

err := s.srv.RecvMsg(msg)

if err != nil {
// exit
errch <- err
break
} else {

}

// Load the the response channel and delete its map key reference
if val, ok := s.responseQueue.LoadAndDelete(msg.GetId()); ok {
// For now assume this is a trigger response...
response := msg.GetTriggerResponse()
rChan := val.(chan *pb.TriggerResponse)
// Write the response the the waiting recipient
rChan <- response
} else {
errch <- fmt.Errorf("Fatal: FaaS server in base state exiting!!!")
}
func NewFaasServer(workerPool *worker.FaasWorkerPool) *FaasServer {
return &FaasServer{
pool: workerPool,
}
}

// Start the stream
func (s *FaasServer) TriggerStream(srv pb.Faas_TriggerStreamServer) error {
s.srv = srv

errch := make(chan error)
go s.recieveMessages(errch)

return <-errch
}
61 changes: 27 additions & 34 deletions membrane/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import (
"os/exec"
"strconv"
"strings"
"time"

"github.com/nitric-dev/membrane/utils"
"github.com/nitric-dev/membrane/worker"

grpc2 "github.com/nitric-dev/membrane/adapters/grpc"
"github.com/nitric-dev/membrane/handler"

v1 "github.com/nitric-dev/membrane/interfaces/nitric/v1"
"github.com/nitric-dev/membrane/sdk"
Expand Down Expand Up @@ -141,27 +140,6 @@ func (s *Membrane) startChildProcess() error {
return fmt.Errorf("There was an error starting the child process: %v", applicationError)
}

// Dial the child port to see if it's open and ready...
maxWaitTime := time.Duration(s.childTimeoutSeconds) * time.Second
// Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+)
pollInterval := time.Duration(15) * time.Millisecond

var waitedTime = time.Duration(0)
for {
conn, _ := net.Dial("tcp", s.childAddress)
if conn != nil {
conn.Close()
break
} else {
if waitedTime < maxWaitTime {
time.Sleep(pollInterval)
waitedTime += pollInterval
} else {
return fmt.Errorf("Unable to dial child server, does it expose a http server at: %s?", s.childAddress)
}
}
}

return nil
}

Expand Down Expand Up @@ -199,6 +177,16 @@ func (s *Membrane) Start() error {
authServer := s.createUserServer()
v1.RegisterUserServer(s.grpcServer, authServer)

var pool worker.WorkerPool
// FaaS server MUST start before the child process
if s.mode == Mode_Faas {
faasPool := worker.NewFaasWorkerPool()
// Register the faas server
faasServer := grpc2.NewFaasServer(faasPool)
v1.RegisterFaasServer(s.grpcServer, faasServer)
pool = faasPool
}

lis, err := net.Listen("tcp", s.serviceAddress)
if err != nil {
return fmt.Errorf("Could not listen on configured service address: %v", err)
Expand All @@ -223,6 +211,14 @@ func (s *Membrane) Start() error {
s.log("No Child Command Specified, Skipping...")
}

if s.mode == Mode_HttpProxy {
httpPool := worker.NewHttpWorkerPool()
if err := httpPool.AddWorker(s.childAddress); err != nil {
return err
}
pool = httpPool
}

// FIXME: Only do this in Gateway mode...
// Otherwise always pass through to the provided child address
// Start the Gateway Server
Expand All @@ -231,19 +227,16 @@ func (s *Membrane) Start() error {
// data ingress/egress to our userland code
// The gateway should block the main thread but will
// use this callback as a control mechanism
s.log("Starting Gateway")
s.log("Waiting for active workers")
pool.WaitForActiveWorkers(5)

var hndlr handler.TriggerHandler
switch s.mode {
case Mode_Faas:
hndlr = handler.NewFaasHandler(s.childAddress)
break
case Mode_HttpProxy:
hndlr = handler.NewHttpHandler(s.childAddress)
break
}
hndler, err := pool.GetTriggerHandler()

return s.gatewayPlugin.Start(hndlr)
if err != nil {
return err
}
s.log("Starting Gateway")
return s.gatewayPlugin.Start(hndler)
}

func (s *Membrane) Stop() {
Expand Down
4 changes: 2 additions & 2 deletions worker/faas_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) {
}

// Synchronously wait for at least one active worker
func (s *FaasWorkerPool) WaitForActiveWorker(timeout int) error {
func (s *FaasWorkerPool) WaitForActiveWorkers(timeout int) error {
// Dial the child port to see if it's open and ready...
maxWaitTime := time.Duration(timeout) * time.Second
// Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+)
pollInterval := time.Duration(15) * time.Millisecond

var waitedTime = time.Duration(0)
for {
if s.GetWorkerCount() >= 1 {
if s.getWorkerCount() >= 1 {
break
} else {
if waitedTime < maxWaitTime {
Expand Down
8 changes: 5 additions & 3 deletions worker/http_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package worker
import (
"fmt"
"sync"

"github.com/nitric-dev/membrane/handler"
)

type HttpWorkerPool struct {
Expand All @@ -12,7 +14,7 @@ type HttpWorkerPool struct {
}

// Ensure workers implement the trigger handler interface
func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) {
func (s *HttpWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) {
s.workerLock.Lock()
defer s.workerLock.Unlock()

Expand All @@ -25,7 +27,7 @@ func (s *FaasWorkerPool) GetTriggerHandler() (handler.TriggerHandler, error) {
return s
}

func WaitForActiveWorker(timeout int) {
func (s *HttpWorkerPool) WaitForActiveWorkers(timeout int) error {
// Dial the child port to see if it's open and ready...
maxWaitTime := time.Duration(timeout) * time.Second
// Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+)
Expand All @@ -48,7 +50,7 @@ func WaitForActiveWorker(timeout int) {
return nil
}

func getActiveWorkers() int {
func (s *HttpWorkerPool) getActiveWorkers() int {
s.workerLock.Lock()
defer s.workerLock.Unlock()

Expand Down
9 changes: 9 additions & 0 deletions worker/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package worker

import "github.com/nitric-dev/membrane/handler"

type WorkerPool interface {
// WaitForActiveWorkers - A blocking method
WaitForActiveWorkers(timeout int) error
GetTriggerHandler() (handler.TriggerHandler, error)
}

0 comments on commit 52d2e80

Please sign in to comment.