diff --git a/Makefile b/Makefile index 61549d4d0f..a808e3f4fb 100644 --- a/Makefile +++ b/Makefile @@ -167,7 +167,7 @@ compile-protos-go: install-go-proto-dependencies install-protoc-dependencies cd sdk/python && python setup.py build_go_protos compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies - cd sdk/python && python setup.py build_go_lib + cd sdk/python && COMPILE_GO=True python setup.py build_ext --inplace # Needs feast package to setup the feature store test-go: compile-protos-go @@ -178,7 +178,7 @@ format-go: gofmt -s -w go/ lint-go: compile-protos-go - go vet ./go/internal/feast ./go/cmd/server + go vet ./go/internal/feast ./go/embedded # Docker diff --git a/go/cmd/server/logging/feature_repo/__init__.py b/go/__init__.py similarity index 100% rename from go/cmd/server/logging/feature_repo/__init__.py rename to go/__init__.py diff --git a/go/cmd/server/logging/feature_repo/data/online_store.db b/go/cmd/server/logging/feature_repo/data/online_store.db deleted file mode 100644 index b6ccea139e..0000000000 Binary files a/go/cmd/server/logging/feature_repo/data/online_store.db and /dev/null differ diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go deleted file mode 100644 index 33d56e0a7a..0000000000 --- a/go/cmd/server/main.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "fmt" - "log" - "net" - "os" - - "github.com/feast-dev/feast/go/cmd/server/logging" - "github.com/feast-dev/feast/go/internal/feast" - "github.com/feast-dev/feast/go/internal/feast/registry" - "github.com/feast-dev/feast/go/protos/feast/serving" - "google.golang.org/grpc" -) - -const ( - flagFeastRepoPath = "FEAST_REPO_PATH" - flagFeastRepoConfig = "FEAST_REPO_CONFIG" - flagFeastSockFile = "FEAST_GRPC_SOCK_FILE" - feastServerVersion = "0.18.0" -) - -// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus -func main() { - repoPath := os.Getenv(flagFeastRepoPath) - repoConfigJSON := os.Getenv(flagFeastRepoConfig) - sockFile := os.Getenv(flagFeastSockFile) - if repoPath == "" && repoConfigJSON == "" { - log.Fatalln(fmt.Sprintf("One of %s of %s environment variables must be set", flagFeastRepoPath, flagFeastRepoConfig)) - } - - var repoConfig *registry.RepoConfig - var err error - if repoConfigJSON != "" { - repoConfig, err = registry.NewRepoConfigFromJSON(repoPath, repoConfigJSON) - if err != nil { - log.Fatalln(err) - } - } else { - repoConfig, err = registry.NewRepoConfigFromFile(repoPath) - if err != nil { - log.Fatalln(err) - } - } - - log.Println("Initializing feature store...") - fs, err := feast.NewFeatureStore(repoConfig, nil) - if err != nil { - log.Fatalln(err) - } - // Disable logging for now - loggingService, err := logging.NewLoggingService(fs, 1000, "", false) - if err != nil { - log.Fatalln(err) - } - defer fs.DestructOnlineStore() - startGrpcServer(fs, loggingService, sockFile) -} - -func startGrpcServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, sockFile string) { - server := newServingServiceServer(fs, loggingService) - log.Printf("Starting a gRPC server listening on %s\n", sockFile) - lis, err := net.Listen("unix", sockFile) - if err != nil { - log.Fatalln(err) - } - grpcServer := grpc.NewServer() - defer grpcServer.Stop() - serving.RegisterServingServiceServer(grpcServer, server) - err = grpcServer.Serve(lis) - if err != nil { - log.Fatalln(err) - } -} diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 24a5489430..636ccd403b 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -3,7 +3,15 @@ package embedded import ( "context" "fmt" + "github.com/feast-dev/feast/go/internal/feast/server" + "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/feast-dev/feast/go/protos/feast/serving" + "google.golang.org/grpc" "log" + "net" + "os" + "os/signal" + "syscall" "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" @@ -198,6 +206,36 @@ func (s *OnlineFeatureService) GetOnlineFeatures( return nil } +func (s *OnlineFeatureService) StartGprcServer(host string, port int) error { + // TODO(oleksii): enable logging + // Disable logging for now + var loggingService *logging.LoggingService = nil + ser := server.NewGrpcServingServiceServer(s.fs, loggingService) + log.Printf("Starting a gRPC server on host %s port %d\n", host, port) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port)) + if err != nil { + return err + } + grpcServer := grpc.NewServer() + serving.RegisterServingServiceServer(grpcServer, ser) + + // Notify this channel when receiving interrupt or termination signals from OS + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + go func() { + // As soon as these signals are received from OS, try to gracefully stop the gRPC server + <-c + fmt.Println("Stopping the gRPC server...") + grpcServer.GracefulStop() + }() + + err = grpcServer.Serve(lis) + if err != nil { + return err + } + return nil +} + /* Read Record Batch from memory managed by Python caller. Python part uses C ABI interface to export this record into C Data Interface, diff --git a/go/internal/__init__.py b/go/internal/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/go/internal/feast/__init__.py b/go/internal/feast/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/go/internal/feast/onlinestore/onlinestore.go b/go/internal/feast/onlinestore/onlinestore.go index b4a2571480..433b96c08e 100644 --- a/go/internal/feast/onlinestore/onlinestore.go +++ b/go/internal/feast/onlinestore/onlinestore.go @@ -43,7 +43,8 @@ type OnlineStore interface { func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) { if onlineStoreType, ok := onlineStoreConfig["type"]; !ok { - return "", false + // If online store type isn't specified, default to sqlite + return "sqlite", true } else { result, ok := onlineStoreType.(string) return result, ok @@ -53,10 +54,11 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) { onlineStoreType, ok := getOnlineStoreType(config.OnlineStore) if !ok { + return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore) + } else if onlineStoreType == "sqlite" { onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore) return onlineStore, err - } - if onlineStoreType == "redis" { + } else if onlineStoreType == "redis" { onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore) return onlineStore, err } else { diff --git a/go/cmd/server/server.go b/go/internal/feast/server/grpc_server.go similarity index 76% rename from go/cmd/server/server.go rename to go/internal/feast/server/grpc_server.go index 3708689268..08f624b6bd 100644 --- a/go/cmd/server/server.go +++ b/go/internal/feast/server/grpc_server.go @@ -1,27 +1,29 @@ -package main +package server import ( "context" - "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" "github.com/google/uuid" ) -type servingServiceServer struct { +const feastServerVersion = "0.0.1" + +type grpcServingServiceServer struct { fs *feast.FeatureStore loggingService *logging.LoggingService serving.UnimplementedServingServiceServer } -func newServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *servingServiceServer { - return &servingServiceServer{fs: fs, loggingService: loggingService} +func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer { + return &grpcServingServiceServer{fs: fs, loggingService: loggingService} } -func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { +func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { return &serving.GetFeastServingInfoResponse{ Version: feastServerVersion, }, nil @@ -30,7 +32,7 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request // Returns an object containing the response to GetOnlineFeatures. // Metadata contains featurenames that corresponds to the number of rows in response.Results. // Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format. -func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) { +func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) { requestId := GenerateRequestId() featuresOrService, err := s.fs.ParseFeatures(request.GetKind()) if err != nil { @@ -74,7 +76,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s EventTimestamps: vector.Timestamps, }) } - if featuresOrService.FeatureService != nil { + if featuresOrService.FeatureService != nil && s.loggingService != nil { go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId) } return resp, nil diff --git a/go/cmd/server/server_test.go b/go/internal/feast/server/grpc_server_test.go similarity index 98% rename from go/cmd/server/server_test.go rename to go/internal/feast/server/grpc_server_test.go index 9d4ffb50bf..090a873811 100644 --- a/go/cmd/server/server_test.go +++ b/go/internal/feast/server/grpc_server_test.go @@ -1,4 +1,4 @@ -package main +package server import ( "context" @@ -16,8 +16,8 @@ import ( "github.com/apache/arrow/go/v8/arrow/memory" "github.com/apache/arrow/go/v8/parquet/file" "github.com/apache/arrow/go/v8/parquet/pqarrow" - "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/server/logging" "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" @@ -73,7 +73,7 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en if err != nil { panic(err) } - servingServiceServer := newServingServiceServer(fs, loggingService) + servingServiceServer := NewGrpcServingServiceServer(fs, loggingService) serving.RegisterServingServiceServer(server, servingServiceServer) go func() { diff --git a/go/internal/feast/server/logging/feature_repo/__init__.py b/go/internal/feast/server/logging/feature_repo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/go/cmd/server/logging/feature_repo/driver_stats.parquet b/go/internal/feast/server/logging/feature_repo/driver_stats.parquet similarity index 100% rename from go/cmd/server/logging/feature_repo/driver_stats.parquet rename to go/internal/feast/server/logging/feature_repo/driver_stats.parquet diff --git a/go/cmd/server/logging/feature_repo/example.py b/go/internal/feast/server/logging/feature_repo/example.py similarity index 100% rename from go/cmd/server/logging/feature_repo/example.py rename to go/internal/feast/server/logging/feature_repo/example.py diff --git a/go/cmd/server/logging/feature_repo/feature_store.yaml b/go/internal/feast/server/logging/feature_repo/feature_store.yaml similarity index 100% rename from go/cmd/server/logging/feature_repo/feature_store.yaml rename to go/internal/feast/server/logging/feature_repo/feature_store.yaml diff --git a/go/cmd/server/logging/filelogstorage.go b/go/internal/feast/server/logging/filelogstorage.go similarity index 100% rename from go/cmd/server/logging/filelogstorage.go rename to go/internal/feast/server/logging/filelogstorage.go diff --git a/go/cmd/server/logging/filelogstorage_test.go b/go/internal/feast/server/logging/filelogstorage_test.go similarity index 100% rename from go/cmd/server/logging/filelogstorage_test.go rename to go/internal/feast/server/logging/filelogstorage_test.go diff --git a/go/cmd/server/logging/logging.go b/go/internal/feast/server/logging/logging.go similarity index 100% rename from go/cmd/server/logging/logging.go rename to go/internal/feast/server/logging/logging.go diff --git a/go/cmd/server/logging/logging_test.go b/go/internal/feast/server/logging/logging_test.go similarity index 100% rename from go/cmd/server/logging/logging_test.go rename to go/internal/feast/server/logging/logging_test.go diff --git a/go/cmd/server/logging/offlinelogstorage.go b/go/internal/feast/server/logging/offlinelogstorage.go similarity index 100% rename from go/cmd/server/logging/offlinelogstorage.go rename to go/internal/feast/server/logging/offlinelogstorage.go diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index 410af1d8fe..5bb27e2ae0 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -132,6 +132,9 @@ def get_online_features( resp = record_batch_to_online_response(record_batch) return OnlineResponse(resp) + def start_grpc_server(self, host: str, port: int): + self._service.StartGprcServer(host, port) + def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array: if isinstance(value, Value_pb2.RepeatedValue): diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 95b1458849..13c73612f0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -111,7 +111,7 @@ class FeatureStore: repo_path: Path _registry: Registry _provider: Provider - _go_server: Optional["EmbeddedOnlineFeatureServer"] + _go_server: "EmbeddedOnlineFeatureServer" @log_exceptions def __init__( @@ -1306,6 +1306,18 @@ def get_online_features( native_entity_values=True, ) + def _lazy_init_go_server(self): + """Lazily initialize self._go_server if it hasn't been initialized before.""" + from feast.embedded_go.online_features_service import ( + EmbeddedOnlineFeatureServer, + ) + + # Lazily start the go server on the first request + if self._go_server is None: + self._go_server = EmbeddedOnlineFeatureServer( + str(self.repo_path.absolute()), self.config, self + ) + def _get_online_features( self, features: Union[List[str], FeatureService], @@ -1323,15 +1335,7 @@ def _get_online_features( # If Go feature server is enabled, send request to it instead of going through regular Python logic if self.config.go_feature_retrieval: - from feast.embedded_go.online_features_service import ( - EmbeddedOnlineFeatureServer, - ) - - # Lazily start the go server on the first request - if self._go_server is None: - self._go_server = EmbeddedOnlineFeatureServer( - str(self.repo_path.absolute()), self.config, self - ) + self._lazy_init_go_server() entity_native_values: Dict[str, List[Any]] if not native_entity_values: @@ -1957,7 +1961,14 @@ def _get_feature_views_to_use( @log_exceptions_and_usage def serve(self, host: str, port: int, no_access_log: bool) -> None: """Start the feature consumption server locally on a given port.""" - feature_server.start_server(self, host, port, no_access_log) + if self.config.go_feature_retrieval: + # Start go server instead of python if the flag is enabled + self._lazy_init_go_server() + # TODO(tsotne) add http/grpc flag in CLI and call appropriate method here depending on that + self._go_server.start_grpc_server(host, port) + else: + # Start the python server if go server isn't enabled + feature_server.start_server(self, host, port, no_access_log) @log_exceptions_and_usage def get_feature_server_endpoint(self) -> Optional[str]: