Skip to content

Commit

Permalink
refactor: Rework gateway plugins to use WorkerPools.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed Jun 10, 2021
1 parent 8ec0692 commit 1df0893
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 28 deletions.
13 changes: 10 additions & 3 deletions plugins/gateway/app_platform/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ type HttpGateway struct {
sdk.UnimplementedGatewayPlugin
}

func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
wrkr, err := pool.GetWorker()

if err != nil {
ctx.Error("Unable to get worker to handle request", 500)
return
}

httpTrigger := triggers.FromHttpRequest(ctx)
response, err := wrkr.HandleHttpRequest(httpTrigger)

Expand All @@ -52,9 +59,9 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
}
}

func (s *HttpGateway) Start(wrkr Worker) error {
func (s *HttpGateway) Start(pool worker.WorkerPool) error {
s.server = &fasthttp.Server{
Handler: httpHandler(wrkr),
Handler: httpHandler(pool),
}

return s.server.ListenAndServe(s.address)
Expand Down
12 changes: 9 additions & 3 deletions plugins/gateway/appservice/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,14 @@ func handleRequest(ctx *fasthttp.RequestCtx, wrkr worker.Worker) {
ctx.Response.SetBody(response.Body)
}

func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
wrkr, err := pool.GetWorker()

if err != nil {
ctx.Error("Unable to get worker to handle request", 500)
return
}
// Handle Event/Subscription Request Types
eventType := string(ctx.Request.Header.Peek("aeg-event-type"))

Expand Down Expand Up @@ -132,10 +138,10 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
}
}

func (s *HttpService) Start(wrkr worker.Worker) error {
func (s *HttpService) Start(pool worker.WorkerPool) error {
// Start the fasthttp server
s.server = &fasthttp.Server{
Handler: httpHandler(wrkr),
Handler: httpHandler(pool),
}

return s.server.ListenAndServe(s.address)
Expand Down
12 changes: 9 additions & 3 deletions plugins/gateway/cloudrun/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ type PubSubMessage struct {
Subscription string `json:"subscription"`
}

func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
wrkr, err := pool.GetWorker()
if err != nil {
ctx.Error("Unable to get worker to handle request", 500)
return
}

bodyBytes := ctx.Request.Body()

// Check if the payload contains a pubsub event
Expand Down Expand Up @@ -90,10 +96,10 @@ func httpHandler(wrkr worker.Worker) func(ctx *fasthttp.RequestCtx) {
}
}

func (s *HttpProxyGateway) Start(wrkr worker.Worker) error {
func (s *HttpProxyGateway) Start(pool worker.WorkerPool) error {
// Start the fasthttp server
s.server = &fasthttp.Server{
Handler: httpHandler(wrkr),
Handler: httpHandler(pool),
}

return s.server.ListenAndServe(s.address)
Expand Down
2 changes: 1 addition & 1 deletion plugins/gateway/dev/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func httpHandler(pool worker.WorkerPool) func(ctx *fasthttp.RequestCtx) {
wrkr, err := pool.GetWorker()

if err != nil {
ctx.Error("Unable to retrieve worker for this event", 500)
ctx.Error("Unable to get worker for this event", 500)
return
}

Expand Down
11 changes: 8 additions & 3 deletions plugins/gateway/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ type HttpProxyGateway struct {
server *fasthttp.Server
}

func (s *HttpProxyGateway) httpHandler(wrkr worker.Worker) func(*fasthttp.RequestCtx) {
func (s *HttpProxyGateway) httpHandler(pool worker.WorkerPool) func(*fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
wrkr, err := pool.GetWorker()
if err != nil {
ctx.Error("Unable to get work to handle this event", 500)
}

var trigger = ctx.UserAgent()

if string(trigger) == "Amazon Simple Notification Service Agent" {
Expand Down Expand Up @@ -106,10 +111,10 @@ func (s *HttpProxyGateway) httpHandler(wrkr worker.Worker) func(*fasthttp.Reques
}
}

func (s *HttpProxyGateway) Start(wrkr worker.Worker) error {
func (s *HttpProxyGateway) Start(pool worker.WorkerPool) error {
// Start the fasthttp server
s.server = &fasthttp.Server{
Handler: s.httpHandler(wrkr),
Handler: s.httpHandler(pool),
}

go (func() {
Expand Down
33 changes: 18 additions & 15 deletions plugins/gateway/lambda/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"strings"

events "github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/nitric-dev/membrane/handler"
"github.com/nitric-dev/membrane/sdk"
"github.com/nitric-dev/membrane/triggers"
"strings"
"github.com/nitric-dev/membrane/worker"
)

type eventType int
Expand Down Expand Up @@ -158,22 +159,24 @@ func (event *Event) UnmarshalJSON(data []byte) error {
}

type LambdaGateway struct {
handler handler.TriggerHandler
pool worker.WorkerPool
runtime LambdaRuntimeHandler
sdk.UnimplementedGatewayPlugin
finished chan int
}

func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, error) {
for _, request := range event.Requests {
// TODO: Build up an array of responses?
//in some cases we won't need to send a response as well...
// resp := s.handler(&request)
wrkr, err := pool.GetWorker()

if err != nil {
return fmt.Errorf("Unable to get worker to handle events")
}

for _, request := range event.Requests {
switch request.GetTriggerType() {
case triggers.TriggerType_Request:
if httpEvent, ok := request.(*triggers.HttpRequest); ok {
response, err := s.handler.HandleHttpRequest(httpEvent)
response, err := wrkr.HandleHttpRequest(httpEvent)

if err != nil {
return events.APIGatewayProxyResponse{
Expand Down Expand Up @@ -208,7 +211,7 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e
break
case triggers.TriggerType_Subscription:
if event, ok := request.(*triggers.Event); ok {
if err := s.handler.HandleEvent(event); err != nil {
if err := wrkr.HandleEvent(event); err != nil {
return nil, err
}
} else {
Expand All @@ -221,13 +224,13 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e
}

// Start the lambda gateway handler
func (s *LambdaGateway) Start(handler handler.TriggerHandler) error {
func (s *LambdaGateway) Start(pool worker.WorkerPool) error {
//s.finished = make(chan int)
s.handler = handler
s.pool = pool
// Here we want to begin polling lambda for incoming requests...
s.runtime(s.handle)
// Unblock the 'Stop' function if it's waiting.
go func(){s.finished <- 1}()
go func() { s.finished <- 1 }()
return nil
}

Expand All @@ -236,20 +239,20 @@ func (s *LambdaGateway) Stop() error {
// We don't need to stop listening to anything
fmt.Println("gateway 'Stop' called, waiting for lambda runtime to finish")
// Lambda can't be stopped, need to wait for it to finish
<- s.finished
<-s.finished
return nil
}

func New() (sdk.GatewayService, error) {
return &LambdaGateway{
runtime: lambda.Start,
runtime: lambda.Start,
finished: make(chan int),
}, nil
}

func NewWithRuntime(runtime LambdaRuntimeHandler) (sdk.GatewayService, error) {
return &LambdaGateway{
runtime: runtime,
runtime: runtime,
finished: make(chan int),
}, nil
}

0 comments on commit 1df0893

Please sign in to comment.