From 43e198f6945c5e868ade341309f2c5ca39ac563e Mon Sep 17 00:00:00 2001 From: "bdodla@expedia.com" <13788369+EXPEbdodla@users.noreply.github.com> Date: Fri, 28 Jun 2024 17:58:35 -0700 Subject: [PATCH] fix: CGO Memory leak issue in GO Feature server (#4291) --- go/internal/feast/server/http_server.go | 24 +++++++++++++++---- .../feast/transformation/transformation.go | 12 ++++++++++ 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 75cdbe9929..7ebab429e7 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -4,14 +4,16 @@ import ( "context" "encoding/json" "fmt" + "net/http" + "time" + "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/internal/feast/onlineserving" "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "net/http" - "time" ) type httpServer struct { @@ -210,6 +212,8 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { "results": results, } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(response) if err != nil { @@ -217,8 +221,6 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("Content-Type", "application/json") - if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil { logger, err := s.loggingService.GetOrCreateLogger(featureService) if err != nil { @@ -250,11 +252,19 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { return } } + go releaseCGOMemory(featureVectors) +} + +func releaseCGOMemory(featureVectors []*onlineserving.FeatureVector) { + for _, vector := range featureVectors { + vector.Values.Release() + } } func (s *httpServer) Serve(host string, port int) error { s.server = &http.Server{Addr: fmt.Sprintf("%s:%d", host, port), Handler: nil} http.HandleFunc("/get-online-features", s.getOnlineFeatures) + http.HandleFunc("/health", healthCheckHandler) err := s.server.ListenAndServe() // Don't return the error if it's caused by graceful shutdown using Stop() if err == http.ErrServerClosed { @@ -262,6 +272,12 @@ func (s *httpServer) Serve(host string, port int) error { } return err } + +func healthCheckHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "Healthy") +} + func (s *httpServer) Stop() error { if s.server != nil { return s.server.Shutdown(context.Background()) diff --git a/go/internal/feast/transformation/transformation.go b/go/internal/feast/transformation/transformation.go index 1cf1dd3311..7e63aec224 100644 --- a/go/internal/feast/transformation/transformation.go +++ b/go/internal/feast/transformation/transformation.go @@ -46,6 +46,7 @@ func AugmentResponseWithOnDemandTransforms( for name, values := range requestData { requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows) if err != nil { + ReleaseArrowContext(requestContextArrow) return nil, err } } @@ -53,6 +54,7 @@ func AugmentResponseWithOnDemandTransforms( for name, values := range entityRows { requestContextArrow[name], err = types.ProtoValuesToArrowArray(values.Val, arrowMemory, numRows) if err != nil { + ReleaseArrowContext(requestContextArrow) return nil, err } } @@ -71,14 +73,24 @@ func AugmentResponseWithOnDemandTransforms( fullFeatureNames, ) if err != nil { + ReleaseArrowContext(requestContextArrow) return nil, err } result = append(result, onDemandFeatures...) + + ReleaseArrowContext(requestContextArrow) } return result, nil } +func ReleaseArrowContext(requestContextArrow map[string]arrow.Array) { + // Release memory used by requestContextArrow + for _, arrowArray := range requestContextArrow { + arrowArray.Release() + } +} + func CallTransformations( featureView *model.OnDemandFeatureView, retrievedFeatures map[string]arrow.Array,