From 7415426e43247ea1ec83c3d78a35b8640170bd1c Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Sat, 12 Jun 2021 10:45:44 +1000 Subject: [PATCH] feat: Add FaaS Http worker. --- triggers/http_response.go | 26 ++++++ worker/faas_http_worker.go | 162 +++++++++++++++++++++++++++++++++++++ worker/faas_worker.go | 12 +-- 3 files changed, 194 insertions(+), 6 deletions(-) create mode 100644 worker/faas_http_worker.go diff --git a/triggers/http_response.go b/triggers/http_response.go index 4f31cf31d..bf0c3747f 100644 --- a/triggers/http_response.go +++ b/triggers/http_response.go @@ -15,6 +15,9 @@ package triggers import ( + "fmt" + + pb "github.com/nitric-dev/membrane/interfaces/nitric/v1" "github.com/valyala/fasthttp" ) @@ -36,3 +39,26 @@ func FromHttpResponse(resp *fasthttp.Response) *HttpResponse { StatusCode: resp.StatusCode(), } } + +// FromTriggerResponse (csontructs a HttpResponse from a FaaS TriggerResponse) +func FromTriggerResponse(triggerResponse *pb.TriggerResponse) (*HttpResponse, error) { + // FIXME: This will panic if the incorrect response type is provided + httpContext := triggerResponse.GetHttp() + if httpContext != nil { + fasthttpHeader := &fasthttp.ResponseHeader{} + + if httpContext.GetHeaders() != nil { + for key, val := range httpContext.GetHeaders() { + fasthttpHeader.Add(key, val) + } + } + + return &HttpResponse{ + Header: fasthttpHeader, + StatusCode: int(httpContext.Status), + Body: triggerResponse.GetData(), + }, nil + } + + return nil, fmt.Errorf("TriggerResponse does not container HTTP Context") +} diff --git a/worker/faas_http_worker.go b/worker/faas_http_worker.go new file mode 100644 index 000000000..66bbb4e33 --- /dev/null +++ b/worker/faas_http_worker.go @@ -0,0 +1,162 @@ +package worker + +import ( + "encoding/json" + "fmt" + "net" + "time" + + pb "github.com/nitric-dev/membrane/interfaces/nitric/v1" + "github.com/nitric-dev/membrane/triggers" + "github.com/valyala/fasthttp" +) + +// A Nitric HTTP worker +type FaasHttpWorker struct { + address string +} + +var METHOD_TYPE = []byte("POST") + +// HandleEvent - Handles an event from a subscription by converting it to an HTTP request. +func (h *FaasHttpWorker) HandleEvent(trigger *triggers.Event) error { + address := fmt.Sprintf("http://%s", h.address) + request := fasthttp.AcquireRequest() + response := fasthttp.AcquireResponse() + + // Release resources after finishing + defer func() { + request.Reset() + response.Reset() + fasthttp.ReleaseRequest(request) + fasthttp.ReleaseResponse(response) + }() + + triggerRequest := &pb.TriggerRequest{ + Data: trigger.Payload, + Context: &pb.TriggerRequest_Topic{ + Topic: &pb.TopicTriggerContext{ + Topic: trigger.Topic, + }, + }, + } + + if jsonData, err := json.Marshal(triggerRequest); err != nil { + request := fasthttp.AcquireRequest() + + request.Header.SetContentType("application/json") + request.SetBody(jsonData) + request.SetRequestURI(address) + + err := fasthttp.Do(request, response) + + if err != nil { + return fmt.Errorf("Function request failed") + } + + // Response body should contain an instance of triggerResponse + var triggerResponse pb.TriggerResponse + err = json.Unmarshal(response.Body(), &triggerResponse) + + if err != nil { + return err + } + + topic := triggerResponse.GetTopic() + + if topic != nil { + if topic.Success { + return nil + } + + return fmt.Errorf("Topic context indicated processing was unsuccesful") + } + + return fmt.Errorf("Response from function did not contain topic context") + } else { + return fmt.Errorf("Error marshalling request") + } +} + +// HandleHttpRequest - Handles an HTTP request by forwarding it as an HTTP request. +func (h *FaasHttpWorker) HandleHttpRequest(trigger *triggers.HttpRequest) (*triggers.HttpResponse, error) { + address := fmt.Sprintf("http://%s", h.address) + request := fasthttp.AcquireRequest() + response := fasthttp.AcquireResponse() + + // Release resources after finishing + defer func() { + request.Reset() + response.Reset() + fasthttp.ReleaseRequest(request) + fasthttp.ReleaseResponse(response) + }() + + triggerRequest := &pb.TriggerRequest{ + Data: trigger.Body, + Context: &pb.TriggerRequest_Http{ + Http: &pb.HttpTriggerContext{ + Method: trigger.Method, + QueryParams: trigger.Query, + PathParams: make(map[string]string), + }, + }, + } + + if jsonData, err := json.Marshal(triggerRequest); err != nil { + request := fasthttp.AcquireRequest() + + request.Header.SetContentType("application/json") + request.SetBody(jsonData) + request.SetRequestURI(address) + + err := fasthttp.Do(request, response) + + if err != nil { + return nil, err + } + + // Response body should contain an instance of triggerResponse + var triggerResponse pb.TriggerResponse + err = json.Unmarshal(response.Body(), &triggerResponse) + + if err != nil { + return nil, err + } + + return triggers.FromTriggerResponse(&triggerResponse) + } else { + return nil, err + } +} + +// Creates a new FaasHttpWorker +// Will wait to ensure that the provided address is dialable +// before proceeding +func NewFaasHttpWorker(address string) (*FaasHttpWorker, error) { + // Dial the child port to see if it's open and ready... + maxWaitTime := time.Duration(5) * time.Second + // Longer poll times, e.g. 200 milliseconds results in slow lambda cold starts (15s+) + pollInterval := time.Duration(15) * time.Millisecond + + var waitedTime = time.Duration(0) + for { + conn, _ := net.Dial("tcp", address) + if conn != nil { + conn.Close() + break + } else { + if waitedTime < maxWaitTime { + time.Sleep(pollInterval) + waitedTime += pollInterval + } else { + return nil, fmt.Errorf("Unable to dial http worker, does it expose a http server at: %s?", address) + } + } + } + + // Dial the provided address to ensure its availability + return &FaasHttpWorker{ + address: address, + }, nil +} diff --git a/worker/faas_worker.go b/worker/faas_worker.go index 0205d1831..1f94ff3a4 100644 --- a/worker/faas_worker.go +++ b/worker/faas_worker.go @@ -13,7 +13,7 @@ import ( ) // FaasWorker -// Worker representation for a Nitric FaaS functon +// Worker representation for a Nitric FaaS function using gRPC type FaasWorker struct { // gRPC Stream for this worker stream pb.Faas_TriggerStreamServer @@ -168,11 +168,11 @@ func (s *FaasWorker) Listen(errchan chan error) { if msg.GetInitRequest() != nil { fmt.Println("Recieved init request from worker") - s.stream.Send(&pb.ServerMessage{ - Content: &pb.ServerMessage_InitResponse{ - InitResponse: &pb.InitResponse{}, - }, - }) + //s.stream.Send(&pb.ServerMessage{ + // Content: &pb.ServerMessage_InitResponse{ + // InitResponse: &pb.InitResponse{}, + // }, + //}) continue }