From 8c4ec34b86a92453dfa5487f3e1147dbcbf607ef Mon Sep 17 00:00:00 2001 From: corsettigyg Date: Sat, 10 Aug 2024 14:46:33 +0200 Subject: [PATCH] fix logic in code --- cosmos/config.py | 53 ++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/cosmos/config.py b/cosmos/config.py index 85032dbeb..3308ed08f 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -169,43 +169,42 @@ def __init__( raise CosmosValueError( "If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined." ) - - self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) - self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) - self.models_path = self.dbt_project_path / Path(models_relative_path) - self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) - self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) - if project_name: self.project_name = project_name - else: - self.project_name = self.dbt_project_path.stem + if manifest_path: + self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id) + if dbt_project_path: + self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id) + self.models_path = self.dbt_project_path / Path(models_relative_path) + self.seeds_path = self.dbt_project_path / Path(seeds_relative_path) + self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path) + if not project_name: + self.project_name = self.dbt_project_path.stem self.env_vars = env_vars self.dbt_vars = dbt_vars self.partial_parse = partial_parse def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path: - if property: - property_str = str(property) - if not property_conn_id: - scheme = property_str.split("://")[0] - # Use the default Airflow connection ID for the scheme if it is not provided. - property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - - if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: - raise CosmosValueError( - f"The path {property_str} uses a remote file scheme, but the required Object " - f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " - f"Airflow 2.8 or later." - ) + property_str = str(property) + if not property_conn_id: + scheme = property_str.split("://")[0] + # Use the default Airflow connection ID for the scheme if it is not provided. + property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)() - if AIRFLOW_IO_AVAILABLE: - from airflow.io.path import ObjectStoragePath + if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE: + raise CosmosValueError( + f"The path {property_str} uses a remote file scheme, but the required Object " + f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " + f"Airflow 2.8 or later." + ) - return ObjectStoragePath(property_str, conn_id=property_conn_id) - else: - return Path(property_str) + if AIRFLOW_IO_AVAILABLE: + from airflow.io.path import ObjectStoragePath + + return ObjectStoragePath(property_str, conn_id=property_conn_id) + else: + return Path(property_str) def validate_project(self) -> None: """