Skip to content

Commit

Permalink
Run some example in Kubernetes execution mode in CI
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Jul 29, 2024
1 parent bbec5d8 commit f73ad6a
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 5 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/test_kubernetes.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# CI workflow: create a KinD (Kubernetes-in-Docker) cluster and run the
# Kubernetes integration tests against it on every push.
name: Create Cluster

on: push

jobs:
  create-cluster:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      # Boots a local KinD cluster and a docker registry for it.
      - name: Kubernetes KinD Cluster
        uses: container-tools/kind-action@v1
      - name: Run setup
        run: |
          sh ./scripts/test/kubernetes-setup.sh
          pip install hatch
          hatch -e tests.py3.9-2.9 run pip freeze
          hatch run tests.py3.9-2.9:test-kubernetes
          # Dump pod state and Postgres logs for post-run debugging.
          kubectl get pods -o wide
          kubectl logs postgres-postgresql-0
17 changes: 17 additions & 0 deletions dev/Dockerfile.postgres_profile_docker_k8s
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Image used by the Kubernetes example DAG: Python 3.9 with dbt-postgres,
# the jaffle_shop dbt project, and its dbt profile baked in.
FROM python:3.9

# dbt core + Postgres adapter and driver; versions pinned for reproducible CI.
RUN pip install dbt-postgres==1.3.1 psycopg2==2.9.3 pytz

# Connection settings for the in-cluster Postgres (Helm release "postgres").
# POSTGRES_PASSWORD is a placeholder here — the real value is injected into
# the task pod at runtime from the "postgres-secrets" Kubernetes secret.
ENV POSTGRES_DATABASE=postgres
ENV POSTGRES_HOST=postgres-postgresql.default.svc.cluster.local
ENV POSTGRES_PASSWORD=<postgres_password>
ENV POSTGRES_PORT=5432
ENV POSTGRES_SCHEMA=public
ENV POSTGRES_USER=postgres

# Install the dbt profile in dbt's default lookup location.
RUN mkdir /root/.dbt
COPY dags/dbt/jaffle_shop/profiles.yml /root/.dbt/profiles.yml

# Copy the example DAGs and dbt projects into the image.
# NOTE(review): packages.yml is removed — presumably so dbt does not attempt
# to install packages inside the container; confirm before changing.
RUN mkdir dags
COPY dags dags
RUN rm dags/dbt/jaffle_shop/packages.yml
12 changes: 12 additions & 0 deletions dev/dags/dbt/jaffle_shop/profiles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,15 @@ default:
dbname: "{{ env_var('POSTGRES_DB') }}"
schema: "{{ env_var('POSTGRES_SCHEMA') }}"
threads: 4

# Profile used by the Kubernetes example DAG. Every connection setting is
# resolved from environment variables injected into the pod (defaults come
# from the Dockerfile; host/password come from Kubernetes secrets).
postgres_profile:
  target: dev
  outputs:
    dev:
      type: postgres
      dbname: '{{ env_var(''POSTGRES_DATABASE'') }}'
      host: '{{ env_var(''POSTGRES_HOST'') }}'
      pass: '{{ env_var(''POSTGRES_PASSWORD'') }}'
      port: '{{ env_var(''POSTGRES_PORT'') | as_number }}'
      schema: '{{ env_var(''POSTGRES_SCHEMA'') }}'
      user: '{{ env_var(''POSTGRES_USER'') }}'
