diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 352f0edc16..0b357565ee 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -2,7 +2,7 @@ ## Overview -The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests. +The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests. ## CLI @@ -155,6 +155,10 @@ curl -X POST \ ### Pushing features to the online store You can push data corresponding to a push source to the online store (note that timestamps need to be strings): +You can also define a pushmode to push offline data, either to the online store, offline store, or both. The feature server will throw an error if the online/offline +store doesn't support the push api functionality. + +The request definition for pushmode is a string parameter `to` where the options are: ["online", "offline", "both"]. ```text curl -X POST "http://localhost:6566/push" -d '{ "push_source_name": "driver_hourly_stats_push_source", @@ -187,9 +191,10 @@ event_dict = { } push_data = { "push_source_name":"driver_stats_push_source", - "df":event_dict + "df":event_dict, + "to":"online", } requests.post( - "http://localhost:6566/push", + "http://localhost:6566/push", data=json.dumps(push_data)) ``` diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 3682d84e57..f5c40d2421 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -914,6 +914,12 @@ def to_proto(self) -> DataSourceProto: return data_source_proto +class PushMode(enum.Enum): + ONLINE = 1 + OFFLINE = 2 + ONLINE_AND_OFFLINE = 3 + + @typechecked class PushSource(DataSource): """ diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 8347bed6da..7bc634f7f5 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -12,6 +12,7 @@ import feast from feast import proto_json +from feast.data_source import PushMode from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest @@ -26,6 +27,7 @@ class PushFeaturesRequest(BaseModel): push_source_name: str df: dict allow_registry_cache: bool = True + to: str = "online" def get_app(store: "feast.FeatureStore"): @@ -80,10 +82,17 @@ def push(body=Depends(get_body)): try: request = PushFeaturesRequest(**json.loads(body)) df = pd.DataFrame(request.df) + if request.to == "offline": + to = PushMode.OFFLINE + elif request.to == "online": + to = PushMode.ONLINE + else: + to = PushMode.ONLINE_AND_OFFLINE store.push( push_source_name=request.push_source_name, df=df, allow_registry_cache=request.allow_registry_cache, + to=to, ) except Exception as e: # Print the original exception on the server side diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 9c2ea8a276..de52b9e3f3 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -43,7 +43,7 @@ from feast import feature_server, flags, flags_helper, ui_server, utils from feast.base_feature_view import BaseFeatureView from feast.batch_feature_view import BatchFeatureView -from feast.data_source import DataSource +from feast.data_source import DataSource, PushMode from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between from feast.dqm.errors import ValidationFailed @@ -1341,15 +1341,20 @@ def tqdm_builder(length): @log_exceptions_and_usage def push( - self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True + self, + push_source_name: str, + df: pd.DataFrame, + allow_registry_cache: bool = True, + to: PushMode = PushMode.ONLINE, ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. Args: push_source_name: The name of the push source we want to push data to. - df: the data being pushed. - allow_registry_cache: whether to allow cached versions of the registry. + df: The data being pushed. + allow_registry_cache: Whether to allow cached versions of the registry. + to: Whether to push to online or offline store. Defaults to online store only. """ warnings.warn( "Push source is an experimental feature. " @@ -1373,9 +1378,14 @@ def push( } for fv in fvs_with_push_sources: - self.write_to_online_store( - fv.name, df, allow_registry_cache=allow_registry_cache - ) + if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE: + self.write_to_online_store( + fv.name, df, allow_registry_cache=allow_registry_cache + ) + if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE: + self._write_to_offline_store( + fv.name, df, allow_registry_cache=allow_registry_cache + ) @log_exceptions_and_usage def write_to_online_store( diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 9d18e6b249..8c6dd831dd 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -110,7 +110,9 @@ def offline_write_batch( set_usage_attribute("provider", self.__class__.__name__) if self.offline_store: - self.offline_store.offline_write_batch(config, feature_view, data, progress) + self.offline_store.__class__.offline_write_batch( + config, feature_view, data, progress + ) @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) def online_read( diff --git a/sdk/python/tests/integration/offline_store/test_offline_write.py b/sdk/python/tests/integration/offline_store/test_offline_write.py index 5e7a242513..997299c11b 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_write.py +++ b/sdk/python/tests/integration/offline_store/test_offline_write.py @@ -123,7 +123,9 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour Field(name="acc_rate", dtype=Float32), ], source=data_sources.driver, - ttl=timedelta(minutes=10), + ttl=timedelta( + minutes=10 + ), # This is to make sure all offline store data is out of date since get_historical_features() only searches backwards for a ttl window. ) now = datetime.utcnow() diff --git a/sdk/python/tests/integration/offline_store/test_push_offline_retrieval.py b/sdk/python/tests/integration/offline_store/test_push_offline_retrieval.py new file mode 100644 index 0000000000..b2f91f442e --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_push_offline_retrieval.py @@ -0,0 +1,67 @@ +import datetime + +import numpy as np +import pandas as pd +import pytest + +from feast.data_source import PushMode +from tests.integration.feature_repos.repo_configuration import ( + construct_universal_feature_views, +) +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) + + +@pytest.mark.integration +@pytest.mark.universal_offline_stores(only=["file", "redshift"]) +@pytest.mark.universal_online_stores(only=["sqlite"]) +def test_push_features_and_read_from_offline_store(environment, universal_data_sources): + store = environment.feature_store + + (_, _, data_sources) = universal_data_sources + feature_views = construct_universal_feature_views(data_sources) + now = pd.Timestamp(datetime.datetime.utcnow()).round("ms") + + store.apply([driver(), customer(), location(), *feature_views.values()]) + entity_df = pd.DataFrame.from_dict({"location_id": [1], "event_timestamp": [now]}) + + before_df = store.get_historical_features( + entity_df=entity_df, + features=["pushable_location_stats:temperature"], + full_feature_names=False, + ).to_df() + + data = { + "event_timestamp": [now], + "location_id": [1], + "temperature": [4], + "created": [now], + } + df_ingest = pd.DataFrame(data) + assert np.where( + before_df["location_id"].reset_index(drop=True) + == df_ingest["location_id"].reset_index(drop=True) + ) + assert np.where( + before_df["temperature"].reset_index(drop=True) + != df_ingest["temperature"].reset_index(drop=True) + ) + + store.push("location_stats_push_source", df_ingest, to=PushMode.OFFLINE) + + df = store.get_historical_features( + entity_df=entity_df, + features=["pushable_location_stats:temperature"], + full_feature_names=False, + ).to_df() + assert np.where( + df["location_id"].reset_index(drop=True) + == df_ingest["location_id"].reset_index(drop=True) + ) + assert np.where( + df["temperature"].reset_index(drop=True) + == df_ingest["temperature"].reset_index(drop=True) + )