From 109ee9cff5bcda46889583f2968003f6a3e375b3 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Wed, 6 Jul 2022 12:48:13 -0700 Subject: [PATCH] feat: Add to_remote_storage method to RetrievalJob (#2916) * feat: Add to_remote_storage method to RetrievalJob Signed-off-by: Achal Shah * docstring Signed-off-by: Achal Shah --- .../infra/offline_stores/offline_store.py | 20 +++++++++++++++++++ .../test_universal_historical_retrieval.py | 4 ++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 439911fe2a..e24317e859 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -163,6 +163,26 @@ def metadata(self) -> Optional[RetrievalMetadata]: """ pass + def supports_remote_storage_export(self) -> bool: + """ + This method should return True if the RetrievalJob supports `to_remote_storage()`. + """ + return False + + def to_remote_storage(self) -> List[str]: + """ + This method should export the result of this RetrievalJob to + remote storage (such as S3, GCS, HDFS, etc). + Implementations of this method should export the results as + multiple parquet files, each file sized appropriately + depending on how much data is being returned by the retrieval + job. + + Returns: + A list of parquet file paths in remote storage. + """ + raise NotImplementedError() + class OfflineStore(ABC): """ diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index 2076ab2aed..0b2965084d 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -280,7 +280,7 @@ def get_expected_training_df( @pytest.mark.integration @pytest.mark.universal_offline_stores -@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) +@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: f"full:{v}") def test_historical_features(environment, universal_data_sources, full_feature_names): store = environment.feature_store @@ -410,7 +410,7 @@ def test_historical_features(environment, universal_data_sources, full_feature_n @pytest.mark.integration -@pytest.mark.universal +@pytest.mark.universal_offline_stores @pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) def test_historical_features_with_shared_batch_source( environment, universal_data_sources, full_feature_names