67 changes: 67 additions & 0 deletions dev/dags/jaffle_shop_kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
## Jaffle Shop DAG
[Jaffle Shop](https://github.com/dbt-labs/jaffle_shop) is a fictional eCommerce store. This dbt project originates from
dbt labs as an example project with dummy data to demonstrate a working dbt core project. This DAG uses the cosmos dbt
parser to generate an Airflow TaskGroup from the dbt project folder.
The step-by-step to run this DAG are described in:
https://astronomer.github.io/astronomer-cosmos/getting_started/kubernetes.html#kubernetes
"""
from airflow import DAG
from airflow.providers.cncf.kubernetes.secret import Secret
from pendulum import datetime

from cosmos import (
ProfileConfig,
DbtSeedKubernetesOperator,
)
from cosmos.profiles import PostgresUserPasswordProfileMapping

DBT_IMAGE = "dbt-jaffle-shop:1.0.0"


project_seeds = [
{"project": "jaffle_shop", "seeds": ["raw_customers", "raw_payments", "raw_orders"]}
]

postgres_password_secret = Secret(
deploy_type="env",
deploy_target="POSTGRES_PASSWORD",
secret="postgres-secrets",
key="password",
)

postgres_host_secret = Secret(
deploy_type="env",
deploy_target="POSTGRES_HOST",
secret="postgres-secrets",
key="host",
)

with DAG(
dag_id="jaffle_shop_kubernetes",
start_date=datetime(2022, 11, 27),
doc_md=__doc__,
catchup=False,
) as dag:
load_seeds = DbtSeedKubernetesOperator(
task_id="load_seeds",
project_dir="dags/dbt/jaffle_shop",
get_logs=True,
schema="public",
image=DBT_IMAGE,
is_delete_operator_pod=False,
secrets=[postgres_password_secret, postgres_host_secret],
profile_config=ProfileConfig(
profile_name="postgres_profile",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="postgres_default",
profile_args={
"schema": "public",
}
)
)
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ freeze = "pip freeze"
test = 'sh scripts/test/unit.sh'
test-cov = 'sh scripts/test/unit-cov.sh'
test-integration = 'sh scripts/test/integration.sh'
test-kubernetes = "sh scripts/test/integration-kubernetes.sh"
test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh'
test-integration-expensive = 'sh scripts/test/integration-expensive.sh'
test-integration-setup = 'sh scripts/test/integration-setup.sh'
Expand Down
9 changes: 9 additions & 0 deletions scripts/test/integration-kubernetes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
# Run the Kubernetes example-DAG integration tests.

# -x: echo each command (useful in CI logs); -e: abort on the first failure.
set -x
set -e


# Start from a clean Airflow metadata DB so leftover state cannot affect the run.
airflow db reset -y

pytest tests/test_example_k8s_dags.py
77 changes: 77 additions & 0 deletions scripts/test/kubernetes-setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/bin/bash

set -x
set -e

check_nodes_ready() {
    # Succeed (return 0) only when every node in the cluster reports the
    # STATUS column as "Ready"; fail (return 1) at the first node that does not.
    local status
    for status in $(kubectl get nodes --no-headers | awk '{print $2}'); do
        [ "$status" = "Ready" ] || return 1
    done
    return 0
}

wait_for_nodes_ready() {
    # Poll check_nodes_ready until all nodes are Ready, giving up after
    # max_attempts polls spaced interval seconds apart (~5 minutes total).
    local max_attempts=60
    local interval=5
    local attempt=0

    echo "Waiting for nodes in the kind cluster to be in 'Ready' state..."

    while [ "$attempt" -lt "$max_attempts" ]; do
        if check_nodes_ready; then
            echo "All nodes in the kind cluster are in 'Ready' state."
            return 0
        fi
        echo "Nodes are not yet ready. Checking again in $interval seconds..."
        sleep "$interval"
        attempt=$((attempt + 1))
    done

    echo "Timeout waiting for nodes in the kind cluster to be in 'Ready' state."
    return 1
}

kubectl config set-context default

# Deploy a Postgres pod to Kind
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm upgrade --install postgres bitnami/postgresql

# Retrieve the Postgres password and set it as an environment variable
POSTGRES_PASSWORD=$(kubectl get secret --namespace default postgres-postgresql -o jsonpath="{.data.postgres-password}" | base64 -d)
export POSTGRES_PASSWORD

# Expose the Postgres to the host running Docker/Kind
kubectl port-forward --namespace default postgres-postgresql-0 5432:5432 &
# Quote the password: an unquoted expansion would be word-split / glob-expanded
# if the generated password contains whitespace or shell metacharacters.
kubectl create secret generic postgres-secrets --from-literal=host=postgres-postgresql.default.svc.cluster.local --from-literal=password="$POSTGRES_PASSWORD"

# Create a docker image containing the dbt project files and dbt profile.
# Build in a subshell so the rest of the script keeps running from the repo root.
(cd dev && docker build -t dbt-jaffle-shop:1.0.0 -f Dockerfile.postgres_profile_docker_k8s .)

# Wait for the kind cluster to be in 'Ready' state before loading the image
# (previously this wait was performed twice; once is sufficient).
wait_for_nodes_ready

# Make the build image available in the Kind K8s cluster
kind load docker-image dbt-jaffle-shop:1.0.0

# For Debugging
echo "nodes"
kubectl get nodes
echo "helm"
helm list
echo "pod service"
kubectl get pods --namespace default
kubectl get svc --namespace default
echo "pg log"
kubectl logs postgres-postgresql-0 -c postgresql
kubectl describe pod postgres-postgresql-0

11 changes: 6 additions & 5 deletions tests/test_example_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
except ImportError:
from functools import lru_cache as cache


import airflow
import pytest
from airflow.models.dagbag import DagBag
Expand All @@ -32,7 +31,6 @@

IGNORED_DAG_FILES = ["performance_dag.py"]


# Sort descending based on Versions and convert string to an actual version
MIN_VER_DAG_FILE_VER: dict[Version, list[str]] = {
Version(version): MIN_VER_DAG_FILE[version] for version in sorted(MIN_VER_DAG_FILE, key=Version, reverse=True)
Expand All @@ -51,7 +49,7 @@ def session():


@cache
def get_dag_bag() -> DagBag:
def get_dag_bag(in_kube: bool = False) -> DagBag:
"""Create a DagBag by adding the files that are not supported to .airflowignore"""
if AIRFLOW_VERSION in PARTIALLY_SUPPORTED_AIRFLOW_VERSIONS:
return DagBag(dag_folder=None, include_examples=False)
Expand All @@ -72,6 +70,9 @@ def get_dag_bag() -> DagBag:
if DBT_VERSION < Version("1.6.0"):
file.writelines(["example_model_version.py\n"])

if not in_kube:
file.writelines(["jaffle_shop_kubernetes.py\n"])

print(".airflowignore contents: ")
print(AIRFLOW_IGNORE_FILE.read_text())
db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
Expand All @@ -80,8 +81,8 @@ def get_dag_bag() -> DagBag:
return db


def get_dag_ids() -> list[str]:
dag_bag = get_dag_bag()
def get_dag_ids(in_kube: bool = False) -> list[str]:
    """Return the IDs of all DAGs collected by :func:`get_dag_bag`.

    :param in_kube: forwarded to ``get_dag_bag``; when False the Kubernetes
        example DAG file is excluded from the bag.
    """
    return get_dag_bag(in_kube).dag_ids


Expand Down
44 changes: 44 additions & 0 deletions tests/test_example_k8s_dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import os
from pathlib import Path

import pytest
from airflow.models.dagbag import DagBag

from . import utils as test_utils
from airflow.utils.db import create_default_connections
from airflow.utils.session import provide_session

EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "dev/dags"
AIRFLOW_IGNORE_FILE = EXAMPLE_DAGS_DIR / ".airflowignore"

KUBERNETES_DAG_FILES = ["jaffle_shop_kubernetes.py"]


@provide_session
def get_session(session=None):
    """Populate Airflow's default connections and return the DB session used."""
    create_default_connections(session)
    return session


@pytest.fixture()
def session():
    """Pytest fixture yielding a session with default connections created."""
    return get_session()


def get_all_dag_files():
    """Write every example DAG file except the Kubernetes ones to .airflowignore.

    This narrows any subsequent DagBag load of ``EXAMPLE_DAGS_DIR`` down to
    just the Kubernetes example DAGs.
    """
    ignored = [
        entry.name
        for entry in EXAMPLE_DAGS_DIR.iterdir()
        if entry.name.endswith(".py") and entry.name not in KUBERNETES_DAG_FILES
    ]
    with open(AIRFLOW_IGNORE_FILE, "w+") as dag_ignorefile:
        dag_ignorefile.writelines(f"{name}\n" for name in ignored)


@pytest.mark.integration
def test_example_dag_kubernetes(session):
    """Load only the Kubernetes example DAGs and execute jaffle_shop_kubernetes."""
    get_all_dag_files()
    db = DagBag(EXAMPLE_DAGS_DIR, include_examples=False)
    dag = db.get_dag("jaffle_shop_kubernetes")
    # DagBag.get_dag returns None when the DAG failed to import; fail loudly
    # with the import errors instead of passing None into run_dag.
    assert dag is not None, f"DAG failed to load; import errors: {db.import_errors}"
    test_utils.run_dag(dag)

0 comments on commit f73ad6a

Please sign in to comment.