Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CLI command 'feast serve' should start go-based server if flag is enabled #2617

Merged
merged 3 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
cd sdk/python && python setup.py build_go_protos

compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies
cd sdk/python && python setup.py build_go_lib
cd sdk/python && COMPILE_GO=True python setup.py build_ext --inplace

# Needs feast package to setup the feature store
test-go: compile-protos-go
Expand All @@ -178,7 +178,7 @@ format-go:
gofmt -s -w go/

lint-go: compile-protos-go
go vet ./go/internal/feast ./go/cmd/server
go vet ./go/internal/feast ./go/embedded

# Docker

Expand Down
File renamed without changes.
Binary file not shown.
74 changes: 0 additions & 74 deletions go/cmd/server/main.go

This file was deleted.

38 changes: 38 additions & 0 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ package embedded
import (
"context"
"fmt"
"github.com/feast-dev/feast/go/internal/feast/server"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
"google.golang.org/grpc"
"log"
"net"
"os"
"os/signal"
"syscall"

"github.com/apache/arrow/go/v8/arrow"
"github.com/apache/arrow/go/v8/arrow/array"
Expand Down Expand Up @@ -198,6 +206,36 @@ func (s *OnlineFeatureService) GetOnlineFeatures(
return nil
}

func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
// TODO(oleksii): enable logging
// Disable logging for now
var loggingService *logging.LoggingService = nil
ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return err
}
grpcServer := grpc.NewServer()
serving.RegisterServingServiceServer(grpcServer, ser)

// Notify this channel when receiving interrupt or termination signals from OS
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
go func() {
// As soon as these signals are received from OS, try to gracefully stop the gRPC server
<-c
fmt.Println("Stopping the gRPC server...")
grpcServer.GracefulStop()
}()

err = grpcServer.Serve(lis)
if err != nil {
return err
}
return nil
}

/*
Read Record Batch from memory managed by Python caller.
Python part uses C ABI interface to export this record into C Data Interface,
Expand Down
Empty file added go/internal/__init__.py
Empty file.
Empty file added go/internal/feast/__init__.py
Empty file.
8 changes: 5 additions & 3 deletions go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type OnlineStore interface {

func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool) {
if onlineStoreType, ok := onlineStoreConfig["type"]; !ok {
return "", false
// If online store type isn't specified, default to sqlite
return "sqlite", true
} else {
result, ok := onlineStoreType.(string)
return result, ok
Expand All @@ -53,10 +54,11 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool)
func NewOnlineStore(config *registry.RepoConfig) (OnlineStore, error) {
onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
if !ok {
return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
} else if onlineStoreType == "sqlite" {
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
}
if onlineStoreType == "redis" {
} else if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
return onlineStore, err
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
package main
package server

import (
"context"

"github.com/feast-dev/feast/go/cmd/server/logging"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"github.com/google/uuid"
)

type servingServiceServer struct {
const feastServerVersion = "0.0.1"

type grpcServingServiceServer struct {
fs *feast.FeatureStore
loggingService *logging.LoggingService
serving.UnimplementedServingServiceServer
}

func newServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *servingServiceServer {
return &servingServiceServer{fs: fs, loggingService: loggingService}
func NewGrpcServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *grpcServingServiceServer {
return &grpcServingServiceServer{fs: fs, loggingService: loggingService}
}

func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) {
func (s *grpcServingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) {
return &serving.GetFeastServingInfoResponse{
Version: feastServerVersion,
}, nil
Expand All @@ -30,7 +32,7 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request
// Returns an object containing the response to GetOnlineFeatures.
// Metadata contains featurenames that corresponds to the number of rows in response.Results.
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
requestId := GenerateRequestId()
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())
if err != nil {
Expand Down Expand Up @@ -74,7 +76,7 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
EventTimestamps: vector.Timestamps,
})
}
if featuresOrService.FeatureService != nil {
if featuresOrService.FeatureService != nil && s.loggingService != nil {
go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId)
}
return resp, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package server

import (
"context"
Expand All @@ -16,8 +16,8 @@ import (
"github.com/apache/arrow/go/v8/arrow/memory"
"github.com/apache/arrow/go/v8/parquet/file"
"github.com/apache/arrow/go/v8/parquet/pqarrow"
"github.com/feast-dev/feast/go/cmd/server/logging"
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/internal/test"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
Expand Down Expand Up @@ -73,7 +73,7 @@ func getClient(ctx context.Context, offlineStoreType string, basePath string, en
if err != nil {
panic(err)
}
servingServiceServer := newServingServiceServer(fs, loggingService)
servingServiceServer := NewGrpcServingServiceServer(fs, loggingService)

serving.RegisterServingServiceServer(server, servingServiceServer)
go func() {
Expand Down
Empty file.
3 changes: 3 additions & 0 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ def get_online_features(
resp = record_batch_to_online_response(record_batch)
return OnlineResponse(resp)

def start_grpc_server(self, host: str, port: int):
self._service.StartGprcServer(host, port)


def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array:
if isinstance(value, Value_pb2.RepeatedValue):
Expand Down
33 changes: 22 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class FeatureStore:
repo_path: Path
_registry: Registry
_provider: Provider
_go_server: Optional["EmbeddedOnlineFeatureServer"]
_go_server: "EmbeddedOnlineFeatureServer"

@log_exceptions
def __init__(
Expand Down Expand Up @@ -1306,6 +1306,18 @@ def get_online_features(
native_entity_values=True,
)

def _lazy_init_go_server(self):
"""Lazily initialize self._go_server if it hasn't been initialized before."""
from feast.embedded_go.online_features_service import (
EmbeddedOnlineFeatureServer,
)

# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = EmbeddedOnlineFeatureServer(
str(self.repo_path.absolute()), self.config, self
)

def _get_online_features(
self,
features: Union[List[str], FeatureService],
Expand All @@ -1323,15 +1335,7 @@ def _get_online_features(

# If Go feature server is enabled, send request to it instead of going through regular Python logic
if self.config.go_feature_retrieval:
from feast.embedded_go.online_features_service import (
EmbeddedOnlineFeatureServer,
)

# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = EmbeddedOnlineFeatureServer(
str(self.repo_path.absolute()), self.config, self
)
self._lazy_init_go_server()

entity_native_values: Dict[str, List[Any]]
if not native_entity_values:
Expand Down Expand Up @@ -1957,7 +1961,14 @@ def _get_feature_views_to_use(
@log_exceptions_and_usage
def serve(self, host: str, port: int, no_access_log: bool) -> None:
"""Start the feature consumption server locally on a given port."""
feature_server.start_server(self, host, port, no_access_log)
if self.config.go_feature_retrieval:
# Start go server instead of python if the flag is enabled
self._lazy_init_go_server()
# TODO(tsotne) add http/grpc flag in CLI and call appropriate method here depending on that
self._go_server.start_grpc_server(host, port)
else:
# Start the python server if go server isn't enabled
feature_server.start_server(self, host, port, no_access_log)

@log_exceptions_and_usage
def get_feature_server_endpoint(self) -> Optional[str]:
Expand Down