From f4163379e946a2ccbc542aca0a87bd3f00372b1a Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Tue, 21 Feb 2023 10:50:50 +0700 Subject: [PATCH 1/6] fix: Support param timeout when persisting Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/bigquery.py | 10 ++++++++-- .../contrib/athena_offline_store/athena.py | 7 ++++++- .../contrib/mssql_offline_store/mssql.py | 7 ++++++- .../contrib/postgres_offline_store/postgres.py | 7 ++++++- .../contrib/spark_offline_store/spark.py | 7 ++++++- .../contrib/trino_offline_store/trino.py | 7 ++++++- sdk/python/feast/infra/offline_stores/file.py | 7 ++++++- sdk/python/feast/infra/offline_stores/offline_store.py | 7 ++++++- sdk/python/feast/infra/offline_stores/redshift.py | 7 ++++++- sdk/python/feast/infra/offline_stores/snowflake.py | 7 ++++++- 10 files changed, 62 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 7871cea02c..f06b5f3261 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -530,11 +530,17 @@ def _execute_query( block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800) return bq_job - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetBigQueryStorage) self.to_bigquery( - bigquery.QueryJobConfig(destination=storage.bigquery_options.table) + bigquery.QueryJobConfig(destination=storage.bigquery_options.table), + timeout=timeout, ) @property diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 2e1fc0d983..39289365e2 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -412,7 +412,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetAthenaStorage) self.to_athena(table_name=storage.athena_options.table) diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index 5849105869..b0591ce5d9 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -356,7 +356,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: ## Implements persist in Feast 0.18 - This persists to filestorage ## ToDo: Persist to Azure Storage - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetFileStorage) filesystem, path = FileSource.create_filesystem_and_path( diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 837b9091e7..4d2303b7e1 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -302,7 +302,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetPostgreSQLStorage) df_to_postgres_table( diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index f51bd810ea..8fdb9bd130 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -344,7 +344,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout)) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): """ Run the retrieval and persist the results in the same offline store used for read. Please note the persisting is done only within the scope of the spark session. diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 7a7afa1665..043ce7d406 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -126,7 +126,12 @@ def to_trino( self._client.execute_query(query_text=query) return destination_table - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): """ Run the retrieval and persist the results in the same offline store used for read. """ diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index d6cce78bd4..3cc19b2c13 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -88,7 +88,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None): df = self.evaluation_function().compute() return pyarrow.Table.from_pandas(df) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetFileStorage) # Check if the specified location already exists. diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 27a98a120f..33e33e0f4a 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -189,7 +189,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: pass @abstractmethod - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): """ Synchronously executes the underlying query and persists the result in the same offline store at the specified destination. diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index ffa30ba015..cb40a702e4 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -476,7 +476,12 @@ def to_redshift(self, table_name: str) -> None: query, ) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetRedshiftStorage) self.to_redshift(table_name=storage.redshift_options.table) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 2401458be7..17c2bfc7de 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -501,7 +501,12 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame": else: raise InvalidSparkSessionException(spark_session) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: int = 1800, + ): assert isinstance(storage, SavedDatasetSnowflakeStorage) self.to_snowflake(table_name=storage.snowflake_options.table) From d48a0891412b5c00d21b43d71e473f65402cbf28 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Thu, 23 Feb 2023 07:58:03 +0700 Subject: [PATCH 2/6] chore: add type hint to persist methods Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/bigquery.py | 4 ++-- .../offline_stores/contrib/athena_offline_store/athena.py | 4 ++-- .../infra/offline_stores/contrib/mssql_offline_store/mssql.py | 4 ++-- .../offline_stores/contrib/postgres_offline_store/postgres.py | 4 ++-- .../infra/offline_stores/contrib/spark_offline_store/spark.py | 4 ++-- .../infra/offline_stores/contrib/trino_offline_store/trino.py | 4 ++-- sdk/python/feast/infra/offline_stores/file.py | 4 ++-- sdk/python/feast/infra/offline_stores/offline_store.py | 4 ++-- sdk/python/feast/infra/offline_stores/redshift.py | 4 ++-- sdk/python/feast/infra/offline_stores/snowflake.py | 4 ++-- 10 files changed, 20 insertions(+), 20 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index f06b5f3261..cfc8ae7726 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -533,8 +533,8 @@ def _execute_query( def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetBigQueryStorage) diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 39289365e2..641aded53d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -415,8 +415,8 @@ def metadata(self) -> Optional[RetrievalMetadata]: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetAthenaStorage) self.to_athena(table_name=storage.athena_options.table) diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index b0591ce5d9..37c29cb205 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -359,8 +359,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetFileStorage) diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 4d2303b7e1..dba5b15018 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -305,8 +305,8 @@ def metadata(self) -> Optional[RetrievalMetadata]: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetPostgreSQLStorage) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 8fdb9bd130..750b141f79 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -347,8 +347,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): """ Run the retrieval and persist the results in the same offline store used for read. diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 043ce7d406..fbe944fdff 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -129,8 +129,8 @@ def to_trino( def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): """ Run the retrieval and persist the results in the same offline store used for read. diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 3cc19b2c13..c64b148151 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -91,8 +91,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None): def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetFileStorage) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 33e33e0f4a..e4750d7a27 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -192,8 +192,8 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): """ Synchronously executes the underlying query and persists the result in the same offline store diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index cb40a702e4..17c8dae7a9 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -479,8 +479,8 @@ def to_redshift(self, table_name: str) -> None: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetRedshiftStorage) self.to_redshift(table_name=storage.redshift_options.table) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 17c2bfc7de..e1575c0ccf 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -504,8 +504,8 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame": def persist( self, storage: SavedDatasetStorage, - allow_overwrite: bool = False, - timeout: int = 1800, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = 1800, ): assert isinstance(storage, SavedDatasetSnowflakeStorage) self.to_snowflake(table_name=storage.snowflake_options.table) From cd26b6f07025cee3b6321c555a23736865fbdc7f Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Thu, 23 Feb 2023 08:12:29 +0700 Subject: [PATCH 3/6] chore: add type hint to to_bigquery method Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index cfc8ae7726..d08b1cd62a 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -456,8 +456,8 @@ def to_sql(self) -> str: def to_bigquery( self, job_config: Optional[bigquery.QueryJobConfig] = None, - timeout: int = 1800, - retry_cadence: int = 10, + timeout: Optional[int] = 1800, + retry_cadence: Optional[int] = 10, ) -> str: """ Synchronously executes the underlying query and exports the result to a BigQuery table. The From 65a94ddcf695f16141cf7cac42254c0f27750ddf Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Thu, 2 Mar 2023 23:37:57 +0700 Subject: [PATCH 4/6] fix: set default timeout to None in base offline store class Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/offline_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index e4750d7a27..8694d0273a 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -193,7 +193,7 @@ def persist( self, storage: SavedDatasetStorage, allow_overwrite: Optional[bool] = False, - timeout: Optional[int] = 1800, + timeout: Optional[int] = None, ): """ Synchronously executes the underlying query and persists the result in the same offline store From 69a80910712318793c8c4f9de1d6a7f1b7422f22 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Thu, 2 Mar 2023 23:41:25 +0700 Subject: [PATCH 5/6] fix: set default timeout to None in to_bigquery Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/bigquery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index d08b1cd62a..a7ded848fe 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -456,7 +456,7 @@ def to_sql(self) -> str: def to_bigquery( self, job_config: Optional[bigquery.QueryJobConfig] = None, - timeout: Optional[int] = 1800, + timeout: Optional[int] = None, retry_cadence: Optional[int] = 10, ) -> str: """ From 56e790e19263872ff7aaf2723405ad803a4833d9 Mon Sep 17 00:00:00 2001 From: Hai Nguyen Date: Fri, 3 Mar 2023 00:01:09 +0700 Subject: [PATCH 6/6] chore: update test case due to change of param Signed-off-by: Hai Nguyen --- sdk/python/feast/infra/offline_stores/offline_store.py | 2 +- .../tests/unit/infra/offline_stores/test_offline_store.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 8694d0273a..6141e3c435 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -192,7 +192,7 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: def persist( self, storage: SavedDatasetStorage, - allow_overwrite: Optional[bool] = False, + allow_overwrite: bool = False, timeout: Optional[int] = None, ): """ diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py index 53e9d061ad..ef0cce0470 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py @@ -67,7 +67,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: """Returns a list containing all the on demand feature views to be handled.""" pass - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): """ Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.