From 22c109bc088d093a7c81c59e11490a9a21f82309 Mon Sep 17 00:00:00 2001 From: nsuraeva Date: Mon, 14 Aug 2023 05:24:47 +0300 Subject: [PATCH] feat: Add possibility to save dataset as table, when spark config has remote warehouse info (#3645) feat: add possibility to save dataset as table, when spark config has remote warehouse info Signed-off-by: nsuraeva Co-authored-by: nsuraeva --- .../contrib/spark_offline_store/spark.py | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index 7574ac4865..c9591b7c3f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -352,13 +352,36 @@ def persist( ): """ Run the retrieval and persist the results in the same offline store used for read. - Please note the persisting is done only within the scope of the spark session. + Please note the persisting is done only within the scope of the spark session for a local warehouse directory. 
""" assert isinstance(storage, SavedDatasetSparkStorage) table_name = storage.spark_options.table if not table_name: raise ValueError("Cannot persist, table_name is not defined") - self.to_spark_df().createOrReplaceTempView(table_name) + if self._has_remote_warehouse_in_config(): + file_format = storage.spark_options.file_format + if not file_format: + self.to_spark_df().write.saveAsTable(table_name) + else: + self.to_spark_df().write.format(file_format).saveAsTable(table_name) + else: + self.to_spark_df().createOrReplaceTempView(table_name) + + def _has_remote_warehouse_in_config(self) -> bool: + """ + Check if Spark Session config has info about hive metastore uri + or warehouse directory is not a local path + """ + self.spark_session.sparkContext.getConf().getAll() + try: + self.spark_session.conf.get("hive.metastore.uris") + return True + except Exception: + warehouse_dir = self.spark_session.conf.get("spark.sql.warehouse.dir") + if warehouse_dir and warehouse_dir.startswith("file:"): + return False + else: + return True def supports_remote_storage_export(self) -> bool: return self._config.offline_store.staging_location is not None