Skip to content

Commit

Permalink
fix: Explicitly translate errors when instantiating the go fs (#2842)
Browse files Browse the repository at this point in the history
* fix: Explicitly translate errors when instantiating the go fs and convert bool params

Signed-off-by: Achal Shah <[email protected]>

* uncomment more

Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals authored Jun 23, 2022
1 parent 34c997d commit 7a2c4cd
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 13 deletions.
14 changes: 12 additions & 2 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type OnlineFeatureService struct {
fs *feast.FeatureStore
grpcStopCh chan os.Signal
httpStopCh chan os.Signal

err error
}

type OnlineFeatureServiceConfig struct {
Expand All @@ -56,12 +58,16 @@ type LoggingOptions struct {
func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
if err != nil {
log.Fatalln(err)
return &OnlineFeatureService{
err: err,
}
}

fs, err := feast.NewFeatureStore(repoConfig, transformationCallback)
if err != nil {
log.Fatalln(err)
return &OnlineFeatureService{
err: err,
}
}

// Notify these channels when receiving interrupt or termination signals from OS
Expand Down Expand Up @@ -121,6 +127,10 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN
return joinKeyTypes, nil
}

func (s *OnlineFeatureService) CheckForInstantiationError() error {
return s.err
}

func (s *OnlineFeatureService) GetOnlineFeatures(
featureRefs []string,
featureServiceName string,
Expand Down
18 changes: 12 additions & 6 deletions sdk/python/feast/embedded_go/online_features_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import faulthandler
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
Expand Down Expand Up @@ -46,13 +45,16 @@ def __init__(
self._transformation_callback = partial(transformation_callback, feature_store)
self._logging_callback = partial(logging_callback, feature_store)

self._config = OnlineFeatureServiceConfig(
RepoPath=repo_path, RepoConfig=repo_config.json()
)

self._service = NewOnlineFeatureService(
OnlineFeatureServiceConfig(
RepoPath=repo_path, RepoConfig=repo_config.json()
),
self._transformation_callback,
self._config, self._transformation_callback,
)
faulthandler.enable()

# This should raise an exception if there were any errors in NewOnlineFeatureService.
self._service.CheckForInstantiationError()

def get_online_features(
self,
Expand Down Expand Up @@ -244,6 +246,10 @@ def transformation_callback(

input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)

# For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
# the typeguard requirement.
full_feature_names = bool(full_feature_names)

output = odfv.get_transformed_features_df(
input_record.to_pandas(), full_feature_names=full_feature_names
)
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/tests/integration/e2e/test_go_feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def grpc_client(grpc_server_port):


@pytest.mark.integration
# @pytest.mark.goserver Disabling because the go fs tests are flaking in CI. TODO(achals): uncomment after fixed.
@pytest.mark.goserver
def test_go_grpc_server(grpc_client):
resp: GetOnlineFeaturesResponse = grpc_client.GetOnlineFeatures(
GetOnlineFeaturesRequest(
Expand All @@ -148,7 +148,7 @@ def test_go_grpc_server(grpc_client):


@pytest.mark.integration
# @pytest.mark.goserver Disabling because the go fs tests are flaking in CI. TODO(achals): uncomment after fixed.
@pytest.mark.goserver
def test_go_http_server(http_server_port):
response = requests.post(
f"http://localhost:{http_server_port}/get-online-features",
Expand Down Expand Up @@ -186,7 +186,7 @@ def test_go_http_server(http_server_port):


@pytest.mark.integration
# @pytest.mark.goserver Disabling because the go fs tests are flaking in CI. TODO(achals): uncomment after fixed.
@pytest.mark.goserver
@pytest.mark.universal_offline_stores
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_feature_logging(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def test_online_retrieval_with_event_timestamps(

@pytest.mark.integration
@pytest.mark.universal_online_stores
# @pytest.mark.goserver Disabling because the go fs tests are flaking in CI. TODO(achals): uncomment after fixed.
@pytest.mark.goserver
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_stream_feature_view_online_retrieval(
environment, universal_data_sources, feature_server_endpoint, full_feature_names
Expand Down Expand Up @@ -519,7 +519,7 @@ def test_stream_feature_view_online_retrieval(

@pytest.mark.integration
@pytest.mark.universal_online_stores
# @pytest.mark.goserver Disabling because the go fs tests are flaking in CI. TODO(achals): uncomment after fixed.
@pytest.mark.goserver
@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v))
def test_online_retrieval(
environment, universal_data_sources, feature_server_endpoint, full_feature_names
Expand Down

0 comments on commit 7a2c4cd

Please sign in to comment.