Skip to content

Commit

Permalink
Merge pull request #62 from nitrictech/fix/exit-on-error
Browse files Browse the repository at this point in the history
Exit on error
  • Loading branch information
jyecusch authored Jun 25, 2021
2 parents a7c7447 + 699905c commit 86769cf
Show file tree
Hide file tree
Showing 17 changed files with 188 additions and 70 deletions.
10 changes: 9 additions & 1 deletion adapters/grpc/faas_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package grpc

import (
"fmt"

pb "github.com/nitric-dev/membrane/interfaces/nitric/v1"
"github.com/nitric-dev/membrane/worker"
)
Expand Down Expand Up @@ -47,7 +49,13 @@ func (s *FaasServer) TriggerStream(stream pb.Faas_TriggerStreamServer) error {
go worker.Listen(errchan)

// block here on error returned from the worker
return <-errchan
err := <-errchan
fmt.Println("FaaS stream closed, removing worker")

// Worker is done so we can remove it from the pool
s.pool.RemoveWorker(worker)

return err
}

func NewFaasServer(workerPool worker.WorkerPool) *FaasServer {
Expand Down
39 changes: 34 additions & 5 deletions membrane/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,42 @@ func (s *Membrane) Start() error {
return err
}

s.log("Starting Gateway")
return s.gatewayPlugin.Start(s.pool)
gatewayErrchan := make(chan error)
poolErrchan := make(chan error)

// Start the gateway
go func(errch chan error) {
s.log("Starting Gateway")
errch <- s.gatewayPlugin.Start(s.pool)
}(gatewayErrchan)

// Start the worker pool monitor
go func(errch chan error) {
s.log("Starting Worker Supervisor")
errch <- s.pool.Monitor()
}(poolErrchan)

var exitErr error

// Wait and fail on either
select {
case gatewayErr := <-gatewayErrchan:
if err == nil {
// Normal Gateway shutdown
// Allowing the membrane to exit
return nil
}
exitErr = fmt.Errorf(fmt.Sprintf("Gateway Error: %v, exiting", gatewayErr))
case poolErr := <-poolErrchan:
exitErr = fmt.Errorf(fmt.Sprintf("Supervisor error: %v, exiting", poolErr))
}

return exitErr
}

func (s *Membrane) Stop() {
_ = s.gatewayPlugin.Stop()
s.grpcServer.GracefulStop()
s.gatewayPlugin.Stop()
s.grpcServer.Stop()
}

// Create a new Membrane server
Expand Down Expand Up @@ -293,7 +322,7 @@ func New(options *MembraneOptions) (*Membrane, error) {
}

if options.ChildTimeoutSeconds < 1 {
options.ChildTimeoutSeconds = 5
options.ChildTimeoutSeconds = 10
}

if options.GatewayPlugin == nil {
Expand Down
2 changes: 1 addition & 1 deletion membrane/membrane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type MockAuthServer struct {
}

type MockFunction struct {
// Records the requests that its recieved for later inspection
// Records the requests that its received for later inspection
requests []*http.Request
// Returns a fixed HTTP response
response *http.Response
Expand Down
16 changes: 8 additions & 8 deletions mocks/worker/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ type MockWorker struct {
returnHttp *triggers.HttpResponse
httpError error
eventError error
RecievedEvents []*triggers.Event
RecievedRequests []*triggers.HttpRequest
ReceivedEvents []*triggers.Event
ReceivedRequests []*triggers.HttpRequest
}

func (m *MockWorker) HandleEvent(trigger *triggers.Event) error {
m.RecievedEvents = append(m.RecievedEvents, trigger)
m.ReceivedEvents = append(m.ReceivedEvents, trigger)

return m.eventError
}

func (m *MockWorker) HandleHttpRequest(trigger *triggers.HttpRequest) (*triggers.HttpResponse, error) {
m.RecievedRequests = append(m.RecievedRequests, trigger)
m.ReceivedRequests = append(m.ReceivedRequests, trigger)

return m.returnHttp, m.httpError
}

func (m *MockWorker) Reset() {
m.RecievedEvents = make([]*triggers.Event, 0)
m.RecievedRequests = make([]*triggers.HttpRequest, 0)
m.ReceivedEvents = make([]*triggers.Event, 0)
m.ReceivedRequests = make([]*triggers.HttpRequest, 0)
}

func NewMockWorker(opts *MockWorkerOptions) *MockWorker {
return &MockWorker{
httpError: opts.HttpError,
returnHttp: opts.ReturnHttp,
eventError: opts.eventError,
RecievedEvents: make([]*triggers.Event, 0),
RecievedRequests: make([]*triggers.HttpRequest, 0),
ReceivedEvents: make([]*triggers.Event, 0),
ReceivedRequests: make([]*triggers.HttpRequest, 0),
}
}
10 changes: 5 additions & 5 deletions plugins/gateway/appservice/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ var _ = Describe("Http", func() {
Expect(err).To(BeNil())

By("Handling exactly 1 request")
Expect(mockHandler.RecievedRequests).To(HaveLen(1))
Expect(mockHandler.ReceivedRequests).To(HaveLen(1))

handledRequest := mockHandler.RecievedRequests[0]
handledRequest := mockHandler.ReceivedRequests[0]

By("Having the provided path")
Expect(handledRequest.Path).To((Equal("/test/")))
Expand All @@ -102,7 +102,7 @@ var _ = Describe("Http", func() {
resp, _ := http.DefaultClient.Do(request)

By("Not invoking the nitric application")
Expect(mockHandler.RecievedRequests).To(BeEmpty())
Expect(mockHandler.ReceivedRequests).To(BeEmpty())

By("Returning a 200 response")
Expect(resp.StatusCode).To(Equal(200))
Expand Down Expand Up @@ -137,9 +137,9 @@ var _ = Describe("Http", func() {
_, _ = http.DefaultClient.Do(request)

By("Passing the event to the Nitric Application")
Expect(mockHandler.RecievedEvents).To(HaveLen(1))
Expect(mockHandler.ReceivedEvents).To(HaveLen(1))

event := mockHandler.RecievedEvents[0]
event := mockHandler.ReceivedEvents[0]
By("Having the provided requestId")
Expect(event.ID).To(Equal("1234"))

Expand Down
8 changes: 4 additions & 4 deletions plugins/gateway/cloudrun/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ var _ = Describe("Http", func() {
Expect(err).To(BeNil())

By("Handling exactly 1 request")
Expect(mockHandler.RecievedRequests).To(HaveLen(1))
Expect(mockHandler.ReceivedRequests).To(HaveLen(1))

handledRequest := mockHandler.RecievedRequests[0]
handledRequest := mockHandler.ReceivedRequests[0]
By("Preserving the original requests method")
Expect(handledRequest.Method).To(Equal("POST"))

Expand Down Expand Up @@ -143,9 +143,9 @@ var _ = Describe("Http", func() {
Expect(err).To(BeNil())

By("Handling exactly 1 request")
Expect(mockHandler.RecievedEvents).To(HaveLen(1))
Expect(mockHandler.ReceivedEvents).To(HaveLen(1))

handledEvent := mockHandler.RecievedEvents[0]
handledEvent := mockHandler.ReceivedEvents[0]

By("Passing through the pubsub message ID")
Expect(handledEvent.ID).To(Equal("test"))
Expand Down
8 changes: 4 additions & 4 deletions plugins/gateway/dev/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())

By("Passing through exactly 1 request")
Expect(mockHandler.RecievedRequests).To(HaveLen(1))
Expect(mockHandler.ReceivedRequests).To(HaveLen(1))

handledRequest := mockHandler.RecievedRequests[0]
handledRequest := mockHandler.ReceivedRequests[0]

By("Preserving the original request method")
Expect(handledRequest.Method).To(Equal("POST"))
Expand Down Expand Up @@ -113,9 +113,9 @@ var _ = Describe("Gateway", func() {
Expect(err).To(BeNil())

By("Passing through exactly 1 event")
Expect(mockHandler.RecievedEvents).To(HaveLen(1))
Expect(mockHandler.ReceivedEvents).To(HaveLen(1))

evt := mockHandler.RecievedEvents[0]
evt := mockHandler.ReceivedEvents[0]

By("Preserving the provided payload")
Expect(evt.Payload).To(BeEquivalentTo(payload))
Expand Down
10 changes: 5 additions & 5 deletions plugins/gateway/lambda/lambda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ var _ = Describe("Lambda", func() {
client.Start(pool)

By("Handling a single HTTP request")
Expect(len(mockHandler.RecievedRequests)).To(Equal(1))
Expect(len(mockHandler.ReceivedRequests)).To(Equal(1))

request := mockHandler.RecievedRequests[0]
request := mockHandler.ReceivedRequests[0]

By("Retaining the body")
Expect(string(request.Body)).To(BeEquivalentTo("Test Payload"))
Expand All @@ -120,7 +120,7 @@ var _ = Describe("Lambda", func() {
})

Context("SNS Events", func() {
When("The Lambda Gateway recieves SNS events", func() {
When("The Lambda Gateway receives SNS events", func() {
topicName := "MyTopic"
eventPayload := map[string]interface{}{
"test": "test",
Expand Down Expand Up @@ -161,9 +161,9 @@ var _ = Describe("Lambda", func() {
client.Start(pool)

By("Handling a single event")
Expect(len(mockHandler.RecievedEvents)).To(Equal(1))
Expect(len(mockHandler.ReceivedEvents)).To(Equal(1))

request := mockHandler.RecievedEvents[0]
request := mockHandler.ReceivedEvents[0]

By("Containing the Source Topic")
Expect(request.Topic).To(Equal("MyTopic"))
Expand Down
2 changes: 1 addition & 1 deletion plugins/queue/dev/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var _ = Describe("Queue", func() {
})
})

Context("Recieve", func() {
Context("Receive", func() {
When("The queue is empty", func() {
tasksBytes, _ := json.Marshal([]sdk.NitricTask{})
mockStorageDriver := mocks.NewMockStorageDriver(&mocks.MockStorageDriverOptions{
Expand Down
4 changes: 2 additions & 2 deletions plugins/queue/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var _ = Describe("Pubsub", func() {
})
})

Context("Recieve", func() {
Context("Receive", func() {

When("Popping from a queue that exists", func() {
When("There is a message on the queue", func() {
Expand Down Expand Up @@ -163,7 +163,7 @@ var _ = Describe("Pubsub", func() {
}, nil
})

It("Should recieve the message", func() {
It("Should receive the message", func() {
items, err := queuePlugin.Receive(sdk.ReceiveOptions{
QueueName: "mock-queue",
Depth: nil,
Expand Down
19 changes: 14 additions & 5 deletions providers/aws/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"fmt"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -68,10 +69,18 @@ func main() {
log.Fatalf("There was an error initialising the membrane server: %v", err)
}

go(func(){
<-term
m.Stop()
})()
errChan := make(chan error)
// Start the Membrane server
go func(chan error) {
errChan <- m.Start()
}(errChan)

m.Start()
select {
case membraneError := <-errChan:
fmt.Println(fmt.Sprintf("Membrane Error: %v, exiting", membraneError))
case sigTerm := <-term:
fmt.Println(fmt.Sprintf("Received %v, exiting", sigTerm))
}

m.Stop()
}
19 changes: 14 additions & 5 deletions providers/azure/membrane.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package main

import (
"fmt"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -51,10 +52,18 @@ func main() {
log.Fatalf("There was an error initialising the membrane server: %v", err)
}

go(func(){
<-term
m.Stop()
})()
errChan := make(chan error)
// Start the Membrane server
go func(chan error) {
errChan <- m.Start()
}(errChan)

m.Start()
select {
case membraneError := <-errChan:
fmt.Println(fmt.Sprintf("Membrane Error: %v, exiting", membraneError))
case sigTerm := <-term:
fmt.Println(fmt.Sprintf("Received %v, exiting", sigTerm))
}

m.Stop()
}
Loading

0 comments on commit 86769cf

Please sign in to comment.