Skip to content

Commit

Permalink
fix logic in code
Browse files Browse the repository at this point in the history
  • Loading branch information
corsettigyg committed Aug 10, 2024
1 parent 013d5c9 commit 8c4ec34
Showing 1 changed file with 26 additions and 27 deletions.
53 changes: 26 additions & 27 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,42 @@ def __init__(
raise CosmosValueError(
"If ProjectConfig.dbt_project_path is not defined, ProjectConfig.manifest_path and ProjectConfig.project_name must be defined together, or both left undefined."
)

self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id)
self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)

if project_name:
self.project_name = project_name
else:
self.project_name = self.dbt_project_path.stem
if manifest_path:
self.manifest_path = self.get_property_from_cloud_or_local(manifest_path, manifest_conn_id)
if dbt_project_path:
self.dbt_project_path = self.get_property_from_cloud_or_local(dbt_project_path, dbt_project_conn_id)
self.models_path = self.dbt_project_path / Path(models_relative_path)
self.seeds_path = self.dbt_project_path / Path(seeds_relative_path)
self.snapshots_path = self.dbt_project_path / Path(snapshots_relative_path)
if not project_name:
self.project_name = self.dbt_project_path.stem

self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse

def get_property_from_cloud_or_local(self, property: Path | str, property_conn_id: str | None = None) -> Path:
if property:
property_str = str(property)
if not property_conn_id:
scheme = property_str.split("://")[0]
# Use the default Airflow connection ID for the scheme if it is not provided.
property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)()

if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The path {property_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)
property_str = str(property)
if not property_conn_id:
scheme = property_str.split("://")[0]
# Use the default Airflow connection ID for the scheme if it is not provided.
property_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(scheme, lambda: None)()

if AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath
if property_conn_id is not None and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"The path {property_str} uses a remote file scheme, but the required Object "
f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
f"Airflow 2.8 or later."
)

return ObjectStoragePath(property_str, conn_id=property_conn_id)
else:
return Path(property_str)
if AIRFLOW_IO_AVAILABLE:
from airflow.io.path import ObjectStoragePath

return ObjectStoragePath(property_str, conn_id=property_conn_id)
else:
return Path(property_str)

def validate_project(self) -> None:
"""
Expand Down

0 comments on commit 8c4ec34

Please sign in to comment.