
Dataset-aware scheduling with Astro-SDK doesn't work under Airflow 2.9.1 #2164

Open

skolchin opened this issue May 27, 2024 · 0 comments

skolchin (Contributor) commented May 27, 2024

Hi, I was testing the latest Airflow version, 2.9.1, for compatibility with our project, which is written with astro-sdk 1.8.0, and found that dataset-aware scheduling, which worked fine under Airflow 2.8.4, simply stopped working in the new environment.

I wrote several small examples to illustrate this:

  • A file is loaded into the database, triggering a dataset change
  • Table data is copied to another table, again triggering a dataset change
  • A plain Airflow dataset is modified

import pendulum
import pandas as pd
from airflow.models import DAG
from airflow.decorators import task
from airflow.datasets import Dataset
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata

types_table = Table(name='types', conn_id='source_db', metadata=Metadata(schema='stage'))
types_copy_table = Table(name='types_copy', conn_id='source_db', metadata=Metadata(schema='stage'))

dataset = Dataset("myscheme://myhost?table=mytable")

@task
def print_triggering_dataset_events(triggering_dataset_events=None):
    """ Print out dataset trigger information """
    for dataset, event_list in triggering_dataset_events.items():
        print(f'Dataset: {dataset}')
        print(f'Events: {event_list}')

with DAG(
    dag_id='load_file',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Load file into TYPES table. This will modify `types_table` dataset and trigger corresponding DAG """
    aql.load_file(File(path='./dags/test.csv'), output_table=types_table)

with DAG(
    dag_id='triggered_by_file_load',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_table],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `types_table` dataset modifications """
    print_triggering_dataset_events()

with DAG(
    dag_id='copy-table',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Load all data from TYPES table and save into new `TYPES_COPY` table. 
    This should modify `types_copy_table` dataset and trigger corresponding DAG """

    @aql.run_raw_sql(results_format='pandas_dataframe')
    def load_table(table: Table):
        return '''select * from {{table}}'''

    @aql.dataframe
    def save_data(data: pd.DataFrame):
        return data

    data = load_table(types_table)
    save_data(data, output_table=types_copy_table)

with DAG(
    dag_id='triggered_by_copy_table',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_copy_table],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `types_copy_table` dataset modifications """
    print_triggering_dataset_events()

with DAG(
    dag_id='dataset_triggerer',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Simply trigger `dataset` dataset changes to run corresponding DAG  """

    @dag.task(outlets=[dataset])
    def trigger_dataset_event():
        print('Triggering event')

    trigger_dataset_event()

with DAG(
    dag_id='triggered_by_dataset',
    start_date=pendulum.today().add(days=-1),
    schedule=[dataset],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `dataset` dataset modifications """
    print_triggering_dataset_events()

Under Airflow 2.8.4 everything works just fine - dependent DAGs start after dataset changes:

[screenshot: under Airflow 2.8.4, the dataset-triggered DAGs run after the upstream DAGs complete]

However, under Airflow 2.9.1 only the last pair of DAGs (the ones using a plain Airflow dataset) works as expected. The DAGs that rely on Astro-SDK tables are not triggered at all:

[screenshot: under Airflow 2.9.1, the DAGs scheduled on Astro-SDK table datasets never run]

No code is changed at all; I only switch the base image in the Dockerfile used to build the environment (from FROM apache/airflow:slim-2.8.4-python3.10 to FROM apache/airflow:slim-2.9.1-python3.10).
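The Dockerfile itself is nothing special; a stripped-down sketch of it looks roughly like this (assuming astro-sdk is installed from PyPI as astro-sdk-python - only the FROM line differs between the two environments):

# Stripped-down sketch of the build; only the base image tag changes
FROM apache/airflow:slim-2.9.1-python3.10
# FROM apache/airflow:slim-2.8.4-python3.10   <- previous, working base image
RUN pip install --no-cache-dir "astro-sdk-python==1.8.0"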

I could not find any clue about this in the Airflow logs.
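For reference, a quick way to inspect what the tables look like as datasets (a minimal sketch; it assumes the Astro-SDK dataset integration works by making Table a subclass of airflow.datasets.Dataset, which is what schedule=[types_table] relies on):

# Sketch of a quick check, assuming Table subclasses airflow.datasets.Dataset.
# Printing the uri shows which dataset URI the scheduler would have to match
# against the outlets produced by load_file / save_data.
from airflow.datasets import Dataset
from astro.sql.table import Table, Metadata

types_table = Table(name='types', conn_id='source_db', metadata=Metadata(schema='stage'))
print(isinstance(types_table, Dataset))  # expected: True
print(types_table.uri)                   # the URI Airflow uses for dataset matching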

Please help solve this. Thanks!

Versions

  • Astro-SDK: 1.8.0
  • Airflow: [2.8.4, 2.9.1]
  • Python: 3.10