Skip to content

Commit

Permalink
get environment info from env
Browse files Browse the repository at this point in the history
  • Loading branch information
erikvatt committed Oct 31, 2023
1 parent 9c7d916 commit a81f331
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 11 deletions.
4 changes: 2 additions & 2 deletions dataverk_airflow/kubernetes_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def kubernetes_operator(
name: str,
image: str,
repo: str = None,
is_composer: bool = False,
cmds: list = None,
branch: str = "main",
email: str = None,
Expand All @@ -58,7 +57,6 @@ def kubernetes_operator(
:param dag: DAG: owner DAG
:param name: str: Name of task
:param repo: str: Github repo
:param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer.
:param image: str: Dockerimage the pod should use
:param cmds: str: Command to run in pod
:param branch: str: Branch in repo, default "main"
Expand All @@ -78,6 +76,8 @@ def kubernetes_operator(
:return: KubernetesPodOperator
"""
is_composer = True if os.getenv("GCS_BUCKET") else False

if not is_composer and repo in [None, ""]:
raise MissingValueException("repo cannot be empty when is_composer is false")

Expand Down
4 changes: 1 addition & 3 deletions dataverk_airflow/notebook_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def notebook_operator(
name: str,
nb_path: str,
repo: str = None,
is_composer: bool = False,
log_output: bool = False,
image: str = None,
branch: str = "main",
Expand All @@ -37,7 +36,6 @@ def notebook_operator(
:param name: str: Name of task
:param repo: str: Github repo
:param nb_path: str: Path to notebook in repo
:param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer.
:param log_output: bool: Write logs from notebook to stdout, default False
:param image: str: Dockerimage the pod should use
:param branch: str: Branch in repo, default "main"
Expand Down Expand Up @@ -68,7 +66,7 @@ def notebook_operator(
"slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path,
"resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries,
"delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push,
"on_success_callback": on_success_callback, "working_dir": str(Path(nb_path).parent), "is_composer": is_composer
"on_success_callback": on_success_callback, "working_dir": str(Path(nb_path).parent)
}
kwargs = {k: v for k, v in kwargs.items() if v is not None}

Expand Down
4 changes: 1 addition & 3 deletions dataverk_airflow/python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def python_operator(
name: str,
script_path: str,
repo: str = None,
is_composer: bool = False,
image: str = None,
branch: str = "main",
email: str = None,
Expand All @@ -36,7 +35,6 @@ def python_operator(
:param name: str: Name of task
:param repo: str: Github repo
:param script_path: str: Path to script in repo
:param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer.
:param image: str: Dockerimage the pod should use
:param branch: str: Branch in repo, default "main"
:param email: str: Email of owner
Expand Down Expand Up @@ -64,7 +62,7 @@ def python_operator(
"slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path,
"resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries,
"delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push,
"on_success_callback": on_success_callback, "working_dir": str(Path(script_path).parent), "is_composer": is_composer
"on_success_callback": on_success_callback, "working_dir": str(Path(script_path).parent)
}
kwargs = {k: v for k, v in kwargs.items() if v is not None}

Expand Down
4 changes: 1 addition & 3 deletions dataverk_airflow/quarto_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def quarto_operator(
name: str,
quarto: dict,
repo: str = None,
is_composer: bool = False,
image: str = None,
branch: str = "main",
email: str = None,
Expand All @@ -36,7 +35,6 @@ def quarto_operator(
:param name: str: Name of task
:param repo: str: Github repo
:param quarto: dict: Dict of Quarto configuration, needs the following values {"path": "path/to/index.qmd", "env": "dev/prod", "id":"uuid", "token": "quarto-token"}
:param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer.
:param image: str: Dockerimage the pod should use
:param branch: str: Branch in repo, default "main"
:param email: str: Email of owner
Expand Down Expand Up @@ -74,7 +72,7 @@ def quarto_operator(
"slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path,
"resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries,
"delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push,
"on_success_callback": on_success_callback, "working_dir": str(working_dir), "is_composer": is_composer
"on_success_callback": on_success_callback, "working_dir": str(working_dir)
}

kwargs = {k: v for k, v in kwargs.items() if v is not None}
Expand Down

0 comments on commit a81f331

Please sign in to comment.