diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 368933db3..2017424d7 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -34,7 +34,7 @@ repos:
args:
- --exclude-file=tests/sample/manifest_model_version.json
- --skip=**/manifest.json
- - -L connexion
+ - -L connexion,aci
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0
hooks:
diff --git a/cosmos/__init__.py b/cosmos/__init__.py
index d8704b346..c5a0ce845 100644
--- a/cosmos/__init__.py
+++ b/cosmos/__init__.py
@@ -88,6 +88,37 @@
"kubernetes",
)
+try:
+ from cosmos.operators.azure_container_instance import (
+ DbtLSAzureContainerInstanceOperator,
+ DbtRunAzureContainerInstanceOperator,
+ DbtRunOperationAzureContainerInstanceOperator,
+ DbtSeedAzureContainerInstanceOperator,
+ DbtSnapshotAzureContainerInstanceOperator,
+ DbtTestAzureContainerInstanceOperator,
+ )
+except ImportError:
+ DbtLSAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtLSAzureContainerInstanceOperator", "azure-container-instance"
+ )
+ DbtRunAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtRunAzureContainerInstanceOperator", "azure-container-instance"
+ )
+ DbtRunOperationAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtRunOperationAzureContainerInstanceOperator",
+ "azure-container-instance",
+ )
+ DbtSeedAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtSeedAzureContainerInstanceOperator", "azure-container-instance"
+ )
+ DbtSnapshotAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtSnapshotAzureContainerInstanceOperator",
+ "azure-container-instance",
+ )
+ DbtTestAzureContainerInstanceOperator = MissingPackage(
+ "cosmos.operators.azure_container_instance.DbtTestAzureContainerInstanceOperator", "azure-container-instance"
+ )
+
__all__ = [
"ProjectConfig",
"ProfileConfig",
@@ -117,6 +148,12 @@
"DbtTestKubernetesOperator",
"DbtBuildKubernetesOperator",
"DbtSnapshotKubernetesOperator",
+ "DbtLSAzureContainerInstanceOperator",
+ "DbtRunOperationAzureContainerInstanceOperator",
+ "DbtRunAzureContainerInstanceOperator",
+ "DbtSeedAzureContainerInstanceOperator",
+ "DbtTestAzureContainerInstanceOperator",
+ "DbtSnapshotAzureContainerInstanceOperator",
"ExecutionMode",
"LoadMode",
"TestBehavior",
diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py
index 06080260a..177806480 100644
--- a/cosmos/airflow/graph.py
+++ b/cosmos/airflow/graph.py
@@ -25,6 +25,17 @@
logger = get_logger(__name__)
+def _snake_case_to_camelcase(value: str) -> str:
+ """Convert snake_case to CamelCase
+
+ Example: foo_bar_baz -> FooBarBaz
+
+ :param value: Value to convert to CamelCase
+ :return: Converted value
+ """
+ return "".join(x.capitalize() for x in value.lower().split("_"))
+
+
def calculate_operator_class(
execution_mode: ExecutionMode,
dbt_class: str,
@@ -37,7 +48,9 @@ def calculate_operator_class(
:returns: path string to the correspondent Cosmos Airflow operator
(e.g. cosmos.operators.localDbtSnapshotLocalOperator)
"""
- return f"cosmos.operators.{execution_mode.value}.{dbt_class}{execution_mode.value.capitalize()}Operator"
+ return (
+ f"cosmos.operators.{execution_mode.value}.{dbt_class}{_snake_case_to_camelcase(execution_mode.value)}Operator"
+ )
def calculate_leaves(tasks_ids: list[str], nodes: dict[str, DbtNode]) -> list[str]:
diff --git a/cosmos/constants.py b/cosmos/constants.py
index 96c5bdd07..4741d621d 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -50,6 +50,7 @@ class ExecutionMode(Enum):
DOCKER = "docker"
KUBERNETES = "kubernetes"
VIRTUALENV = "virtualenv"
+ AZURE_CONTAINER_INSTANCE = "azure_container_instance"
class TestIndirectSelection(Enum):
diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py
new file mode 100644
index 000000000..903524533
--- /dev/null
+++ b/cosmos/operators/azure_container_instance.py
@@ -0,0 +1,133 @@
+from __future__ import annotations
+
+from typing import Any, Callable, Sequence
+
+from airflow.utils.context import Context
+from cosmos.config import ProfileConfig
+
+from cosmos.log import get_logger
+from cosmos.operators.base import (
+ AbstractDbtBaseOperator,
+ DbtRunMixin,
+ DbtSeedMixin,
+ DbtSnapshotMixin,
+ DbtTestMixin,
+ DbtLSMixin,
+ DbtRunOperationMixin,
+)
+
+logger = get_logger(__name__)
+
+# ACI is an optional dependency, so we need to check if it's installed
+try:
+ from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
+except ImportError:
+ raise ImportError(
+ "Could not import AzureContainerInstancesOperator. Ensure you've installed the Microsoft Azure provider "
+ "separately or with `pip install astronomer-cosmos[...,azure-container-instance]`."
+ )
+
+
+class DbtAzureContainerInstanceBaseOperator(AzureContainerInstancesOperator, AbstractDbtBaseOperator): # type: ignore
+ """
+ Executes a dbt core cli command in an Azure Container Instance
+ """
+
+ template_fields: Sequence[str] = tuple(
+ list(AbstractDbtBaseOperator.template_fields) + list(AzureContainerInstancesOperator.template_fields)
+ )
+
+ def __init__(
+ self,
+ ci_conn_id: str,
+ resource_group: str,
+ name: str,
+ image: str,
+ region: str,
+ profile_config: ProfileConfig | None = None,
+ remove_on_error: bool = False,
+ fail_if_exists: bool = False,
+ registry_conn_id: str | None = None, # need to add a default for Airflow 2.3 support
+ **kwargs: Any,
+ ) -> None:
+ self.profile_config = profile_config
+ super().__init__(
+ ci_conn_id=ci_conn_id,
+ resource_group=resource_group,
+ name=name,
+ image=image,
+ region=region,
+ remove_on_error=remove_on_error,
+ fail_if_exists=fail_if_exists,
+ registry_conn_id=registry_conn_id,
+ **kwargs,
+ )
+
+ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
+ self.build_command(context, cmd_flags)
+ self.log.info(f"Running command: {self.command}")
+ result = super().execute(context)
+ logger.info(result)
+
+ def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
+ # For the first round, we're going to assume that the command is dbt
+ # This means that we don't have openlineage support, but we will create a ticket
+ # to add that in the future
+ self.dbt_executable_path = "dbt"
+ dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
+ self.environment_variables: dict[str, Any] = {**env_vars, **self.environment_variables}
+ self.command: list[str] = dbt_cmd
+
+
+class DbtLSAzureContainerInstanceOperator(DbtLSMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core ls command.
+ """
+
+
+class DbtSeedAzureContainerInstanceOperator(DbtSeedMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core seed command.
+
+ :param full_refresh: dbt optional arg - dbt will treat incremental models as table models
+ """
+
+    template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtSeedMixin.template_fields  # type: ignore[operator]
+
+
+class DbtSnapshotAzureContainerInstanceOperator(DbtSnapshotMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core snapshot command.
+
+ """
+
+
+class DbtRunAzureContainerInstanceOperator(DbtRunMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core run command.
+ """
+
+ template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]
+
+
+class DbtTestAzureContainerInstanceOperator(DbtTestMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core test command.
+ """
+
+ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
+ super().__init__(**kwargs)
+ # as of now, on_warning_callback in azure container instance executor does nothing
+ self.on_warning_callback = on_warning_callback
+
+
+class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzureContainerInstanceBaseOperator):
+ """
+ Executes a dbt core run-operation command.
+
+ :param macro_name: name of macro to execute
+ :param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
+ selected macro.
+ """
+
+ template_fields: Sequence[str] = DbtAzureContainerInstanceBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator]
diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index 83618be77..58090ef50 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -12,7 +12,7 @@
import airflow
import jinja2
from airflow import DAG
-from airflow.compat.functools import cached_property
+from functools import cached_property
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.taskinstance import TaskInstance
diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py
index 04d4053c6..bad88e234 100644
--- a/cosmos/operators/virtualenv.py
+++ b/cosmos/operators/virtualenv.py
@@ -4,7 +4,7 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Any
-from airflow.compat.functools import cached_property
+from functools import cached_property
from airflow.utils.python_virtualenv import prepare_virtualenv
from cosmos.hooks.subprocess import FullOutputSubprocessResult
diff --git a/docs/_static/cosmos_aci_schematic.png b/docs/_static/cosmos_aci_schematic.png
new file mode 100644
index 000000000..ece19a426
Binary files /dev/null and b/docs/_static/cosmos_aci_schematic.png differ
diff --git a/docs/_static/jaffle_shop_azure_container_instance.png b/docs/_static/jaffle_shop_azure_container_instance.png
new file mode 100644
index 000000000..48d8e450e
Binary files /dev/null and b/docs/_static/jaffle_shop_azure_container_instance.png differ
diff --git a/docs/getting_started/azure-container-instance.rst b/docs/getting_started/azure-container-instance.rst
new file mode 100644
index 000000000..8f979f514
--- /dev/null
+++ b/docs/getting_started/azure-container-instance.rst
@@ -0,0 +1,135 @@
+.. _azure-container-instance:
+
+Azure Container Instance Execution Mode
+=======================================
+.. versionadded:: 1.4
+
+This tutorial will guide you through the steps required to use Azure Container Instance as the Execution Mode for your dbt code with Astronomer Cosmos. Schematically, the guide will walk you through the steps required to build the following architecture:
+
+.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/cosmos_aci_schematic.png
+ :width: 800
+
+Prerequisites
++++++++++++++
+1. Docker with docker daemon (Docker Desktop on MacOS). Follow the `Docker installation guide `_.
+2. Airflow
+3. Azure CLI (install guide here: `Azure CLI `_)
+4. Astronomer-cosmos package containing the dbt Azure Container Instance operators
+5. Azure account with:
+ 1. A resource group
+ 2. A service principal with `Contributor` permissions on the resource group
+ 3. A Container Registry
+ 4. A Postgres instance accessible from Azure. (we use an Azure Postgres instance in the example)
+6. Docker image built with required dbt project and dbt DAG
+7. dbt DAG with dbt Azure Container Instance operators in the Airflow DAGs directory to run in Airflow
+
+More information on how to achieve 2-6 is detailed below.
+
+Note that the steps below walk you through an example; the code for it can be found in the `cosmos-example <https://github.com/astronomer/cosmos-example>`_ repository.
+
+Step-by-step guide
+++++++++++++++++++
+
+**Install Airflow and Cosmos**
+
+Create a Python virtualenv, activate it, upgrade pip to the latest version, and install Apache Airflow and Astronomer Cosmos with the ``dbt-postgres`` and ``azure-container-instance`` extras:
+
+.. code-block:: bash
+
+ python -m venv venv
+ source venv/bin/activate
+ pip install --upgrade pip
+ pip install apache-airflow
+ pip install "astronomer-cosmos[dbt-postgres,azure-container-instance]"
+
+**Setup Postgres database**
+
+You will need a running Postgres database to serve as the database for the dbt project. The easiest way to make it accessible from Azure Container Instance is to create an Azure Postgres instance. For this, run the following (assuming you are logged into your Azure account):
+
+.. code-block:: bash
+
+    az postgres server create -l westeurope -g <<resource-group>> -n <<server-name>> -u dbadmin -p <<password>> --sku-name B_Gen5_1 --ssl-enforcement Enabled
+
+
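+To make the database reachable from containers running in Azure, you may also need to allow access from Azure services. A hedged example for the legacy ``az postgres server`` command group (the special ``0.0.0.0`` start/end addresses mean "allow Azure services", and the rule name is arbitrary; tighten this for production):
+
+.. code-block:: bash
+
+    az postgres server firewall-rule create -g <<resource-group>> -s <<server-name>> -n AllowAzureServices --start-ip-address 0.0.0.0 --end-ip-address 0.0.0.0
+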
+**Setup Azure Container Registry**
+
+In order to run a container in Azure Container Instance, it needs access to the container image. In our setup, we will use Azure Container Registry for this. To set up an Azure Container Registry, you can use the following bash command:
+
+.. code-block:: bash
+
+    az acr create --name <<registry-name>> --resource-group <<resource-group>> --sku Basic --admin-enabled
+
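+If you later need the registry's login server and admin credentials (for example when configuring the ``acr`` Airflow connection below), they can be retrieved with the following commands, assuming the admin user was enabled as above:
+
+.. code-block:: bash
+
+    az acr show --name <<registry-name>> --query loginServer --output tsv
+    az acr credential show --name <<registry-name>>
+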
+**Build the dbt Docker image**
+
+For the Azure Container Instance operators to work, you need to create a Docker image that will be supplied as the ``image`` parameter to the dbt Azure Container Instance operators used in the DAG.
+
+Clone the `cosmos-example `_ repo
+
+.. code-block:: bash
+
+ git clone https://github.com/astronomer/cosmos-example.git
+ cd cosmos-example
+
+Create a docker image containing the dbt project files and dbt profile by using the `Dockerfile `_, which will be supplied to the Azure Container Instance operators.
+
+.. code-block:: bash
+
+    docker build -t <<image-name>>:1.0.0 -f Dockerfile.azure_container_instance .
+
+After this, the image needs to be pushed to the registry of your choice. Note that your image name should contain the name of your registry:
+
+.. code-block:: bash
+
+    docker push <<image-name>>:1.0.0
+
+.. note::
+
+    You may need to ensure Docker knows to use the right credentials. If using Azure Container Registry, this can be done by running the following command:
+
+    .. code-block:: bash
+
+        az acr login --name <<registry-name>>
+
+.. note::
+
+    If running on an M1 Mac, you may need to set the following environment variables if the docker build command fails:
+
+    .. code-block:: bash
+
+        export DOCKER_BUILDKIT=0
+        export COMPOSE_DOCKER_CLI_BUILD=0
+        export DOCKER_DEFAULT_PLATFORM=linux/amd64
+
+Take a read of the Dockerfile to understand what it does so that you can use it as a reference in your project.
+
+    - The `dbt profile `_ file is added to the image
+    - The dags directory containing the `dbt project jaffle_shop `_ is added to the image
+    - The dbt_project.yml is replaced with `postgres_profile_dbt_project.yml `_ which contains the profile key pointing to postgres_profile, as profile creation is not currently handled for Azure Container Instance operators the way it is in local mode.
+
+**Setup Airflow Connections**
+
+Now that you have the required Azure infrastructure, you still need to add configuration to Airflow to ensure the infrastructure can be used. You'll need 3 connections:
+
+1. ``aci_db``: a Postgres connection to your Azure Postgres instance.
+2. ``aci``: an Azure Container Instance connection configured with a Service Principal with sufficient permissions (i.e. ``Contributor`` on the resource group in which you will use Azure Container Instances).
+3. ``acr``: an Azure Container Registry connection configured for your Azure Container Registry.
+
+Check out the ``airflow-settings.yml`` file `here `_ for an example. If you are using Astro CLI, filling in the right values here will be enough for this to work.
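+
+If you prefer defining connections without the Airflow UI, one option is Airflow's ``AIRFLOW_CONN_<CONN_ID>`` environment variables. Below is a minimal sketch for the ``aci_db`` Postgres connection only; the username, password, server name and database are placeholders for your own values:
+
+.. code-block:: bash
+
+    # Hypothetical values -- replace with your Azure Postgres server details
+    export AIRFLOW_CONN_ACI_DB="postgres://<<username>>:<<password>>@<<server-name>>.postgres.database.azure.com:5432/postgres"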
+
+**Setup and Trigger the DAG with Airflow**
+
+Copy the dags directory from the cosmos-example repo to your Airflow home:
+
+.. code-block:: bash
+
+ cp -r dags $AIRFLOW_HOME/
+
+Run Airflow
+
+.. code-block:: bash
+
+ airflow standalone
+
+.. note::
+
+    You might need to run ``airflow standalone`` with ``sudo`` if your Airflow user does not have the necessary permissions.
+
+Log in to Airflow through a web browser at ``http://localhost:8080/``, using the user ``admin`` and the password described in the ``standalone_admin_password.txt`` file.
+
+Enable and trigger a run of the `jaffle_shop_azure_container_instance `_ DAG. You will be able to see the following successful DAG run.
+
+.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_azure_container_instance.png
+ :width: 800
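+
+For reference, a DAG using this execution mode is configured roughly as follows. This is a sketch rather than the literal DAG from the example repository: the connection ids match the ones created above, while the project/profile configuration is omitted and the resource group, region and image are placeholders to replace with your own values.
+
+.. code-block:: python
+
+    from cosmos import DbtDag, ExecutionConfig, ExecutionMode
+
+    aci_cosmos_dag = DbtDag(
+        # project_config=..., profile_config=... and the usual DAG arguments
+        # (dag_id, schedule, start_date) are omitted here for brevity.
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.AZURE_CONTAINER_INSTANCE),
+        operator_args={
+            "ci_conn_id": "aci",
+            "registry_conn_id": "acr",
+            "resource_group": "<<resource-group>>",
+            "name": "my-aci-{{ ti.task_id.replace('.','-').replace('_','-') }}",
+            "region": "West Europe",
+            "image": "<<registry-name>>.azurecr.io/dbt-jaffle-shop:1.0.0",
+        },
+    )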
diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index 721138238..924e4ba12 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -3,18 +3,19 @@
Execution Modes
===============
-Cosmos can run ``dbt`` commands using four different approaches, called ``execution modes``:
+Cosmos can run ``dbt`` commands using five different approaches, called ``execution modes``:
1. **local**: Run ``dbt`` commands using a local ``dbt`` installation (default)
2. **virtualenv**: Run ``dbt`` commands from Python virtual environments managed by Cosmos
3. **docker**: Run ``dbt`` commands from Docker containers managed by Cosmos (requires a pre-existing Docker image)
4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image)
+5. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.
.. list-table:: Execution Modes Comparison
- :widths: 25 25 25 25
+ :widths: 20 20 20 20 20
:header-rows: 1
* - Execution Mode
@@ -37,6 +38,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
- Slow
- High
- No
+ * - Azure Container Instance
+ - Slow
+ - High
+ - No
Local
-----
@@ -117,7 +122,7 @@ Example DAG:
Kubernetes
----------
-Lastly, the ``kubernetes`` approach is the most isolated way of running ``dbt`` since the ``dbt`` run commands from within a Kubernetes Pod, usually in a separate host.
+The ``kubernetes`` approach is a very isolated way of running ``dbt`` since the ``dbt`` commands run from within a Kubernetes Pod, usually in a separate host.
It assumes the user has a Kubernetes cluster. It also expects the user to ensure the Docker container has up-to-date ``dbt`` pipelines and profiles, potentially leading the user to declare secrets in two places (Airflow and Docker container).
@@ -148,3 +153,30 @@ Example DAG:
"secrets": [postgres_password_secret],
},
)
+
+Azure Container Instance
+------------------------
+.. versionadded:: 1.4
+
+Similar to the ``kubernetes`` approach, using ``Azure Container Instances`` as the execution mode gives a very isolated way of running ``dbt``, since each ``dbt`` command runs inside its own container in an Azure Container Instance.
+
+This execution mode requires an Azure environment in which Azure Container Groups can be run (see :ref:`azure-container-instance` for more details on the exact requirements). Similar to the ``docker`` and ``kubernetes`` execution modes, a Docker image should be available, containing up-to-date ``dbt`` pipelines and profiles.
+
+Each task will create a new container on Azure, giving full isolation. This, however, comes at the cost of speed, as this separation of tasks introduces some overhead. Please check out the :ref:`step-by-step guide <azure-container-instance>` for using Azure Container Instance as the execution mode. Example DAG:
+
+
+.. code-block:: python
+
+    aci_cosmos_dag = DbtDag(
+ # ...
+ execution_config=ExecutionConfig(
+ execution_mode=ExecutionMode.AZURE_CONTAINER_INSTANCE
+ ),
+ operator_args={
+ "ci_conn_id": "aci",
+ "registry_conn_id": "acr",
+ "resource_group": "my-rg",
+ "name": "my-aci-{{ ti.task_id.replace('.','-').replace('_','-') }}",
+ "region": "West Europe",
+ "image": "dbt-jaffle-shop:1.0.0",
+ },
+ )
diff --git a/pyproject.toml b/pyproject.toml
index e2ade9b7c..7536e703a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -121,6 +121,9 @@ kubernetes = [
pydantic = [
"pydantic>=1.10.0",
]
+azure-container-instance = [
+ "apache-airflow-providers-microsoft-azure>=8.4.0",
+]
[project.entry-points.cosmos]
provider_info = "cosmos:get_provider_info"
@@ -148,27 +151,26 @@ packages = ["cosmos"]
[tool.hatch.envs.tests]
dependencies = [
"astronomer-cosmos[tests]",
+ "apache-airflow-providers-cncf-kubernetes>=5.1.1",
+ "apache-airflow-providers-docker>=3.5.0",
+ "apache-airflow-providers-microsoft-azure",
"types-PyYAML",
"types-attrs",
"types-requests",
"types-python-dateutil",
"Werkzeug<3.0.0",
- "apache-airflow-providers-cncf-kubernetes>=5.1.1",
- "apache-airflow-providers-docker>=3.5.0",
-]
-# Airflow install with constraint file, Airflow versions < 2.7 require a workaround for PyYAML
-pre-install-commands = ["""
- if [[ "2.3 2.4 2.5 2.6" =~ "{matrix:airflow}" ]]; then
- echo "Cython < 3" >> /tmp/constraint.txt
- pip wheel "PyYAML==6.0.0" -c /tmp/constraint.txt
- fi
- pip install 'apache-airflow=={matrix:airflow}' --constraint 'https://raw.githubusercontent.com/apache/airflow/constraints-{matrix:airflow}.0/constraints-{matrix:python}.txt'
- """
+ "apache-airflow=={matrix:airflow}.0",
]
+pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]
+
[[tool.hatch.envs.tests.matrix]]
python = ["3.8", "3.9", "3.10", "3.11"]
airflow = ["2.3", "2.4", "2.5", "2.6", "2.7", "2.8"]
+[tool.hatch.envs.tests.overrides]
+matrix.airflow.dependencies = [
+ { value = "typing_extensions<4.6", if = ["2.6"] }
+]
[tool.hatch.envs.tests.scripts]
freeze = "pip freeze"
diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh
new file mode 100644
index 000000000..de29703df
--- /dev/null
+++ b/scripts/test/pre-install-airflow.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+AIRFLOW_VERSION="$1"
+PYTHON_VERSION="$2"
+
+CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-$AIRFLOW_VERSION.0/constraints-$PYTHON_VERSION.txt"
+curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt
+# Workaround to remove PyYAML constraint that will work on both Linux and MacOS
+sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp
+mv /tmp/constraint.txt.tmp /tmp/constraint.txt
+# Install Airflow with constraints
+pip install apache-airflow==$AIRFLOW_VERSION --constraint /tmp/constraint.txt
+pip install pydantic --constraint /tmp/constraint.txt
+rm /tmp/constraint.txt
diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py
index 255f8afc3..16a2c7e07 100644
--- a/tests/airflow/test_graph.py
+++ b/tests/airflow/test_graph.py
@@ -15,6 +15,7 @@
create_task_metadata,
create_test_task_metadata,
generate_task_or_group,
+ _snake_case_to_camelcase,
)
from cosmos.config import ProfileConfig, RenderConfig
from cosmos.constants import (
@@ -423,3 +424,10 @@ def test_create_test_task_metadata(node_type, node_unique_id, test_indirect_sele
},
**additional_arguments,
}
+
+
+@pytest.mark.parametrize(
+ "input,expected", [("snake_case", "SnakeCase"), ("snake_case_with_underscores", "SnakeCaseWithUnderscores")]
+)
+def test_snake_case_to_camelcase(input, expected):
+ assert _snake_case_to_camelcase(input) == expected
diff --git a/tests/operators/test_azure_container_instance.py b/tests/operators/test_azure_container_instance.py
new file mode 100644
index 000000000..da7720958
--- /dev/null
+++ b/tests/operators/test_azure_container_instance.py
@@ -0,0 +1,145 @@
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+from airflow.utils.context import Context
+from pendulum import datetime
+
+from cosmos.operators.azure_container_instance import (
+ DbtAzureContainerInstanceBaseOperator,
+ DbtLSAzureContainerInstanceOperator,
+ DbtRunAzureContainerInstanceOperator,
+ DbtTestAzureContainerInstanceOperator,
+ DbtSeedAzureContainerInstanceOperator,
+)
+
+
+class ConcreteDbtAzureContainerInstanceOperator(DbtAzureContainerInstanceBaseOperator):
+ base_cmd = ["cmd"]
+
+
+def test_dbt_azure_container_instance_operator_add_global_flags() -> None:
+ dbt_base_operator = ConcreteDbtAzureContainerInstanceOperator(
+ ci_conn_id="my_airflow_connection",
+ task_id="my-task",
+ image="my_image",
+ region="Mordor",
+ name="my-aci",
+ resource_group="my-rg",
+ project_dir="my/dir",
+ vars={
+ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+ "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+ },
+ no_version_check=True,
+ )
+ assert dbt_base_operator.add_global_flags() == [
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]
+
+
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_azure_container_instance_operator_get_env(p_context_to_airflow_vars: MagicMock) -> None:
+ """
+    If an end user passes in a variable via the context that is also a global flag, validate that both are kept.
+ """
+ dbt_base_operator = ConcreteDbtAzureContainerInstanceOperator(
+ ci_conn_id="my_airflow_connection",
+ task_id="my-task",
+ image="my_image",
+ region="Mordor",
+ name="my-aci",
+ resource_group="my-rg",
+ project_dir="my/dir",
+ )
+ dbt_base_operator.env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "retries": 3,
+ ("tuple", "key"): "some_value",
+ }
+ p_context_to_airflow_vars.return_value = {"START_DATE": "2023-02-15 12:30:00"}
+ env = dbt_base_operator.get_env(
+ Context(execution_date=datetime(2023, 2, 15, 12, 30)),
+ )
+ expected_env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "START_DATE": "2023-02-15 12:30:00",
+ }
+ assert env == expected_env
+
+
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_azure_container_instance_operator_check_environment_variables(
+ p_context_to_airflow_vars: MagicMock,
+) -> None:
+ """
+    If an env var from the operator's env/context collides with a key in environment_variables, validate that the environment_variables value takes precedence.
+ """
+ dbt_base_operator = ConcreteDbtAzureContainerInstanceOperator(
+ ci_conn_id="my_airflow_connection",
+ task_id="my-task",
+ image="my_image",
+ region="Mordor",
+ name="my-aci",
+ resource_group="my-rg",
+ project_dir="my/dir",
+ environment_variables={"FOO": "BAR"},
+ )
+ dbt_base_operator.env = {
+ "start_date": "20220101",
+ "end_date": "20220102",
+ "some_path": Path(__file__),
+ "retries": 3,
+ "FOO": "foo",
+ ("tuple", "key"): "some_value",
+ }
+ expected_env = {"start_date": "20220101", "end_date": "20220102", "some_path": Path(__file__), "FOO": "BAR"}
+ dbt_base_operator.build_command(context=MagicMock())
+
+ assert dbt_base_operator.environment_variables == expected_env
+
+
+base_kwargs = {
+ "ci_conn_id": "my_airflow_connection",
+ "name": "my-aci",
+ "region": "Mordor",
+ "resource_group": "my-rg",
+ "task_id": "my-task",
+ "image": "my_image",
+ "project_dir": "my/dir",
+ "vars": {
+ "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+ "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+ },
+ "no_version_check": True,
+}
+
+result_map = {
+ "ls": DbtLSAzureContainerInstanceOperator(**base_kwargs),
+ "run": DbtRunAzureContainerInstanceOperator(**base_kwargs),
+ "test": DbtTestAzureContainerInstanceOperator(**base_kwargs),
+ "seed": DbtSeedAzureContainerInstanceOperator(**base_kwargs),
+}
+
+
+def test_dbt_azure_container_instance_build_command():
+ """
+    Since we know that the AzureContainerInstancesOperator is tested, we can just test that the
+    command is built correctly.
+ """
+ for command_name, command_operator in result_map.items():
+ command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
+ assert command_operator.command == [
+ "dbt",
+ command_name,
+ "--vars",
+ "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+ "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+ "--no-version-check",
+ ]