From a81f331ade3b3e5ae160dc0186258107f5644f4d Mon Sep 17 00:00:00 2001 From: Erik Vattekar Date: Tue, 31 Oct 2023 13:27:12 +0100 Subject: [PATCH] get environment info from env --- dataverk_airflow/kubernetes_operator.py | 4 ++-- dataverk_airflow/notebook_operator.py | 4 +--- dataverk_airflow/python_operator.py | 4 +--- dataverk_airflow/quarto_operator.py | 4 +--- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/dataverk_airflow/kubernetes_operator.py b/dataverk_airflow/kubernetes_operator.py index b98f50d..8311879 100644 --- a/dataverk_airflow/kubernetes_operator.py +++ b/dataverk_airflow/kubernetes_operator.py @@ -36,7 +36,6 @@ def kubernetes_operator( name: str, image: str, repo: str = None, - is_composer: bool = False, cmds: list = None, branch: str = "main", email: str = None, @@ -58,7 +57,6 @@ def kubernetes_operator( :param dag: DAG: owner DAG :param name: str: Name of task :param repo: str: Github repo - :param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer. :param image: str: Dockerimage the pod should use :param cmds: str: Command to run in pod :param branch: str: Branch in repo, default "main" @@ -78,6 +76,8 @@ def kubernetes_operator( :return: KubernetesPodOperator """ + is_composer = True if os.getenv("GCS_BUCKET") else False + if not is_composer and repo in [None, ""]: raise MissingValueException("repo cannot be empty when is_composer is false") diff --git a/dataverk_airflow/notebook_operator.py b/dataverk_airflow/notebook_operator.py index c98b578..d12a75a 100644 --- a/dataverk_airflow/notebook_operator.py +++ b/dataverk_airflow/notebook_operator.py @@ -14,7 +14,6 @@ def notebook_operator( name: str, nb_path: str, repo: str = None, - is_composer: bool = False, log_output: bool = False, image: str = None, branch: str = "main", @@ -37,7 +36,6 @@ def notebook_operator( :param name: str: Name of task :param repo: str: Github repo :param nb_path: str: Path to notebook in repo - :param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer. :param log_output: bool: Write logs from notebook to stdout, default False :param image: str: Dockerimage the pod should use :param branch: str: Branch in repo, default "main" @@ -68,7 +66,7 @@ def notebook_operator( "slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path, "resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries, "delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push, - "on_success_callback": on_success_callback, "working_dir": str(Path(nb_path).parent), "is_composer": is_composer + "on_success_callback": on_success_callback, "working_dir": str(Path(nb_path).parent) } kwargs = {k: v for k, v in kwargs.items() if v is not None} diff --git a/dataverk_airflow/python_operator.py b/dataverk_airflow/python_operator.py index 7d231dd..c6fe41e 100644 --- a/dataverk_airflow/python_operator.py +++ b/dataverk_airflow/python_operator.py @@ -14,7 +14,6 @@ def python_operator( name: str, script_path: str, repo: str = None, - is_composer: bool = False, image: str = None, branch: str = "main", email: str = None, @@ -36,7 +35,6 @@ def python_operator( :param name: str: Name of task :param repo: str: Github repo :param script_path: str: Path to script in repo - :param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer. :param image: str: Dockerimage the pod should use :param branch: str: Branch in repo, default "main" :param email: str: Email of owner @@ -64,7 +62,7 @@ def python_operator( "slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path, "resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries, "delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push, - "on_success_callback": on_success_callback, "working_dir": str(Path(script_path).parent), "is_composer": is_composer + "on_success_callback": on_success_callback, "working_dir": str(Path(script_path).parent) } kwargs = {k: v for k, v in kwargs.items() if v is not None} diff --git a/dataverk_airflow/quarto_operator.py b/dataverk_airflow/quarto_operator.py index 1115f55..b38c155 100644 --- a/dataverk_airflow/quarto_operator.py +++ b/dataverk_airflow/quarto_operator.py @@ -14,7 +14,6 @@ def quarto_operator( name: str, quarto: dict, repo: str = None, - is_composer: bool = False, image: str = None, branch: str = "main", email: str = None, @@ -36,7 +35,6 @@ def quarto_operator( :param name: str: Name of task :param repo: str: Github repo :param quarto: dict: Dict of Quarto configuration, needs the following values {"path": "path/to/index.qmd", "env": "dev/prod", "id":"uuid", "token": "quarto-token"} - :param is_composer: bool: Boolean flag indicating whether the environment is Cloud Composer. :param image: str: Dockerimage the pod should use :param branch: str: Branch in repo, default "main" :param email: str: Email of owner @@ -74,7 +72,7 @@ def quarto_operator( "slack_channel": slack_channel, "extra_envs": extra_envs, "allowlist": allowlist, "requirements_path": requirements_path, "resources": resources, "startup_timeout_seconds": startup_timeout_seconds, "retries": retries, "delete_on_finish": delete_on_finish, "retry_delay": retry_delay, "do_xcom_push": do_xcom_push, - "on_success_callback": on_success_callback, "working_dir": str(working_dir), "is_composer": is_composer + "on_success_callback": on_success_callback, "working_dir": str(working_dir) } kwargs = {k: v for k, v in kwargs.items() if v is not None}