diff --git a/plugins/gateway/app_platform/http.go b/plugins/gateway/app_platform/http.go index 4d7a21710..d0e93aaff 100644 --- a/plugins/gateway/app_platform/http.go +++ b/plugins/gateway/app_platform/http.go @@ -31,8 +31,15 @@ type HttpGateway struct { sdk.UnimplementedGatewayPlugin } -func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { +func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { return func(ctx *fasthttp.RequestCtx) { + wrkr, err := pool.GetWorker() + + if err != nil { + ctx.Error("Unable to get worker to handle request", 500) + return + } + httpTrigger := triggers.FromHttpRequest(ctx) response, err := wrkr.HandleHttpRequest(httpTrigger) @@ -52,9 +59,9 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { } } -func (s *HttpGateway) Start(wrkr Worker) error { +func (s *HttpGateway) Start(pool worker.WorkerPool) error { s.server = &fasthttp.Server{ - Handler: httpHandler(wrkr), + Handler: httpHandler(pool), } return s.server.ListenAndServe(s.address) diff --git a/plugins/gateway/appservice/http.go b/plugins/gateway/appservice/http.go index 45d81ad62..d50f7d4d2 100644 --- a/plugins/gateway/appservice/http.go +++ b/plugins/gateway/appservice/http.go @@ -100,8 +100,14 @@ func handleRequest(ctx *fasthttp.RequestCtx, wrkr worker.Worker) { ctx.Response.SetBody(response.Body) } -func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { +func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { return func(ctx *fasthttp.RequestCtx) { + wrkr, err := pool.GetWorker() + + if err != nil { + ctx.Error("Unable to get worker to handle request", 500) + return + } // Handle Event/Subscription Request Types eventType := string(ctx.Request.Header.Peek("aeg-event-type")) @@ -132,10 +138,10 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { } } -func (s *HttpService) Start(wrkr worker.Worker) error { +func (s *HttpService) Start(pool worker.WorkerPool) error { // Start the fasthttp server s.server = &fasthttp.Server{ - Handler: httpHandler(wrkr), + Handler: httpHandler(pool), } return s.server.ListenAndServe(s.address) diff --git a/plugins/gateway/cloudrun/http.go b/plugins/gateway/cloudrun/http.go index 02f37b82d..a2fd50985 100644 --- a/plugins/gateway/cloudrun/http.go +++ b/plugins/gateway/cloudrun/http.go @@ -40,8 +40,14 @@ type PubSubMessage struct { Subscription string `json:"subscription"` } -func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { +func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { return func(ctx *fasthttp.RequestCtx) { + wrkr, err := pool.GetWorker() + if err != nil { + ctx.Error("Unable to get worker to handle request", 500) + return + } + bodyBytes := ctx.Request.Body() // Check if the payload contains a pubsub event @@ -90,10 +96,10 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) { } } -func (s *HttpProxyGateway) Start(wrkr worker.Worker) error { +func (s *HttpProxyGateway) Start(pool worker.WorkerPool) error { // Start the fasthttp server s.server = &fasthttp.Server{ - Handler: httpHandler(wrkr), + Handler: httpHandler(pool), } return s.server.ListenAndServe(s.address) diff --git a/plugins/gateway/dev/gateway.go b/plugins/gateway/dev/gateway.go index e7f57630d..faabeee4e 100644 --- a/plugins/gateway/dev/gateway.go +++ b/plugins/gateway/dev/gateway.go @@ -38,7 +38,7 @@ func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) { wrkr, err := pool.GetWorker() if err != nil { - ctx.Error("Unable to retrieve worker for this event", 500) + ctx.Error("Unable to get worker for this event", 500) return } diff --git a/plugins/gateway/ecs/ecs.go b/plugins/gateway/ecs/ecs.go index e89ad4b1d..17560b798 100644 --- a/plugins/gateway/ecs/ecs.go +++ b/plugins/gateway/ecs/ecs.go @@ -42,8 +42,13 @@ type HttpProxyGateway struct { server *fasthttp.Server } -func (s *HttpProxyGateway) httpHandler(wrkr worker.Worker) func(*fasthttp.RequestCtx) { +func (s *HttpProxyGateway) httpHandler(pool worker.WorkerPool) func(*fasthttp.RequestCtx) { return func(ctx *fasthttp.RequestCtx) { + wrkr, err := pool.GetWorker() + if err != nil { + ctx.Error("Unable to get work to handle this event", 500) + } + var trigger = ctx.UserAgent() if string(trigger) == "Amazon Simple Notification Service Agent" { @@ -106,10 +111,10 @@ func (s *HttpProxyGateway) httpHandler(wrkr worker.Worker) func(*fasthttp.Reques } } -func (s *HttpProxyGateway) Start(wrkr worker.Worker) error { +func (s *HttpProxyGateway) Start(pool worker.WorkerPool) error { // Start the fasthttp server s.server = &fasthttp.Server{ - Handler: s.httpHandler(wrkr), + Handler: s.httpHandler(pool), } go (func() { diff --git a/plugins/gateway/lambda/lambda.go b/plugins/gateway/lambda/lambda.go index f26f3dfa1..c2d5cb1be 100644 --- a/plugins/gateway/lambda/lambda.go +++ b/plugins/gateway/lambda/lambda.go @@ -19,12 +19,13 @@ import ( "encoding/base64" "encoding/json" "fmt" + "strings" + events "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" - "github.com/nitric-dev/membrane/handler" "github.com/nitric-dev/membrane/sdk" "github.com/nitric-dev/membrane/triggers" - "strings" + "github.com/nitric-dev/membrane/worker" ) type eventType int @@ -158,22 +159,24 @@ func (event *Event) UnmarshalJSON(data []byte) error { } type LambdaGateway struct { - handler handler.TriggerHandler + pool worker.WorkerPool runtime LambdaRuntimeHandler sdk.UnimplementedGatewayPlugin finished chan int } func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, error) { - for _, request := range event.Requests { - // TODO: Build up an array of responses? - //in some cases we won't need to send a response as well... - // resp := s.handler(&request) + wrkr, err := pool.GetWorker() + if err != nil { + return fmt.Errorf("Unable to get worker to handle events") + } + + for _, request := range event.Requests { switch request.GetTriggerType() { case triggers.TriggerType_Request: if httpEvent, ok := request.(*triggers.HttpRequest); ok { - response, err := s.handler.HandleHttpRequest(httpEvent) + response, err := wrkr.HandleHttpRequest(httpEvent) if err != nil { return events.APIGatewayProxyResponse{ @@ -208,7 +211,7 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e break case triggers.TriggerType_Subscription: if event, ok := request.(*triggers.Event); ok { - if err := s.handler.HandleEvent(event); err != nil { + if err := wrkr.HandleEvent(event); err != nil { return nil, err } } else { @@ -221,13 +224,13 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e } // Start the lambda gateway handler -func (s *LambdaGateway) Start(handler handler.TriggerHandler) error { +func (s *LambdaGateway) Start(pool worker.WorkerPool) error { //s.finished = make(chan int) - s.handler = handler + s.pool = pool // Here we want to begin polling lambda for incoming requests... s.runtime(s.handle) // Unblock the 'Stop' function if it's waiting. - go func(){s.finished <- 1}() + go func() { s.finished <- 1 }() return nil } @@ -236,20 +239,20 @@ func (s *LambdaGateway) Stop() error { // We don't need to stop listening to anything fmt.Println("gateway 'Stop' called, waiting for lambda runtime to finish") // Lambda can't be stopped, need to wait for it to finish - <- s.finished + <-s.finished return nil } func New() (sdk.GatewayService, error) { return &LambdaGateway{ - runtime: lambda.Start, + runtime: lambda.Start, finished: make(chan int), }, nil } func NewWithRuntime(runtime LambdaRuntimeHandler) (sdk.GatewayService, error) { return &LambdaGateway{ - runtime: runtime, + runtime: runtime, finished: make(chan int), }, nil }