diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7a5a8299eb..29db73b199 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1370,6 +1370,28 @@ def write_to_online_store( provider = self._get_provider() provider.ingest_df(feature_view, entities, df) + @log_exceptions_and_usage + def write_to_offline_store( + self, + feature_view_name: str, + df: pd.DataFrame, + allow_registry_cache: bool = True, + ): + """ + ingests data directly into the Online store + """ + # TODO: restrict this to work with online StreamFeatureViews and validate the FeatureView type + try: + feature_view = self.get_stream_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) + except FeatureViewNotFoundException: + feature_view = self.get_feature_view( + feature_view_name, allow_registry_cache=allow_registry_cache + ) + provider = self._get_provider() + provider.ingest_df_to_offline_store(feature_view, df) + @log_exceptions_and_usage def get_online_features( self, diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index cc06ad54c1..6c95283358 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -15,7 +15,7 @@ from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union import pandas as pd import pyarrow @@ -270,3 +270,25 @@ def write_logged_features( This is an optional method that could be supported only be some stores. """ raise NotImplementedError() + + @staticmethod + def offline_write_batch( + config: RepoConfig, + table: FeatureView, + data: pd.DataFrame, + progress: Optional[Callable[[int], Any]], + ): + """ + Write features to a specified destination in the offline store. + Data can be appended to an existing table (destination) or a new one will be created automatically + (if it doesn't exist). + Hence, this function can be called repeatedly with the same destination config to write features. + + Args: + config: Repo configuration object + table: FeatureView to write the data to. + data: dataframe containing feature data and timestamp column for historical feature retrieval + progress: Optional function to be called once every mini-batch of rows is written to + the online store. Can be used to display progress. + """ + raise NotImplementedError() diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index f04d03eb99..ef72541147 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union -import pandas +import pandas as pd import pyarrow import pyarrow as pa from tqdm import tqdm @@ -100,6 +100,17 @@ def online_write_batch( if self.online_store: self.online_store.online_write_batch(config, table, data, progress) + def offline_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: pd.DataFrame, + progress: Optional[Callable[[int], Any]], + ) -> None: + set_usage_attribute("provider", self.__class__.__name__) + if self.offline_store: + self.offline_store.offline_write_batch(config, table, data, progress) + @log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001)) def online_read( self, @@ -117,7 +128,7 @@ def online_read( return result def ingest_df( - self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame, + self, feature_view: FeatureView, entities: List[Entity], df: pd.DataFrame, ): set_usage_attribute("provider", self.__class__.__name__) table = pa.Table.from_pandas(df) @@ -193,7 +204,7 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], + entity_df: Union[pd.DataFrame, str], registry: BaseRegistry, project: str, full_feature_names: bool, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index e6c3da86a5..c6c9b75787 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -5,7 +5,7 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import dask.dataframe as dd -import pandas +import pandas as pd import pyarrow from tqdm import tqdm @@ -119,13 +119,21 @@ def online_write_batch( ... def ingest_df( - self, feature_view: FeatureView, entities: List[Entity], df: pandas.DataFrame, + self, feature_view: FeatureView, entities: List[Entity], df: pd.DataFrame, ): """ Ingests a DataFrame directly into the online store """ pass + def ingest_df_to_offline_store( + self, feature_view: FeatureView, df: pd.DataFrame, + ): + """ + Ingests a DataFrame directly into the offline store + """ + pass + @abc.abstractmethod def materialize_single_feature_view( self, @@ -145,7 +153,7 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], + entity_df: Union[pd.DataFrame, str], registry: BaseRegistry, project: str, full_feature_names: bool, @@ -367,14 +375,14 @@ def _run_dask_field_mapping( def _coerce_datetime(ts): """ - Depending on underlying time resolution, arrow to_pydict() sometimes returns pandas + Depending on underlying time resolution, arrow to_pydict() sometimes returns pd timestamp type (for nanosecond resolution), and sometimes you get standard python datetime (for microsecond resolution). - While pandas timestamp class is a subclass of python datetime, it doesn't always behave the + While pd timestamp class is a subclass of python datetime, it doesn't always behave the same way. We convert it to normal datetime so that consumers downstream don't have to deal with these quirks. """ - if isinstance(ts, pandas.Timestamp): + if isinstance(ts, pd.Timestamp): return ts.to_pydatetime() else: return ts @@ -418,7 +426,7 @@ def _convert_arrow_to_proto( # Convert event_timestamps event_timestamps = [ _coerce_datetime(val) - for val in pandas.to_datetime( + for val in pd.to_datetime( table.column(feature_view.batch_source.timestamp_field).to_numpy( zero_copy_only=False ) @@ -429,7 +437,7 @@ def _convert_arrow_to_proto( if feature_view.batch_source.created_timestamp_column: created_timestamps = [ _coerce_datetime(val) - for val in pandas.to_datetime( + for val in pd.to_datetime( table.column( feature_view.batch_source.created_timestamp_column ).to_numpy(zero_copy_only=False)