From 5af785a05289c0568545be4530c3b264cb39f8e0 Mon Sep 17 00:00:00 2001 From: Jye Cusch Date: Tue, 25 May 2021 11:22:29 +1000 Subject: [PATCH] fix: issue with calling SIGTERM from lambda gateway Made 'start()' blocking again and made the 'stop()' execute in a go routine, which will unblock start when the servers shutdown. --- membrane/membrane.go | 5 ++--- plugins/gateway/app_platform/http.go | 14 +++++--------- plugins/gateway/appservice/http.go | 13 +++++-------- plugins/gateway/cloudrun/http.go | 13 +++++-------- plugins/gateway/dev/gateway.go | 13 +++++-------- plugins/gateway/lambda/lambda.go | 16 ++++++++++------ providers/aws/membrane.go | 10 ++++++---- providers/azure/membrane.go | 10 ++++++---- 8 files changed, 44 insertions(+), 50 deletions(-) diff --git a/membrane/membrane.go b/membrane/membrane.go index a0654046a..fb0eee84c 100644 --- a/membrane/membrane.go +++ b/membrane/membrane.go @@ -242,9 +242,8 @@ func (s *Membrane) Start() error { hndlr = handler.NewHttpHandler(s.childAddress) break } - - err = s.gatewayPlugin.Start(hndlr) - return err + + return s.gatewayPlugin.Start(hndlr) } func (s *Membrane) Stop() { diff --git a/plugins/gateway/app_platform/http.go b/plugins/gateway/app_platform/http.go index b79370fa5..6e13952a6 100644 --- a/plugins/gateway/app_platform/http.go +++ b/plugins/gateway/app_platform/http.go @@ -60,18 +60,14 @@ func (s *HttpGateway) Start(handler handler.TriggerHandler) error { Handler: httpHandler(handler), } - go (func() { - err := s.server.ListenAndServe(s.address) - if err != nil { - panic(err) - } - })() - - return nil + return s.server.ListenAndServe(s.address) } func (s *HttpGateway) Stop() error { - return s.server.Shutdown() + if s.server != nil { + return s.server.Shutdown() + } + return nil } // Create new HTTP gateway diff --git a/plugins/gateway/appservice/http.go b/plugins/gateway/appservice/http.go index 4c16432e4..57d8ca10f 100644 --- a/plugins/gateway/appservice/http.go +++ b/plugins/gateway/appservice/http.go @@ -138,17 +138,14 @@ func (s *HttpService) Start(handler handler.TriggerHandler) error { Handler: httpHandler(handler), } - go (func() { - err := s.server.ListenAndServe(s.address) - if err != nil { - panic(err) - } - })() - return nil + return s.server.ListenAndServe(s.address) } func (s *HttpService) Stop() error { - return s.server.Shutdown() + if s.server != nil { + return s.server.Shutdown() + } + return nil } // Create a new HTTP Gateway plugin diff --git a/plugins/gateway/cloudrun/http.go b/plugins/gateway/cloudrun/http.go index 910378e48..92b881be8 100644 --- a/plugins/gateway/cloudrun/http.go +++ b/plugins/gateway/cloudrun/http.go @@ -95,17 +95,14 @@ func (s *HttpProxyGateway) Start(handler handler.TriggerHandler) error { Handler: httpHandler(handler), } - go (func() { - err := s.server.ListenAndServe(s.address) - if err != nil { - panic(err) - } - })() - return nil + return s.server.ListenAndServe(s.address) } func (s *HttpProxyGateway) Stop() error { - return s.server.Shutdown() + if s.server != nil { + return s.server.Shutdown() + } + return nil } // Create new DynamoDB documents server diff --git a/plugins/gateway/dev/gateway.go b/plugins/gateway/dev/gateway.go index c8817dee2..f42776d59 100644 --- a/plugins/gateway/dev/gateway.go +++ b/plugins/gateway/dev/gateway.go @@ -86,17 +86,14 @@ func (s *HttpGateway) Start(handler handler.TriggerHandler) error { Handler: httpHandler(handler), } - go (func() { - err := s.server.ListenAndServe(s.address) - if err != nil { - panic(err) - } - })() - return nil + return s.server.ListenAndServe(s.address) } func (s *HttpGateway) Stop() error { - return s.server.Shutdown() + if s.server != nil { + return s.server.Shutdown() + } + return nil } // Create new HTTP gateway diff --git a/plugins/gateway/lambda/lambda.go b/plugins/gateway/lambda/lambda.go index 2638729e6..f26f3dfa1 100644 --- a/plugins/gateway/lambda/lambda.go +++ b/plugins/gateway/lambda/lambda.go @@ -25,7 +25,6 @@ import ( "github.com/nitric-dev/membrane/sdk" "github.com/nitric-dev/membrane/triggers" "strings" - "syscall" ) type eventType int @@ -162,6 +161,7 @@ type LambdaGateway struct { handler handler.TriggerHandler runtime LambdaRuntimeHandler sdk.UnimplementedGatewayPlugin + finished chan int } func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, error) { @@ -222,30 +222,34 @@ func (s *LambdaGateway) handle(ctx context.Context, event Event) (interface{}, e // Start the lambda gateway handler func (s *LambdaGateway) Start(handler handler.TriggerHandler) error { + //s.finished = make(chan int) s.handler = handler // Here we want to begin polling lambda for incoming requests... - // Assuming that this is blocking s.runtime(s.handle) - // Signal process to terminate if no more lambda requests to handle - return syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + // Unblock the 'Stop' function if it's waiting. + go func(){s.finished <- 1}() + return nil } func (s *LambdaGateway) Stop() error { // XXX: This is a NO_OP Process, as this is a pull based system // We don't need to stop listening to anything - fmt.Println("Shutting down lambda gateway") - + fmt.Println("gateway 'Stop' called, waiting for lambda runtime to finish") + // Lambda can't be stopped, need to wait for it to finish + <- s.finished return nil } func New() (sdk.GatewayService, error) { return &LambdaGateway{ runtime: lambda.Start, + finished: make(chan int), }, nil } func NewWithRuntime(runtime LambdaRuntimeHandler) (sdk.GatewayService, error) { return &LambdaGateway{ runtime: runtime, + finished: make(chan int), }, nil } diff --git a/providers/aws/membrane.go b/providers/aws/membrane.go index ba355337c..72c4be67a 100644 --- a/providers/aws/membrane.go +++ b/providers/aws/membrane.go @@ -68,8 +68,10 @@ func main() { log.Fatalf("There was an error initialising the membrane server: %v", err) } - go (m.Start)() - // Wait for a terminate interrupt - <-term - m.Stop() + go(func(){ + <-term + m.Stop() + })() + + m.Start() } diff --git a/providers/azure/membrane.go b/providers/azure/membrane.go index 9a8d425f1..cd634c790 100644 --- a/providers/azure/membrane.go +++ b/providers/azure/membrane.go @@ -51,8 +51,10 @@ func main() { log.Fatalf("There was an error initialising the membrane server: %v", err) } - go (m.Start)() - // Wait for a terminate interrupt - <-term - m.Stop() + go(func(){ + <-term + m.Stop() + })() + + m.Start() }