How to coordinate Airflow run with versioning #293

Open
noklam opened this issue Aug 3, 2023 · 0 comments
Labels
technical documentation A technical documentation for workarounds or technical decision

Comments

noklam commented Aug 3, 2023

Description

Users need to pass a `model_version` value to the Kedro pipeline. This works fine locally, but when the pipeline is deployed as an Airflow DAG, it is triggered as many individual `KedroOperator` tasks, and there is no easy way to share information across them.

This issue documents a solution reported by a user. Note that the workaround does its job, but it does not help if you need the built-in `versioned: true` feature, because each `KedroSession` has its own `session_id`. See kedro-org/kedro#1731.
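To make the `session_id` problem concrete, here is a minimal sketch in plain Python (no Airflow or Kedro required). It assumes each task derives its identifier from its own start time; the start times and the `make_version` helper are hypothetical and only mimic the timestamp format used in this issue, not Kedro's actual `session_id` format.

```python
from datetime import datetime, timedelta


def make_version(now):
    """Format a timestamp the same way the workaround in this issue does."""
    return now.strftime('%Y%m%d-%H%M%S')


# Hypothetical start times: the DAG run starts once, but each task
# (and therefore each session) starts a little later than the previous one.
dag_start = datetime(2023, 8, 3, 12, 0, 0)
task_start_times = [dag_start + timedelta(seconds=s) for s in (5, 42, 90)]

# One identifier per task: every task ends up with a different value.
per_task_versions = [make_version(t) for t in task_start_times]

# One identifier created up front and shared: every task sees the same value.
shared_version = make_version(dag_start)
shared_versions = [shared_version for _ in task_start_times]

print(per_task_versions)
print(shared_versions)
```

The point of the workaround is to move from the first situation to the second: the version is created once and then handed to every task.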

More Context

The user here wants to use the current timestamp as `model_version`, which is a similar idea to `session_id`.

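    from datetime import datetime

    def create_model_version(**kwargs):
        # Use the current timestamp as the model version and share it via XCom.
        model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
        kwargs['ti'].xcom_push(key='model_version', value=model_version)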

Workaround

with DAG(
    "test-fi",
    start_date=datetime(2023, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(days=30),
    default_args=default_args,
    catchup=False,
) as dag:
    tasks = {}


    def create_model_version(**kwargs):
        model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
        kwargs['ti'].xcom_push(key='model_version', value=model_version)


    def define_project_parameters_task(**kwargs):
        ti = kwargs['ti']
        model_version = ti.xcom_pull(task_ids='create_model_version', key='model_version')
        print(f'Model version {model_version}')
        # Use the model_version in your KedroOperator configuration
        define_project_parameters_task = KedroOperator(
            task_id="define-project-parameters",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="define_project_parameters",
            project_path=project_path,
            env=env,
            params={"model_version": model_version}  # Pass the model_version as a parameter
        )
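The XCom round trip used above can be tried without an Airflow installation. The sketch below is only an illustration: `FakeTaskInstance` is a hypothetical in-memory stand-in that copies the two `TaskInstance` methods used in this issue (`xcom_push` and `xcom_pull`), and the shared `store` dict plays the role of Airflow's XCom backend.

```python
from datetime import datetime


class FakeTaskInstance:
    """Hypothetical stand-in for the XCom methods of Airflow's TaskInstance."""

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict keyed by (task_id, key)

    def xcom_push(self, key, value):
        # Values are stored under the id of the task that pushed them.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        # Look up the value pushed by the named upstream task, if any.
        return self._store.get((task_ids, key))


def create_model_version(**kwargs):
    # Same callable as in the workaround: create the version once and push it.
    model_version = datetime.now().strftime('%Y%m%d-%H%M%S')
    kwargs['ti'].xcom_push(key='model_version', value=model_version)


store = {}

# Upstream task: runs once per DAG run and publishes the version.
create_model_version(ti=FakeTaskInstance('create_model_version', store))

# Downstream task: pulls the value by the upstream task id and key.
downstream_ti = FakeTaskInstance('define-project-parameters', store)
pulled = downstream_ti.xcom_pull(task_ids='create_model_version', key='model_version')
print(f'Model version {pulled}')
```

Note that the pull only finds a value when `task_ids` matches the `task_id` of the task that pushed it, so the upstream task in the real DAG needs to be registered under the id `create_model_version` and run before any task that pulls.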