From 58ce39d93fa43532203666623a83b40ac3ccde3e Mon Sep 17 00:00:00 2001
From: Kevin Zhang
Date: Tue, 14 Jun 2022 16:31:45 -0700
Subject: [PATCH 01/11] Scaffolding for offline store push

Signed-off-by: Kevin Zhang
---
 sdk/python/feast/infra/offline_stores/offline_store.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
index 6c95283358..3d1e941435 100644
--- a/sdk/python/feast/infra/offline_stores/offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -28,6 +28,8 @@ from feast.registry import BaseRegistry
 from feast.repo_config import RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 
 if TYPE_CHECKING:
     from feast.saved_dataset import ValidationReference

From 656fb9f218218d779c350c2219cb7b470f02d9f5 Mon Sep 17 00:00:00 2001
From: Kevin Zhang
Date: Tue, 14 Jun 2022 16:33:14 -0700
Subject: [PATCH 02/11] Lint

Signed-off-by: Kevin Zhang
---
 sdk/python/feast/infra/offline_stores/offline_store.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
index 3d1e941435..10769b61f6 100644
--- a/sdk/python/feast/infra/offline_stores/offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -25,11 +25,11 @@ from feast.feature_logging import LoggingConfig, LoggingSource
 from feast.feature_view import FeatureView
 from feast.on_demand_feature_view import OnDemandFeatureView
+from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.registry import BaseRegistry
 from feast.repo_config import RepoConfig
 from feast.saved_dataset import SavedDatasetStorage
-from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
-from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 
 if TYPE_CHECKING:
     from feast.saved_dataset import ValidationReference

From 83f0d2f229a4ad58991dd87ae5d01012136b1277 Mon Sep 17 00:00:00 2001
From: Kevin Zhang
Date: Tue, 14 Jun 2022 16:55:26 -0700
Subject: [PATCH 03/11] Fix

Signed-off-by: Kevin Zhang
---
 sdk/python/feast/infra/offline_stores/offline_store.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py
index 10769b61f6..6c95283358 100644
--- a/sdk/python/feast/infra/offline_stores/offline_store.py
+++ b/sdk/python/feast/infra/offline_stores/offline_store.py
@@ -25,8 +25,6 @@ from feast.feature_logging import LoggingConfig, LoggingSource
 from feast.feature_view import FeatureView
 from feast.on_demand_feature_view import OnDemandFeatureView
-from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
-from feast.protos.feast.types.Value_pb2 import Value as ValueProto
 from feast.registry import BaseRegistry
 from feast.repo_config import RepoConfig
 from feast.saved_dataset import SavedDatasetStorage

From c21f6c2f3b765d4576785bfe3595107469b74340 Mon Sep 17 00:00:00 2001
From: Kevin Zhang
Date: Thu, 16 Jun 2022 14:07:54 -0700
Subject: [PATCH 04/11] File source offline push

Signed-off-by: Kevin Zhang
---
 sdk/python/feast/feature_store.py | 3 +-
 sdk/python/feast/infra/offline_stores/file.py | 28
++- .../infra/offline_stores/offline_store.py | 4 +- .../feast/infra/passthrough_provider.py | 14 +- sdk/python/feast/infra/provider.py | 2 +- .../offline_store/test_offline_push.py | 196 ++++++++++++++++++ 6 files changed, 241 insertions(+), 6 deletions(-) create mode 100644 sdk/python/tests/integration/offline_store/test_offline_push.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 29db73b199..63aa2a2552 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1389,8 +1389,9 @@ def write_to_offline_store( feature_view = self.get_feature_view( feature_view_name, allow_registry_cache=allow_registry_cache ) + table = pa.Table.from_pandas(df) provider = self._get_provider() - provider.ingest_df_to_offline_store(feature_view, df) + provider.ingest_df_to_offline_store(feature_view, table) @log_exceptions_and_usage def get_online_features( diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 7288223883..f587bc99a7 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -1,12 +1,13 @@ import uuid from datetime import datetime from pathlib import Path -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, List, Optional, Tuple, Union, Any import dask.dataframe as dd import pandas as pd import pyarrow import pyarrow.dataset +from pyarrow import csv import pyarrow.parquet import pytz from pydantic.typing import Literal @@ -404,6 +405,31 @@ def write_logged_features( existing_data_behavior="overwrite_or_ignore", ) + @staticmethod + def offline_write_batch(config: RepoConfig, table: FeatureView, data: pyarrow.Table, progress: Optional[Callable[[int], Any]]): + if not table.batch_source: + raise ValueError("feature view does not have a batch source to persist offline data") + if not isinstance(config.offline_store, FileOfflineStoreConfig): + raise ValueError(f"offline store config is of type {type(config.offline_store)} when file type required") + if not isinstance(table.batch_source, FileSource): + raise ValueError(f"feature view batch source is {type(table.batch_source)} not file source") + file_options = table.batch_source.file_options + filesystem, path = FileSource.create_filesystem_and_path( + file_options.uri, file_options.s3_endpoint_override + ) + + prev_table = pyarrow.parquet.read_table(path, memory_map=True) + if(prev_table.column_names != data.column_names): + raise ValueError(f"Input dataframe have columns in wrong order, columns should be in the order: {prev_table.column_names}") + if(data.schema != prev_table.schema): + data = data.cast(prev_table.schema) + new_table = pyarrow.concat_tables([data, prev_table]) + writer = pyarrow.parquet.ParquetWriter( + path, + data.schema, + filesystem=filesystem) + writer.write_table(new_table) + writer.close() def _get_entity_df_event_timestamp_range( entity_df: Union[pd.DataFrame, str], entity_df_event_timestamp_col: str, diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 6c95283358..5dbb43fec9 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -275,7 +275,7 @@ def write_logged_features( def offline_write_batch( config: RepoConfig, table: FeatureView, - data: pd.DataFrame, + data: pyarrow.Table, progress: Optional[Callable[[int], Any]], ): """ @@ -287,7 +287,7 @@ 
def offline_write_batch( Args: config: Repo configuration object table: FeatureView to write the data to. - data: dataframe containing feature data and timestamp column for historical feature retrieval + data: pyarrow table containing feature data and timestamp column for historical feature retrieval progress: Optional function to be called once every mini-batch of rows is written to the online store. Can be used to display progress. """ diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index ef72541147..7b19be988e 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -104,10 +104,14 @@ def offline_write_batch( self, config: RepoConfig, table: FeatureView, - data: pd.DataFrame, + data: pa.Table, progress: Optional[Callable[[int], Any]], ) -> None: set_usage_attribute("provider", self.__class__.__name__) + + if "created" not in data.column_names: + raise ValueError("input dataframe must have a created timestamp column") + if self.offline_store: self.offline_store.offline_write_batch(config, table, data, progress) @@ -143,6 +147,14 @@ def ingest_df( self.repo_config, feature_view, rows_to_write, progress=None ) + def ingest_df_to_offline_store(self, feature_view: FeatureView, table: pa.Table): + set_usage_attribute("provider", self.__class__.__name__) + + if feature_view.batch_source.field_mapping is not None: + table = _run_field_mapping(table, feature_view.batch_source.field_mapping) + + self.offline_write_batch(self.repo_config, feature_view, table, None) + def materialize_single_feature_view( self, config: RepoConfig, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index c6c9b75787..d2e37e69db 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -127,7 +127,7 @@ def ingest_df( pass def ingest_df_to_offline_store( - self, feature_view: FeatureView, df: pd.DataFrame, + self, feature_view: FeatureView, df: pyarrow.Table, ): """ Ingests a DataFrame directly into the offline store diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py new file mode 100644 index 0000000000..d31a6ebf77 --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -0,0 +1,196 @@ + +import datetime +from datetime import datetime, timedelta + +import numpy as np +import pandas as pd +import pytest +import tempfile +import uuid + +from feast.data_format import ParquetFormat + +from feast import FeatureView, Field, FileSource +from feast.types import Int32, Float32 +from feast.wait import wait_retry_backoff +from tests.integration.feature_repos.repo_configuration import ( + construct_universal_feature_views, +) +from tests.integration.feature_repos.universal.data_sources.file import FileDataSourceCreator +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) +from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 +from tests.utils.logged_features import prepare_logs, to_logs_dataset + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["sqlite"]) +def test_writing_incorrect_order_fails(environment, universal_data_sources): + # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in + store = environment.feature_store + _, _, data_sources = universal_data_sources + 
driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + ], + source=data_sources.driver, + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now).round("ms") + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002], + "event_timestamp": [ + ts-timedelta(hours=3), + ts, + ], + } + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" + ], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + expected_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002], + "event_timestamp": [ + ts-timedelta(hours=3), + ts, + ], + "conv_rate": [0.1, 0.2], + "avg_daily_trips": [1, 2], + "created": [ts, ts] + }, + ) + with pytest.raises(ValueError): + store.write_to_offline_store(driver_stats.name, expected_df, allow_registry_cache=False) + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["sqlite"]) +def test_writing_consecutively_to_offline_store(environment, universal_data_sources): + store = environment.feature_store + _, _, data_sources = universal_data_sources + driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + ], + source=data_sources.driver, + ttl=timedelta(minutes=10), + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now, unit='ns') + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002], + "event_timestamp": [ + ts-timedelta(hours=4), + ts-timedelta(hours=3), + ], + } + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" + ], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + first_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ + ts-timedelta(hours=4), + ts-timedelta(hours=3), + ], + "driver_id": [1001, 1001], + "conv_rate": [0.1, 0.2], + "acc_rate": [0.5, 0.6], + "avg_daily_trips": [1, 2], + "created": [ts, ts] + }, + ) + store.write_to_offline_store(driver_stats.name, first_df, allow_registry_cache=False) + + after_write_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" + ], + full_feature_names=False, + ).to_df() + + assert len(after_write_df) == len(first_df) + assert np.where(after_write_df["conv_rate"].reset_index(drop=True) == first_df["conv_rate"].reset_index(drop=True)) + assert np.where(after_write_df["avg_daily_trips"].reset_index(drop=True) == first_df["avg_daily_trips"].reset_index(drop=True)) + + second_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ + ts-timedelta(hours=1), + ts, + ], + "driver_id": [1001, 1001], + "conv_rate": [0.3, 0.4], + "acc_rate": [0.8, 0.9], + "avg_daily_trips": [3, 4], + "created": [ts, ts] + }, + ) + + store.write_to_offline_store(driver_stats.name, second_df, allow_registry_cache=False) + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1001, 1001, 1001], + "event_timestamp": [ + ts-timedelta(hours=4), + ts-timedelta(hours=3), + ts-timedelta(hours=1), + ts, + ], + } + ) + + after_write_df = store.get_historical_features( + entity_df=entity_df, 
+ features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" + ], + full_feature_names=False, + ).to_df() + + expected_df = pd.concat([first_df, second_df]) + assert len(after_write_df) == len(expected_df) + assert np.where(after_write_df["conv_rate"].reset_index(drop=True) == expected_df["conv_rate"].reset_index(drop=True)) + assert np.where(after_write_df["avg_daily_trips"].reset_index(drop=True) == expected_df["avg_daily_trips"].reset_index(drop=True)) + From d4b678ac4b549c5c07eaa730991aec42ed6681bd Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 15:38:26 -0700 Subject: [PATCH 05/11] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/infra/offline_stores/file.py | 2 +- .../offline_store/test_offline_push.py | 79 ++++++++++++++++--- 2 files changed, 69 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index f587bc99a7..b03bff0da3 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -420,7 +420,7 @@ def offline_write_batch(config: RepoConfig, table: FeatureView, data: pyarrow.Ta prev_table = pyarrow.parquet.read_table(path, memory_map=True) if(prev_table.column_names != data.column_names): - raise ValueError(f"Input dataframe have columns in wrong order, columns should be in the order: {prev_table.column_names}") + raise ValueError(f"Input dataframe has incorrect schema or wrong order, expected columns are: {prev_table.column_names}") if(data.schema != prev_table.schema): data = data.cast(prev_table.schema) new_table = pyarrow.concat_tables([data, prev_table]) diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index d31a6ebf77..4b6fb557f4 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -5,8 +5,7 @@ import numpy as np import pandas as pd import pytest -import tempfile -import uuid +import random from feast.data_format import ParquetFormat @@ -74,8 +73,66 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): ts-timedelta(hours=3), ts, ], - "conv_rate": [0.1, 0.2], - "avg_daily_trips": [1, 2], + "conv_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], + "created": [ts, ts] + }, + ) + with pytest.raises(ValueError): + store.write_to_offline_store(driver_stats.name, expected_df, allow_registry_cache=False) + + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["sqlite"]) +def test_writing_incorrect_schema_fails(environment, universal_data_sources): + # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in + store = environment.feature_store + _, _, data_sources = universal_data_sources + driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + ], + source=data_sources.driver, + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now).round("ms") + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002], + "event_timestamp": [ + ts-timedelta(hours=3), + ts, + ], + } + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" 
+ ], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + expected_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ + ts-timedelta(hours=3), + ts, + ], + "driver_id": [1001, 1002], + "conv_rate": [random.random(), random.random()], + "incorrect_schema": [random.randint(0, 10), random.randint(0, 10)], "created": [ts, ts] }, ) @@ -103,7 +160,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour entity_df = pd.DataFrame.from_dict( { - "driver_id": [1001, 1002], + "driver_id": [1001, 1001], "event_timestamp": [ ts-timedelta(hours=4), ts-timedelta(hours=3), @@ -131,9 +188,9 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour ts-timedelta(hours=3), ], "driver_id": [1001, 1001], - "conv_rate": [0.1, 0.2], - "acc_rate": [0.5, 0.6], - "avg_daily_trips": [1, 2], + "conv_rate": [random.random(), random.random()], + "acc_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], "created": [ts, ts] }, ) @@ -159,9 +216,9 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour ts, ], "driver_id": [1001, 1001], - "conv_rate": [0.3, 0.4], - "acc_rate": [0.8, 0.9], - "avg_daily_trips": [3, 4], + "conv_rate": [random.random(), random.random()], + "acc_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], "created": [ts, ts] }, ) From 3e96ac14b217a0908d55a4835fb522db1ad6986e Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 15:55:55 -0700 Subject: [PATCH 06/11] Fix Signed-off-by: Kevin Zhang --- .../integration/offline_store/test_offline_push.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index 4b6fb557f4..85adc542fc 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -25,7 +25,7 @@ from tests.utils.logged_features import prepare_logs, to_logs_dataset @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["sqlite"]) +@pytest.mark.universal_online_stores def test_writing_incorrect_order_fails(environment, universal_data_sources): # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in store = environment.feature_store @@ -83,7 +83,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["sqlite"]) +@pytest.mark.universal_online_stores def test_writing_incorrect_schema_fails(environment, universal_data_sources): # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in store = environment.feature_store @@ -140,7 +140,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): store.write_to_offline_store(driver_stats.name, expected_df, allow_registry_cache=False) @pytest.mark.integration -@pytest.mark.universal_online_stores(only=["sqlite"]) +@pytest.mark.universal_online_stores def test_writing_consecutively_to_offline_store(environment, universal_data_sources): store = environment.feature_store _, _, data_sources = universal_data_sources @@ -150,6 +150,7 @@ def test_writing_consecutively_to_offline_store(environment, 
universal_data_sour schema=[ Field(name="avg_daily_trips", dtype=Int32), Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), ], source=data_sources.driver, ttl=timedelta(minutes=10), @@ -173,6 +174,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour entity_df=entity_df, features=[ "driver_stats:conv_rate", + "driver_stats:avg_daily_trips" ], full_feature_names=False, @@ -241,6 +243,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour entity_df=entity_df, features=[ "driver_stats:conv_rate", + "driver_stats:acc_rate", "driver_stats:avg_daily_trips" ], full_feature_names=False, @@ -249,5 +252,5 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour expected_df = pd.concat([first_df, second_df]) assert len(after_write_df) == len(expected_df) assert np.where(after_write_df["conv_rate"].reset_index(drop=True) == expected_df["conv_rate"].reset_index(drop=True)) + assert np.where(after_write_df["acc_rate"].reset_index(drop=True) == expected_df["acc_rate"].reset_index(drop=True)) assert np.where(after_write_df["avg_daily_trips"].reset_index(drop=True) == expected_df["avg_daily_trips"].reset_index(drop=True)) - From f2d77efe35320908c26f0931e99f7c2a9002b598 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 15:58:34 -0700 Subject: [PATCH 07/11] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/infra/offline_stores/file.py | 37 +++-- .../offline_store/test_offline_push.py | 141 +++++++----------- 2 files changed, 81 insertions(+), 97 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index b03bff0da3..69b826e60e 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -1,15 +1,15 @@ import uuid from datetime import datetime from pathlib import Path -from typing import Callable, List, Optional, Tuple, Union, Any +from typing import Any, Callable, List, Optional, Tuple, Union import dask.dataframe as dd import pandas as pd import pyarrow import pyarrow.dataset -from pyarrow import csv import pyarrow.parquet import pytz +from pyarrow import csv from pydantic.typing import Literal from feast import FileSource, OnDemandFeatureView @@ -406,31 +406,42 @@ def write_logged_features( ) @staticmethod - def offline_write_batch(config: RepoConfig, table: FeatureView, data: pyarrow.Table, progress: Optional[Callable[[int], Any]]): + def offline_write_batch( + config: RepoConfig, + table: FeatureView, + data: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): if not table.batch_source: - raise ValueError("feature view does not have a batch source to persist offline data") + raise ValueError( + "feature view does not have a batch source to persist offline data" + ) if not isinstance(config.offline_store, FileOfflineStoreConfig): - raise ValueError(f"offline store config is of type {type(config.offline_store)} when file type required") + raise ValueError( + f"offline store config is of type {type(config.offline_store)} when file type required" + ) if not isinstance(table.batch_source, FileSource): - raise ValueError(f"feature view batch source is {type(table.batch_source)} not file source") + raise ValueError( + f"feature view batch source is {type(table.batch_source)} not file source" + ) file_options = table.batch_source.file_options filesystem, path = FileSource.create_filesystem_and_path( file_options.uri, 
file_options.s3_endpoint_override ) prev_table = pyarrow.parquet.read_table(path, memory_map=True) - if(prev_table.column_names != data.column_names): - raise ValueError(f"Input dataframe has incorrect schema or wrong order, expected columns are: {prev_table.column_names}") - if(data.schema != prev_table.schema): + if prev_table.column_names != data.column_names: + raise ValueError( + f"Input dataframe has incorrect schema or wrong order, expected columns are: {prev_table.column_names}" + ) + if data.schema != prev_table.schema: data = data.cast(prev_table.schema) new_table = pyarrow.concat_tables([data, prev_table]) - writer = pyarrow.parquet.ParquetWriter( - path, - data.schema, - filesystem=filesystem) + writer = pyarrow.parquet.ParquetWriter(path, data.schema, filesystem=filesystem) writer.write_table(new_table) writer.close() + def _get_entity_df_event_timestamp_range( entity_df: Union[pd.DataFrame, str], entity_df_event_timestamp_col: str, ) -> Tuple[datetime, datetime]: diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index 85adc542fc..ba851e2918 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -1,28 +1,17 @@ -import datetime +import random from datetime import datetime, timedelta import numpy as np import pandas as pd import pytest -import random -from feast.data_format import ParquetFormat - -from feast import FeatureView, Field, FileSource -from feast.types import Int32, Float32 -from feast.wait import wait_retry_backoff -from tests.integration.feature_repos.repo_configuration import ( - construct_universal_feature_views, -) -from tests.integration.feature_repos.universal.data_sources.file import FileDataSourceCreator +from feast import FeatureView, Field +from feast.types import Float32, Int32 from tests.integration.feature_repos.universal.entities import ( - customer, driver, - location, ) -from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 -from tests.utils.logged_features import prepare_logs, to_logs_dataset + @pytest.mark.integration @pytest.mark.universal_online_stores @@ -44,22 +33,13 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): ts = pd.Timestamp(now).round("ms") entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002], - "event_timestamp": [ - ts-timedelta(hours=3), - ts, - ], - } + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts,],} ) store.apply([driver(), driver_stats]) df = store.get_historical_features( entity_df=entity_df, - features=[ - "driver_stats:conv_rate", - "driver_stats:avg_daily_trips" - ], + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], full_feature_names=False, ).to_df() @@ -69,17 +49,16 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): expected_df = pd.DataFrame.from_dict( { "driver_id": [1001, 1002], - "event_timestamp": [ - ts-timedelta(hours=3), - ts, - ], + "event_timestamp": [ts - timedelta(hours=3), ts,], "conv_rate": [random.random(), random.random()], "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts] + "created": [ts, ts], }, ) with pytest.raises(ValueError): - store.write_to_offline_store(driver_stats.name, expected_df, allow_registry_cache=False) + store.write_to_offline_store( + driver_stats.name, expected_df, allow_registry_cache=False + ) 
@pytest.mark.integration @@ -102,22 +81,13 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): ts = pd.Timestamp(now).round("ms") entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002], - "event_timestamp": [ - ts-timedelta(hours=3), - ts, - ], - } + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts,],} ) store.apply([driver(), driver_stats]) df = store.get_historical_features( entity_df=entity_df, - features=[ - "driver_stats:conv_rate", - "driver_stats:avg_daily_trips" - ], + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], full_feature_names=False, ).to_df() @@ -126,18 +96,18 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): expected_df = pd.DataFrame.from_dict( { - "event_timestamp": [ - ts-timedelta(hours=3), - ts, - ], + "event_timestamp": [ts - timedelta(hours=3), ts,], "driver_id": [1001, 1002], "conv_rate": [random.random(), random.random()], "incorrect_schema": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts] + "created": [ts, ts], }, ) with pytest.raises(ValueError): - store.write_to_offline_store(driver_stats.name, expected_df, allow_registry_cache=False) + store.write_to_offline_store( + driver_stats.name, expected_df, allow_registry_cache=False + ) + @pytest.mark.integration @pytest.mark.universal_online_stores @@ -157,26 +127,19 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour ) now = datetime.utcnow() - ts = pd.Timestamp(now, unit='ns') + ts = pd.Timestamp(now, unit="ns") entity_df = pd.DataFrame.from_dict( { "driver_id": [1001, 1001], - "event_timestamp": [ - ts-timedelta(hours=4), - ts-timedelta(hours=3), - ], + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3),], } ) store.apply([driver(), driver_stats]) df = store.get_historical_features( entity_df=entity_df, - features=[ - "driver_stats:conv_rate", - - "driver_stats:avg_daily_trips" - ], + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], full_feature_names=False, ).to_df() @@ -185,55 +148,56 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour first_df = pd.DataFrame.from_dict( { - "event_timestamp": [ - ts-timedelta(hours=4), - ts-timedelta(hours=3), - ], + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3),], "driver_id": [1001, 1001], "conv_rate": [random.random(), random.random()], "acc_rate": [random.random(), random.random()], "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts] + "created": [ts, ts], }, ) - store.write_to_offline_store(driver_stats.name, first_df, allow_registry_cache=False) + store.write_to_offline_store( + driver_stats.name, first_df, allow_registry_cache=False + ) after_write_df = store.get_historical_features( entity_df=entity_df, - features=[ - "driver_stats:conv_rate", - "driver_stats:avg_daily_trips" - ], + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], full_feature_names=False, ).to_df() assert len(after_write_df) == len(first_df) - assert np.where(after_write_df["conv_rate"].reset_index(drop=True) == first_df["conv_rate"].reset_index(drop=True)) - assert np.where(after_write_df["avg_daily_trips"].reset_index(drop=True) == first_df["avg_daily_trips"].reset_index(drop=True)) + assert np.where( + after_write_df["conv_rate"].reset_index(drop=True) + == first_df["conv_rate"].reset_index(drop=True) + ) + assert np.where( + 
after_write_df["avg_daily_trips"].reset_index(drop=True) + == first_df["avg_daily_trips"].reset_index(drop=True) + ) second_df = pd.DataFrame.from_dict( { - "event_timestamp": [ - ts-timedelta(hours=1), - ts, - ], + "event_timestamp": [ts - timedelta(hours=1), ts,], "driver_id": [1001, 1001], "conv_rate": [random.random(), random.random()], "acc_rate": [random.random(), random.random()], "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts] + "created": [ts, ts], }, ) - store.write_to_offline_store(driver_stats.name, second_df, allow_registry_cache=False) + store.write_to_offline_store( + driver_stats.name, second_df, allow_registry_cache=False + ) entity_df = pd.DataFrame.from_dict( { "driver_id": [1001, 1001, 1001, 1001], "event_timestamp": [ - ts-timedelta(hours=4), - ts-timedelta(hours=3), - ts-timedelta(hours=1), + ts - timedelta(hours=4), + ts - timedelta(hours=3), + ts - timedelta(hours=1), ts, ], } @@ -244,13 +208,22 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour features=[ "driver_stats:conv_rate", "driver_stats:acc_rate", - "driver_stats:avg_daily_trips" + "driver_stats:avg_daily_trips", ], full_feature_names=False, ).to_df() expected_df = pd.concat([first_df, second_df]) assert len(after_write_df) == len(expected_df) - assert np.where(after_write_df["conv_rate"].reset_index(drop=True) == expected_df["conv_rate"].reset_index(drop=True)) - assert np.where(after_write_df["acc_rate"].reset_index(drop=True) == expected_df["acc_rate"].reset_index(drop=True)) - assert np.where(after_write_df["avg_daily_trips"].reset_index(drop=True) == expected_df["avg_daily_trips"].reset_index(drop=True)) + assert np.where( + after_write_df["conv_rate"].reset_index(drop=True) + == expected_df["conv_rate"].reset_index(drop=True) + ) + assert np.where( + after_write_df["acc_rate"].reset_index(drop=True) + == expected_df["acc_rate"].reset_index(drop=True) + ) + assert np.where( + after_write_df["avg_daily_trips"].reset_index(drop=True) + == expected_df["avg_daily_trips"].reset_index(drop=True) + ) From 8049390a450fbb6bf65402adbb419aee448c8034 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 16:03:18 -0700 Subject: [PATCH 08/11] Fix Signed-off-by: Kevin Zhang --- sdk/python/feast/infra/offline_stores/file.py | 1 - .../offline_store/test_offline_push.py | 19 ++++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 69b826e60e..d8015f1f3f 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -9,7 +9,6 @@ import pyarrow.dataset import pyarrow.parquet import pytz -from pyarrow import csv from pydantic.typing import Literal from feast import FileSource, OnDemandFeatureView diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index ba851e2918..068b7b0a75 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -1,4 +1,3 @@ - import random from datetime import datetime, timedelta @@ -8,9 +7,7 @@ from feast import FeatureView, Field from feast.types import Float32, Int32 -from tests.integration.feature_repos.universal.entities import ( - driver, -) +from tests.integration.feature_repos.universal.entities import driver @pytest.mark.integration @@ -33,7 +30,7 @@ def 
test_writing_incorrect_order_fails(environment, universal_data_sources): ts = pd.Timestamp(now).round("ms") entity_df = pd.DataFrame.from_dict( - {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts,],} + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} ) store.apply([driver(), driver_stats]) @@ -49,7 +46,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): expected_df = pd.DataFrame.from_dict( { "driver_id": [1001, 1002], - "event_timestamp": [ts - timedelta(hours=3), ts,], + "event_timestamp": [ts - timedelta(hours=3), ts], "conv_rate": [random.random(), random.random()], "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], "created": [ts, ts], @@ -81,7 +78,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): ts = pd.Timestamp(now).round("ms") entity_df = pd.DataFrame.from_dict( - {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts,],} + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} ) store.apply([driver(), driver_stats]) @@ -96,7 +93,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): expected_df = pd.DataFrame.from_dict( { - "event_timestamp": [ts - timedelta(hours=3), ts,], + "event_timestamp": [ts - timedelta(hours=3), ts], "driver_id": [1001, 1002], "conv_rate": [random.random(), random.random()], "incorrect_schema": [random.randint(0, 10), random.randint(0, 10)], @@ -132,7 +129,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour entity_df = pd.DataFrame.from_dict( { "driver_id": [1001, 1001], - "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3),], + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], } ) @@ -148,7 +145,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour first_df = pd.DataFrame.from_dict( { - "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3),], + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], "driver_id": [1001, 1001], "conv_rate": [random.random(), random.random()], "acc_rate": [random.random(), random.random()], @@ -178,7 +175,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour second_df = pd.DataFrame.from_dict( { - "event_timestamp": [ts - timedelta(hours=1), ts,], + "event_timestamp": [ts - timedelta(hours=1), ts], "driver_id": [1001, 1001], "conv_rate": [random.random(), random.random()], "acc_rate": [random.random(), random.random()], From 7a4be4453d4f4210dc66b9f889e5cc41b8f525cb Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 18:12:36 -0700 Subject: [PATCH 09/11] Fix Signed-off-by: Kevin Zhang --- .../offline_store/test_offline_push.py | 17 +- .../offline_store/test_offline_write.py | 226 ++++++++++++++++++ 2 files changed, 238 insertions(+), 5 deletions(-) create mode 100644 sdk/python/tests/integration/offline_store/test_offline_write.py diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index 068b7b0a75..44a8053e15 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -14,6 +14,9 @@ @pytest.mark.universal_online_stores def test_writing_incorrect_order_fails(environment, universal_data_sources): # TODO(kevjumba) handle incorrect order later, for now schema must be in 
the order that the filesource is in
+    """This test tests if we have incorrect order when writing to offline store.
+    Specifically, event_timestamp should be the first column to adhere with the filesource column order.
+    """
     store = environment.feature_store
     _, _, data_sources = universal_data_sources
     driver_stats = FeatureView(
         name="driver_stats",
         entities=["driver"],
@@ -43,7 +46,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources):
     assert df["conv_rate"].isnull().all()
     assert df["avg_daily_trips"].isnull().all()
 
-    expected_df = pd.DataFrame.from_dict(
+    df = pd.DataFrame.from_dict(
         {
             "driver_id": [1001, 1002],
             "event_timestamp": [ts - timedelta(hours=3), ts],
@@ -54,7 +57,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources):
     )
     with pytest.raises(ValueError):
         store.write_to_offline_store(
-            driver_stats.name, expected_df, allow_registry_cache=False
+            driver_stats.name, df, allow_registry_cache=False
         )
 
@@ -62,6 +65,9 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources):
 @pytest.mark.universal_online_stores
 def test_writing_incorrect_schema_fails(environment, universal_data_sources):
     # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in
+    """This test tests if we have incorrect attribute when writing to offline store.
+    Specifically, `incorrect_attribute` is an incorrect column to adhere with the filesource column order.
+    """
     store = environment.feature_store
     _, _, data_sources = universal_data_sources
     driver_stats = FeatureView(
@@ -91,18 +97,18 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
     assert df["conv_rate"].isnull().all()
     assert df["avg_daily_trips"].isnull().all()
 
-    expected_df = pd.DataFrame.from_dict(
+    df = pd.DataFrame.from_dict(
         {
             "event_timestamp": [ts - timedelta(hours=3), ts],
             "driver_id": [1001, 1002],
             "conv_rate": [random.random(), random.random()],
-            "incorrect_schema": [random.randint(0, 10), random.randint(0, 10)],
+            "incorrect_attribute": [random.randint(0, 10), random.randint(0, 10)],
             "created": [ts, ts],
         },
     )
     with pytest.raises(ValueError):
         store.write_to_offline_store(
-            driver_stats.name, expected_df, allow_registry_cache=False
+            driver_stats.name, df, allow_registry_cache=False
         )
 
@@ -143,6 +149,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
     assert df["conv_rate"].isnull().all()
     assert df["avg_daily_trips"].isnull().all()
 
+    # This dataframe has its columns ordered exactly as it is in the parquet file generated by driver_test_data.py.
first_df = pd.DataFrame.from_dict( { "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], diff --git a/sdk/python/tests/integration/offline_store/test_offline_write.py b/sdk/python/tests/integration/offline_store/test_offline_write.py new file mode 100644 index 0000000000..068b7b0a75 --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_offline_write.py @@ -0,0 +1,226 @@ +import random +from datetime import datetime, timedelta + +import numpy as np +import pandas as pd +import pytest + +from feast import FeatureView, Field +from feast.types import Float32, Int32 +from tests.integration.feature_repos.universal.entities import driver + + +@pytest.mark.integration +@pytest.mark.universal_online_stores +def test_writing_incorrect_order_fails(environment, universal_data_sources): + # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in + store = environment.feature_store + _, _, data_sources = universal_data_sources + driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + ], + source=data_sources.driver, + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now).round("ms") + + entity_df = pd.DataFrame.from_dict( + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + expected_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002], + "event_timestamp": [ts - timedelta(hours=3), ts], + "conv_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], + "created": [ts, ts], + }, + ) + with pytest.raises(ValueError): + store.write_to_offline_store( + driver_stats.name, expected_df, allow_registry_cache=False + ) + + +@pytest.mark.integration +@pytest.mark.universal_online_stores +def test_writing_incorrect_schema_fails(environment, universal_data_sources): + # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in + store = environment.feature_store + _, _, data_sources = universal_data_sources + driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + ], + source=data_sources.driver, + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now).round("ms") + + entity_df = pd.DataFrame.from_dict( + {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + expected_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ts - timedelta(hours=3), ts], + "driver_id": [1001, 1002], + "conv_rate": [random.random(), random.random()], + "incorrect_schema": [random.randint(0, 10), random.randint(0, 10)], + "created": [ts, ts], + }, + ) + with pytest.raises(ValueError): + store.write_to_offline_store( + driver_stats.name, expected_df, 
allow_registry_cache=False + ) + + +@pytest.mark.integration +@pytest.mark.universal_online_stores +def test_writing_consecutively_to_offline_store(environment, universal_data_sources): + store = environment.feature_store + _, _, data_sources = universal_data_sources + driver_stats = FeatureView( + name="driver_stats", + entities=["driver"], + schema=[ + Field(name="avg_daily_trips", dtype=Int32), + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + ], + source=data_sources.driver, + ttl=timedelta(minutes=10), + ) + + now = datetime.utcnow() + ts = pd.Timestamp(now, unit="ns") + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1001], + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], + } + ) + + store.apply([driver(), driver_stats]) + df = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], + full_feature_names=False, + ).to_df() + + assert df["conv_rate"].isnull().all() + assert df["avg_daily_trips"].isnull().all() + + first_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], + "driver_id": [1001, 1001], + "conv_rate": [random.random(), random.random()], + "acc_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], + "created": [ts, ts], + }, + ) + store.write_to_offline_store( + driver_stats.name, first_df, allow_registry_cache=False + ) + + after_write_df = store.get_historical_features( + entity_df=entity_df, + features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], + full_feature_names=False, + ).to_df() + + assert len(after_write_df) == len(first_df) + assert np.where( + after_write_df["conv_rate"].reset_index(drop=True) + == first_df["conv_rate"].reset_index(drop=True) + ) + assert np.where( + after_write_df["avg_daily_trips"].reset_index(drop=True) + == first_df["avg_daily_trips"].reset_index(drop=True) + ) + + second_df = pd.DataFrame.from_dict( + { + "event_timestamp": [ts - timedelta(hours=1), ts], + "driver_id": [1001, 1001], + "conv_rate": [random.random(), random.random()], + "acc_rate": [random.random(), random.random()], + "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], + "created": [ts, ts], + }, + ) + + store.write_to_offline_store( + driver_stats.name, second_df, allow_registry_cache=False + ) + + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1001, 1001, 1001], + "event_timestamp": [ + ts - timedelta(hours=4), + ts - timedelta(hours=3), + ts - timedelta(hours=1), + ts, + ], + } + ) + + after_write_df = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_stats:conv_rate", + "driver_stats:acc_rate", + "driver_stats:avg_daily_trips", + ], + full_feature_names=False, + ).to_df() + + expected_df = pd.concat([first_df, second_df]) + assert len(after_write_df) == len(expected_df) + assert np.where( + after_write_df["conv_rate"].reset_index(drop=True) + == expected_df["conv_rate"].reset_index(drop=True) + ) + assert np.where( + after_write_df["acc_rate"].reset_index(drop=True) + == expected_df["acc_rate"].reset_index(drop=True) + ) + assert np.where( + after_write_df["avg_daily_trips"].reset_index(drop=True) + == expected_df["avg_daily_trips"].reset_index(drop=True) + ) From f283f720805da9fd3a023b4dd3fe7ccd3b80d502 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Thu, 16 Jun 2022 18:14:19 -0700 Subject: [PATCH 10/11] Fix Signed-off-by: Kevin Zhang --- 
.../tests/integration/offline_store/test_offline_push.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py index 44a8053e15..2bdf775177 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ b/sdk/python/tests/integration/offline_store/test_offline_push.py @@ -56,9 +56,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): }, ) with pytest.raises(ValueError): - store.write_to_offline_store( - driver_stats.name, df, allow_registry_cache=False - ) + store.write_to_offline_store(driver_stats.name, df, allow_registry_cache=False) @pytest.mark.integration @@ -107,9 +105,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): }, ) with pytest.raises(ValueError): - store.write_to_offline_store( - driver_stats.name, df, allow_registry_cache=False - ) + store.write_to_offline_store(driver_stats.name, df, allow_registry_cache=False) @pytest.mark.integration From 13fa653836da74c717f7d7e17a507cfbd7f73f7d Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Fri, 17 Jun 2022 09:58:06 -0700 Subject: [PATCH 11/11] Address review comments Signed-off-by: Kevin Zhang --- sdk/python/feast/feature_store.py | 2 +- sdk/python/feast/infra/offline_stores/file.py | 10 +- .../infra/offline_stores/offline_store.py | 2 +- .../feast/infra/passthrough_provider.py | 3 - .../offline_store/test_offline_push.py | 229 ------------------ .../offline_store/test_offline_write.py | 8 +- 6 files changed, 11 insertions(+), 243 deletions(-) delete mode 100644 sdk/python/tests/integration/offline_store/test_offline_push.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 63aa2a2552..45b0b810ce 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1371,7 +1371,7 @@ def write_to_online_store( provider.ingest_df(feature_view, entities, df) @log_exceptions_and_usage - def write_to_offline_store( + def _write_to_offline_store( self, feature_view_name: str, df: pd.DataFrame, diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index d8015f1f3f..194c233f53 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -407,11 +407,11 @@ def write_logged_features( @staticmethod def offline_write_batch( config: RepoConfig, - table: FeatureView, + feature_view: FeatureView, data: pyarrow.Table, progress: Optional[Callable[[int], Any]], ): - if not table.batch_source: + if not feature_view.batch_source: raise ValueError( "feature view does not have a batch source to persist offline data" ) @@ -419,11 +419,11 @@ def offline_write_batch( raise ValueError( f"offline store config is of type {type(config.offline_store)} when file type required" ) - if not isinstance(table.batch_source, FileSource): + if not isinstance(feature_view.batch_source, FileSource): raise ValueError( - f"feature view batch source is {type(table.batch_source)} not file source" + f"feature view batch source is {type(feature_view.batch_source)} not file source" ) - file_options = table.batch_source.file_options + file_options = feature_view.batch_source.file_options filesystem, path = FileSource.create_filesystem_and_path( file_options.uri, file_options.s3_endpoint_override ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py 
b/sdk/python/feast/infra/offline_stores/offline_store.py index 5dbb43fec9..cd807764ba 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -274,7 +274,7 @@ def write_logged_features( @staticmethod def offline_write_batch( config: RepoConfig, - table: FeatureView, + feature_view: FeatureView, data: pyarrow.Table, progress: Optional[Callable[[int], Any]], ): diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 7b19be988e..e023afe782 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -109,9 +109,6 @@ def offline_write_batch( ) -> None: set_usage_attribute("provider", self.__class__.__name__) - if "created" not in data.column_names: - raise ValueError("input dataframe must have a created timestamp column") - if self.offline_store: self.offline_store.offline_write_batch(config, table, data, progress) diff --git a/sdk/python/tests/integration/offline_store/test_offline_push.py b/sdk/python/tests/integration/offline_store/test_offline_push.py deleted file mode 100644 index 2bdf775177..0000000000 --- a/sdk/python/tests/integration/offline_store/test_offline_push.py +++ /dev/null @@ -1,229 +0,0 @@ -import random -from datetime import datetime, timedelta - -import numpy as np -import pandas as pd -import pytest - -from feast import FeatureView, Field -from feast.types import Float32, Int32 -from tests.integration.feature_repos.universal.entities import driver - - -@pytest.mark.integration -@pytest.mark.universal_online_stores -def test_writing_incorrect_order_fails(environment, universal_data_sources): - # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in - """This test tests if we have incorrect order when writing to offline store. - Specifically, event_timestamp should be the first column to adhere with the filesource column order. - """ - store = environment.feature_store - _, _, data_sources = universal_data_sources - driver_stats = FeatureView( - name="driver_stats", - entities=["driver"], - schema=[ - Field(name="avg_daily_trips", dtype=Int32), - Field(name="conv_rate", dtype=Float32), - ], - source=data_sources.driver, - ) - - now = datetime.utcnow() - ts = pd.Timestamp(now).round("ms") - - entity_df = pd.DataFrame.from_dict( - {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]} - ) - - store.apply([driver(), driver_stats]) - df = store.get_historical_features( - entity_df=entity_df, - features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], - full_feature_names=False, - ).to_df() - - assert df["conv_rate"].isnull().all() - assert df["avg_daily_trips"].isnull().all() - - df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1002], - "event_timestamp": [ts - timedelta(hours=3), ts], - "conv_rate": [random.random(), random.random()], - "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts], - }, - ) - with pytest.raises(ValueError): - store.write_to_offline_store(driver_stats.name, df, allow_registry_cache=False) - - -@pytest.mark.integration -@pytest.mark.universal_online_stores -def test_writing_incorrect_schema_fails(environment, universal_data_sources): - # TODO(kevjumba) handle incorrect order later, for now schema must be in the order that the filesource is in - """This test tests if we have incorrect attribute when writing to offline store. 
-    Specifically, `incorrect_attribute` is an incorrect column to adhere with the filesource column order.
-    """
-    store = environment.feature_store
-    _, _, data_sources = universal_data_sources
-    driver_stats = FeatureView(
-        name="driver_stats",
-        entities=["driver"],
-        schema=[
-            Field(name="avg_daily_trips", dtype=Int32),
-            Field(name="conv_rate", dtype=Float32),
-        ],
-        source=data_sources.driver,
-    )
-
-    now = datetime.utcnow()
-    ts = pd.Timestamp(now).round("ms")
-
-    entity_df = pd.DataFrame.from_dict(
-        {"driver_id": [1001, 1002], "event_timestamp": [ts - timedelta(hours=3), ts]}
-    )
-
-    store.apply([driver(), driver_stats])
-    df = store.get_historical_features(
-        entity_df=entity_df,
-        features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
-        full_feature_names=False,
-    ).to_df()
-
-    assert df["conv_rate"].isnull().all()
-    assert df["avg_daily_trips"].isnull().all()
-
-    df = pd.DataFrame.from_dict(
-        {
-            "event_timestamp": [ts - timedelta(hours=3), ts],
-            "driver_id": [1001, 1002],
-            "conv_rate": [random.random(), random.random()],
-            "incorrect_attribute": [random.randint(0, 10), random.randint(0, 10)],
-            "created": [ts, ts],
-        },
-    )
-    with pytest.raises(ValueError):
-        store.write_to_offline_store(driver_stats.name, df, allow_registry_cache=False)
-
-
-@pytest.mark.integration
-@pytest.mark.universal_online_stores
-def test_writing_consecutively_to_offline_store(environment, universal_data_sources):
-    store = environment.feature_store
-    _, _, data_sources = universal_data_sources
-    driver_stats = FeatureView(
-        name="driver_stats",
-        entities=["driver"],
-        schema=[
-            Field(name="avg_daily_trips", dtype=Int32),
-            Field(name="conv_rate", dtype=Float32),
-            Field(name="acc_rate", dtype=Float32),
-        ],
-        source=data_sources.driver,
-        ttl=timedelta(minutes=10),
-    )
-
-    now = datetime.utcnow()
-    ts = pd.Timestamp(now, unit="ns")
-
-    entity_df = pd.DataFrame.from_dict(
-        {
-            "driver_id": [1001, 1001],
-            "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)],
-        }
-    )
-
-    store.apply([driver(), driver_stats])
-    df = store.get_historical_features(
-        entity_df=entity_df,
-        features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"],
-        full_feature_names=False,
-    ).to_df()
-
-    assert df["conv_rate"].isnull().all()
-    assert df["avg_daily_trips"].isnull().all()
-
-    # This dataframe has its columns ordered exactly as it is in the parquet file generated by driver_test_data.py.
- first_df = pd.DataFrame.from_dict( - { - "event_timestamp": [ts - timedelta(hours=4), ts - timedelta(hours=3)], - "driver_id": [1001, 1001], - "conv_rate": [random.random(), random.random()], - "acc_rate": [random.random(), random.random()], - "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts], - }, - ) - store.write_to_offline_store( - driver_stats.name, first_df, allow_registry_cache=False - ) - - after_write_df = store.get_historical_features( - entity_df=entity_df, - features=["driver_stats:conv_rate", "driver_stats:avg_daily_trips"], - full_feature_names=False, - ).to_df() - - assert len(after_write_df) == len(first_df) - assert np.where( - after_write_df["conv_rate"].reset_index(drop=True) - == first_df["conv_rate"].reset_index(drop=True) - ) - assert np.where( - after_write_df["avg_daily_trips"].reset_index(drop=True) - == first_df["avg_daily_trips"].reset_index(drop=True) - ) - - second_df = pd.DataFrame.from_dict( - { - "event_timestamp": [ts - timedelta(hours=1), ts], - "driver_id": [1001, 1001], - "conv_rate": [random.random(), random.random()], - "acc_rate": [random.random(), random.random()], - "avg_daily_trips": [random.randint(0, 10), random.randint(0, 10)], - "created": [ts, ts], - }, - ) - - store.write_to_offline_store( - driver_stats.name, second_df, allow_registry_cache=False - ) - - entity_df = pd.DataFrame.from_dict( - { - "driver_id": [1001, 1001, 1001, 1001], - "event_timestamp": [ - ts - timedelta(hours=4), - ts - timedelta(hours=3), - ts - timedelta(hours=1), - ts, - ], - } - ) - - after_write_df = store.get_historical_features( - entity_df=entity_df, - features=[ - "driver_stats:conv_rate", - "driver_stats:acc_rate", - "driver_stats:avg_daily_trips", - ], - full_feature_names=False, - ).to_df() - - expected_df = pd.concat([first_df, second_df]) - assert len(after_write_df) == len(expected_df) - assert np.where( - after_write_df["conv_rate"].reset_index(drop=True) - == expected_df["conv_rate"].reset_index(drop=True) - ) - assert np.where( - after_write_df["acc_rate"].reset_index(drop=True) - == expected_df["acc_rate"].reset_index(drop=True) - ) - assert np.where( - after_write_df["avg_daily_trips"].reset_index(drop=True) - == expected_df["avg_daily_trips"].reset_index(drop=True) - ) diff --git a/sdk/python/tests/integration/offline_store/test_offline_write.py b/sdk/python/tests/integration/offline_store/test_offline_write.py index 068b7b0a75..41f6ea89fa 100644 --- a/sdk/python/tests/integration/offline_store/test_offline_write.py +++ b/sdk/python/tests/integration/offline_store/test_offline_write.py @@ -53,7 +53,7 @@ def test_writing_incorrect_order_fails(environment, universal_data_sources): }, ) with pytest.raises(ValueError): - store.write_to_offline_store( + store._write_to_offline_store( driver_stats.name, expected_df, allow_registry_cache=False ) @@ -101,7 +101,7 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources): }, ) with pytest.raises(ValueError): - store.write_to_offline_store( + store._write_to_offline_store( driver_stats.name, expected_df, allow_registry_cache=False ) @@ -153,7 +153,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour "created": [ts, ts], }, ) - store.write_to_offline_store( + store._write_to_offline_store( driver_stats.name, first_df, allow_registry_cache=False ) @@ -184,7 +184,7 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour }, ) - store.write_to_offline_store( + store._write_to_offline_store( 
driver_stats.name, second_df, allow_registry_cache=False )
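
Note: a minimal usage sketch of the offline write path this series adds. `FeatureStore` and `_write_to_offline_store` (made private in PATCH 11/11) come from the patches above; the repo path, feature view name, and data values below are illustrative assumptions. As the tests enforce, the dataframe's columns must match the feature view's file source exactly, in order, or the file offline store raises ValueError.

    from datetime import datetime, timedelta

    import pandas as pd

    from feast import FeatureStore

    # Assumed: a repo with a file offline store and a "driver_stats" feature
    # view whose parquet source has exactly the column order used below.
    store = FeatureStore(repo_path="feature_repo")

    ts = pd.Timestamp(datetime.utcnow()).round("ms")
    df = pd.DataFrame(
        {
            "event_timestamp": [ts - timedelta(hours=1), ts],
            "driver_id": [1001, 1002],
            "conv_rate": [0.35, 0.47],
            "acc_rate": [0.91, 0.84],
            "avg_daily_trips": [5, 8],
            "created": [ts, ts],
        }
    )

    # Appends the rows to the feature view's parquet file via
    # FileOfflineStore.offline_write_batch; a schema or column-order
    # mismatch raises ValueError.
    store._write_to_offline_store("driver_stats", df, allow_registry_cache=False)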