XCom delete in ui and db clean do not trigger delete for custom backend #31774

Closed
1 of 2 tasks
ronald-fenner opened this issue Jun 7, 2023 · 2 comments
Labels
affected_version:main_branch (Issues Reported for main branch), area:UI (Related to UI/UX. For Frontend Developers.), kind:bug (This is a clearly a bug)

Comments

@ronald-fenner

ronald-fenner commented Jun 7, 2023

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

When I use the delete button in the UI, or run airflow db clean on the XCom table, the delete method of the custom XCom backend does not get called.

What you think should happen instead

I expect the backend's delete method to be called so that the custom backend can remove the file linked to each XCom row.

How to reproduce

Using this custom backend

from __future__ import annotations  # allows "str | None" annotations on Python < 3.10

import json
import os
import warnings
from typing import Any, Iterable

from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models.xcom import BaseXCom
from airflow.utils.helpers import exactly_one
from airflow.utils.json import XComDecoder, XComEncoder
from airflow.utils.session import NEW_SESSION, provide_session
from sqlalchemy.orm import Session


class CustomXComBackendEFS(BaseXCom):
    DATA_FILE_PATH = '/opt/airflow/efs/data_files/xcoms'
    PREFIX = 'efs://'

    @staticmethod
    def serialize_value(
            value: Any,
            *,
            key: str | None = None,
            task_id: str | None = None,
            dag_id: str | None = None,
            run_id: str | None = None,
            map_index: int | None = None,
    ) -> Any:

        filename = "xcom_" + run_id.replace(':', '_').replace('+', '_') + ".json"
        file_path = f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}"
        os.makedirs(file_path, exist_ok=True)
        with open(file_path + '/' + filename, 'w') as handle:
            handle.write(json.dumps(value, cls=XComEncoder))
            handle.flush()

        return BaseXCom.serialize_value(value=f'{CustomXComBackendEFS.PREFIX}{file_path}/{filename}')

    # noinspection PyUnusedLocal
    @staticmethod
    def deserialize_value(result) -> Any:
        value = BaseXCom.deserialize_value(result=result)
        if isinstance(value, str) and value.startswith(CustomXComBackendEFS.PREFIX):
            file_path = value[len(CustomXComBackendEFS.PREFIX):]
            if file_path.startswith(CustomXComBackendEFS.DATA_FILE_PATH):
                # Load the real payload from the EFS file this row points to.
                with open(file_path, 'r') as handle:
                    return json.load(handle, cls=XComDecoder)

        # Not one of our file references: return the plain deserialized value.
        return value

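    @classmethod
    @provide_session
    def delete(cls, xcoms: BaseXCom | Iterable[BaseXCom], session: Session = NEW_SESSION) -> None:
        # Accept a single row or any iterable of rows; materialise it so it can be walked twice.
        if isinstance(xcoms, BaseXCom):
            xcoms = [xcoms]
        xcoms = list(xcoms)
        for xcom in xcoms:
            file_path = BaseXCom.deserialize_value(result=xcom)
            if isinstance(file_path, str) and file_path.startswith(CustomXComBackendEFS.PREFIX):
                file_path = file_path[len(CustomXComBackendEFS.PREFIX):]
                # Several rows can point at the same file, so tolerate it being gone already.
                if file_path.startswith(CustomXComBackendEFS.DATA_FILE_PATH) and os.path.exists(file_path):
                    os.unlink(file_path)
        super().delete(xcoms, session=session)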

    @classmethod
    @provide_session
    def clear(
            cls,
            execution_date=None,
            dag_id=None,
            task_id=None,
            session=NEW_SESSION,
            *,
            run_id=None,
            map_index=None,
    ) -> None:
        from airflow.models import DagRun

        if dag_id is None:
            raise TypeError("clear() missing required argument: dag_id")
        if task_id is None:
            raise TypeError("clear() missing required argument: task_id")

        if not exactly_one(execution_date is not None, run_id is not None):
            raise ValueError(
                f"Exactly one of run_id or execution_date must be passed. "
                f"Passed execution_date={execution_date}, run_id={run_id}"
            )

        if execution_date is not None:
            message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
            warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
            run_id = (
                session.query(DagRun.run_id)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .scalar()
            )

        filename = "xcom_" + run_id.replace(':', '_').replace('+', '_') + ".json"
        file_path = f"{CustomXComBackendEFS.DATA_FILE_PATH}/{dag_id}/{task_id}"
        if os.path.exists(f"{file_path}/{filename}"):
            os.unlink(f"{file_path}/{filename}")

        query = session.query(cls).filter_by(dag_id=dag_id, task_id=task_id, run_id=run_id)
        if map_index is not None:
            query = query.filter_by(map_index=map_index)
        query.delete()

The delete method never seems to be called. The clear method does work: I can see the XCom file disappear and reappear.
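In the backend above, serialize_value and clear each rebuild the same file path from dag_id, task_id and run_id, while deserialize_value and delete each strip the efs:// prefix and check the result with a plain startswith. A minimal stdlib-only sketch of factoring that logic into one place follows; the helper names xcom_file_path and resolve_reference are hypothetical, not Airflow APIs. It also rejects references that use .. to climb out of DATA_FILE_PATH, which matters once delete calls os.unlink on them.

```python
from __future__ import annotations

import posixpath

# Constants copied from CustomXComBackendEFS in the reproduction above.
DATA_FILE_PATH = "/opt/airflow/efs/data_files/xcoms"
PREFIX = "efs://"


def xcom_file_path(dag_id: str, task_id: str, run_id: str) -> str:
    """Hypothetical helper: the file serialize_value writes and clear removes."""
    # Same sanitisation as the original code: ':' and '+' both become '_'.
    filename = "xcom_" + run_id.replace(":", "_").replace("+", "_") + ".json"
    return posixpath.join(DATA_FILE_PATH, dag_id, task_id, filename)


def resolve_reference(value: object) -> str | None:
    """Hypothetical helper: turn a stored 'efs://...' string into a file path.

    Returns None for anything that is not an absolute path strictly inside
    DATA_FILE_PATH, including paths that climb out of it with '..'.
    """
    if not isinstance(value, str) or not value.startswith(PREFIX):
        return None
    # Collapse '.' and '..' segments before checking containment.
    path = posixpath.normpath(value[len(PREFIX):])
    if not posixpath.isabs(path):
        return None
    root = posixpath.normpath(DATA_FILE_PATH)
    if path == root or posixpath.commonpath([root, path]) != root:
        return None
    return path
```

With these helpers, serialize_value and clear would both call xcom_file_path, and deserialize_value and delete would only open or unlink a file when resolve_reference returns a path. As in the original, the file name depends only on run_id, so every XCom pushed by one task instance (different keys, or different map indexes) shares a single file.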

Operating System

Linux 9e28031d19f0 5.15.49-linuxkit #1 SMP PREEMPT Tue Sep 13 07:51:32 UTC 2022 x86_64 GNU/Linux

Versions of Apache Airflow Providers

apache-airflow 2.5.3
apache-airflow-providers-amazon 8.0.0
apache-airflow-providers-asana 2.1.0
apache-airflow-providers-celery 3.1.0
apache-airflow-providers-cncf-kubernetes 6.1.0
apache-airflow-providers-common-sql 1.4.0
apache-airflow-providers-databricks 4.1.0
apache-airflow-providers-ftp 3.3.1
apache-airflow-providers-google 10.0.0
apache-airflow-providers-http 4.3.0
apache-airflow-providers-imap 3.1.1
apache-airflow-providers-jdbc 3.3.0
apache-airflow-providers-mysql 5.0.0
apache-airflow-providers-postgres 5.4.0
apache-airflow-providers-redis 3.1.0
apache-airflow-providers-salesforce 5.3.0
apache-airflow-providers-slack 7.2.0
apache-airflow-providers-snowflake 4.0.5
apache-airflow-providers-sqlite 3.3.2
apache-airflow-providers-ssh 3.6.0
google-cloud-orchestration-airflow 1.4.1

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm testing this in Docker using a custom image that we deploy to a Kubernetes cluster. The image is designed to store XComs on an attached EFS volume. Locally in Docker, that location is a volume mapped to my local hard drive, so I can see the files.
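Because the UI delete and airflow db clean leave files behind on the EFS volume, one possible stopgap (an assumption, not something proposed in this thread) is a periodic sweep that lists files no XCom row still references. A minimal stdlib-only sketch, assuming the efs:// prefix from the backend above; the function name find_orphaned_files is hypothetical, and collecting the decoded value of every row in the xcom table is left out.

```python
from __future__ import annotations

import os
from typing import Iterable

PREFIX = "efs://"  # copied from CustomXComBackendEFS above


def find_orphaned_files(root: str, stored_values: Iterable[object]) -> list[str]:
    """Hypothetical workaround: list files under root that no XCom row references.

    stored_values stands in for the decoded value of every row in the xcom
    table; querying those rows is deliberately left out of this sketch.
    """
    # Normalise every 'efs://...' reference to a plain path for comparison.
    referenced = {
        os.path.normpath(value[len(PREFIX):])
        for value in stored_values
        if isinstance(value, str) and value.startswith(PREFIX)
    }
    orphans = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.normpath(os.path.join(dirpath, name))
            if path not in referenced:
                orphans.append(path)
    return sorted(orphans)
```

A scheduled task could then os.unlink each returned path. Since serialize_value writes the file before the XCom row is stored, a real sweep should probably also skip recently modified files so it does not race a running task.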

Anything else

It happens every time.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

ronald-fenner added the area:core, kind:bug, and needs-triage labels on Jun 7, 2023
@boring-cyborg

boring-cyborg bot commented Jun 7, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

hussein-awala added the area:UI and affected_version:main_branch labels and removed the area:core and needs-triage labels on Jun 7, 2023
hussein-awala self-assigned this on Jun 7, 2023
hussein-awala assigned Lee-W and unassigned hussein-awala on Jan 21, 2024
@bolkedebruin
Contributor

Fixed by the API change in #37058.
