From 144aee29bedbadf1f62e810666f48278df9af188 Mon Sep 17 00:00:00 2001 From: "Hai Nguyen (Harry)" Date: Fri, 21 Apr 2023 23:58:45 +0700 Subject: [PATCH] fix: Support param timeout when persisting (#3593) * fix: Support param timeout when persisting Signed-off-by: Hai Nguyen * fix: Revert default timeout value in `to_bigquery` Signed-off-by: Hai Nguyen --------- Signed-off-by: Hai Nguyen Signed-off-by: zerafachris PERSONAL --- sdk/python/feast/infra/offline_stores/bigquery.py | 14 ++++++++++---- .../contrib/athena_offline_store/athena.py | 7 ++++++- .../contrib/mssql_offline_store/mssql.py | 7 ++++++- .../contrib/postgres_offline_store/postgres.py | 7 ++++++- .../contrib/spark_offline_store/spark.py | 7 ++++++- .../contrib/trino_offline_store/trino.py | 7 ++++++- sdk/python/feast/infra/offline_stores/file.py | 7 ++++++- .../feast/infra/offline_stores/offline_store.py | 7 ++++++- sdk/python/feast/infra/offline_stores/redshift.py | 7 ++++++- sdk/python/feast/infra/offline_stores/snowflake.py | 7 ++++++- .../infra/offline_stores/test_offline_store.py | 7 ++++++- 11 files changed, 70 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 973eddc7fb..770477f251 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -456,8 +456,8 @@ def to_sql(self) -> str: def to_bigquery( self, job_config: Optional[bigquery.QueryJobConfig] = None, - timeout: int = 1800, - retry_cadence: int = 10, + timeout: Optional[int] = 1800, + retry_cadence: Optional[int] = 10, ) -> str: """ Synchronously executes the underlying query and exports the result to a BigQuery table. The @@ -530,11 +530,17 @@ def _execute_query( block_until_done(client=self.client, bq_job=bq_job, timeout=timeout or 1800) return bq_job - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetBigQueryStorage) self.to_bigquery( - bigquery.QueryJobConfig(destination=storage.bigquery_options.table) + bigquery.QueryJobConfig(destination=storage.bigquery_options.table), + timeout=timeout, ) @property diff --git a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py index 2e1fc0d983..85a61106aa 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py +++ b/sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/athena.py @@ -412,7 +412,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetAthenaStorage) self.to_athena(table_name=storage.athena_options.table) diff --git a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py index 5849105869..849d5cc797 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py +++ b/sdk/python/feast/infra/offline_stores/contrib/mssql_offline_store/mssql.py @@ -356,7 +356,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: ## Implements persist in Feast 0.18 - This persists to filestorage ## ToDo: Persist to Azure Storage - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetFileStorage) filesystem, path = FileSource.create_filesystem_and_path( diff --git a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py index 837b9091e7..c2e95a8648 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py +++ b/sdk/python/feast/infra/offline_stores/contrib/postgres_offline_store/postgres.py @@ -302,7 +302,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: def metadata(self) -> Optional[RetrievalMetadata]: return self._metadata - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetPostgreSQLStorage) df_to_postgres_table( diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index f51bd810ea..7574ac4865 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -344,7 +344,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table: """Return dataset as pyarrow Table synchronously""" return pyarrow.Table.from_pandas(self._to_df_internal(timeout=timeout)) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): """ Run the retrieval and persist the results in the same offline store used for read. Please note the persisting is done only within the scope of the spark session. diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py index 7a7afa1665..e0f73404eb 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino.py @@ -126,7 +126,12 @@ def to_trino( self._client.execute_query(query_text=query) return destination_table - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): """ Run the retrieval and persist the results in the same offline store used for read. """ diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index d6cce78bd4..5e4107545f 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -88,7 +88,12 @@ def _to_arrow_internal(self, timeout: Optional[int] = None): df = self.evaluation_function().compute() return pyarrow.Table.from_pandas(df) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetFileStorage) # Check if the specified location already exists. diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 27a98a120f..6141e3c435 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -189,7 +189,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: pass @abstractmethod - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): """ Synchronously executes the underlying query and persists the result in the same offline store at the specified destination. diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index ffa30ba015..35fd49f746 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -476,7 +476,12 @@ def to_redshift(self, table_name: str) -> None: query, ) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetRedshiftStorage) self.to_redshift(table_name=storage.redshift_options.table) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 847e073381..d0bd9bd30a 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -531,7 +531,12 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame": else: raise InvalidSparkSessionException(spark_session) - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: Optional[bool] = False, + timeout: Optional[int] = None, + ): assert isinstance(storage, SavedDatasetSnowflakeStorage) self.to_snowflake(table_name=storage.snowflake_options.table) diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py index 53e9d061ad..ef0cce0470 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py @@ -67,7 +67,12 @@ def on_demand_feature_views(self) -> List[OnDemandFeatureView]: """Returns a list containing all the on demand feature views to be handled.""" pass - def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False): + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): """ Synchronously executes the underlying query and persists the result in the same offline store at the specified destination.