
Plugin for listeners - on_dag_run_running hook ignored #31180

Closed · funes79 opened this issue May 10, 2023 · 9 comments · Fixed by #32269
Comments

funes79 commented May 10, 2023

Apache Airflow version

2.6.0

What happened

I created a plugin for custom listeners. The task-level listeners work fine, but the DAG-level listeners are not triggered.

The docs state that listeners defined in airflow/listeners/spec should be supported.


from airflow.listeners import hookimpl
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState


@hookimpl
def on_task_instance_failed(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
    """
    This method is called when task state changes to FAILED.
    Through callback, parameters like previous_task_state, task_instance object can be accessed.
    This will give more information about current task_instance that has failed its dag_run,
    task and dag information.
    """
    print("This works fine")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("This is not called!")

What you think should happen instead

The DAG-level specs defined in airflow/listeners/spec/dagrun.py should be working as well.

How to reproduce

Create a plugin and add the two hooks to a listener.
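
For illustration, a minimal self-contained sketch of such a plugin (names like MyListener and my_listener_plugin are illustrative, not taken from the report; the file just needs to be importable from the configured plugins folder):

from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin


class MyListener:
    """Listener object holding the two hook implementations from the report."""

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        # Fired when a task instance transitions to FAILED -- reported to work.
        print("This works fine")

    @hookimpl
    def on_dag_run_failed(self, dag_run, msg):
        # Fired when a DAG run transitions to FAILED -- reported as not firing.
        print("This is not called!")


class MyListenerPlugin(AirflowPlugin):
    name = "my_listener_plugin"
    listeners = [MyListener()]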

Operating System

linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

funes79 added the area:core, kind:bug, and needs-triage labels on May 10, 2023
VVildVVolf (Contributor) commented May 14, 2023

Hi @funes79,

Could you please try v2.6.1* or provide more details about your use case?

I checked with a recent master (584a9f5) and the listener is triggered. Here is the sample I used:
DAG: test_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _func():
    raise Exception("")

with DAG("my_dag",
    start_date=datetime(2011, 11, 11),
    schedule_interval='@daily',
    catchup=False
) as dag:
    pythonOperator = PythonOperator(
        task_id="task",
        python_callable=_func,
    )

plugin: p.py

from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin
import logging

class Plg(AirflowPlugin):
    class Listener:
        @hookimpl
        def on_task_instance_failed(previous_state, task_instance, session):
            """
            This method is called when task state changes to FAILED.
            Through callback, parameters like previous_task_state, task_instance object can be accessed.
            This will give more information about current task_instance that has failed its dag_run,
            task and dag information.
            """
            logging.info("on_task_instance_failed")

        @hookimpl
        def on_dag_run_failed(dag_run, msg: str):
            """
            This method is called when dag run state changes to FAILED.
            """
            logging.info("on_dag_run_failed")

    name = "name"
    listeners = [Listener()]

and after triggering, the scheduler's logs show:
[screenshot of scheduler logs from the breeze console]
P.S. The screenshot shows logs from the breeze console; by default the scheduler writes to something like $AIRFLOW_HOME/logs/scheduler/

funes79 (Author) commented May 15, 2023

I am sorry, I tested your code now and tested my plugin again on both 2.5.1 and 2.6.1rc2, and both work. I just somehow did not see the output in the Kubernetes environment, as the log and print output is only visible in the pod log and not in logs/scheduler/YYYY-MM-DD/folder/dag_file.py.log.

potiuk (Member) commented May 15, 2023

@funes79 -> maybe you can make a PR against our docs to clarify that behaviour if it was not clear? It should be as easy as clicking "Suggest a change on this page" at the bottom right of the relevant page -> it will create a PR, and you will be able to describe it in a way that makes it easy to see for others like you.

Then you can add Fixes: #31180 to the commit message, and merging the PR will close this issue automatically.

potiuk added the good first issue and kind:documentation labels and removed the needs-triage label on May 15, 2023
VVildVVolf (Contributor) commented:

@potiuk, if I understand correctly, the issue might be related to logging (@funes79, please correct me if not). The scheduler's logs do not appear in the scheduler's log directory (like logs/scheduler/YYYY-MM-DD/folder/dag_file.py.log).

It might be related to the wrapped logger used for DAG processing; the logs under _handle_dag_file_processing() do not appear in the log files (at least for me). I did not have a chance to take a closer look yet.

funes79 (Author) commented May 16, 2023

Yes, correct, it is not visible in the logs.
Another observation: it looks like the listener code runs in the main loop of the scheduler, because when I try to access some external API in the listener and it hangs, the execution of DAGs is halted.
As an example: I started a DAG and the on-dag-running hook fired, but because the listener was waiting and timing out on an API call for a couple of seconds, the scheduler did not start the DAG; the webserver reported the DAG as "queued" and showed the warning about the scheduler not running. If this is the intended behaviour, then yes, I think we need to update the documentation to warn about it.

potiuk (Member) commented May 17, 2023

cc: @mobuchowski -> maybe you have some good answers for that?

mobuchowski (Contributor) commented May 17, 2023

It might be related to the wrapped logger used for DAG processing; the logs under _handle_dag_file_processing() do not appear in the log files (at least for me). I did not have a chance to take a closer look yet.

It does not run in the same process as the dag_processor but, as you mentioned, in the scheduler's main loop.

Another observation: it looks like the listener code runs in the main loop of the scheduler, because when I try to access some external API in the listener and it hangs, the execution of DAGs is halted

There are on_starting and before_stopping methods that allow you to set up a task executor, like a ThreadPoolExecutor, that you can use to deliver messages outside of the main loop. We thought it's better to leave that to the particular listener rather than try to manage it for them.

You're right, it should be better documented.
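
To make that suggestion concrete, here is a rough sketch of a listener module that uses the on_starting and before_stopping lifecycle hooks to manage a ThreadPoolExecutor, so that a slow external call does not block the scheduler's main loop (the _notify_external_api helper and the worker count are illustrative assumptions, not part of Airflow's API):

from concurrent.futures import ThreadPoolExecutor

from airflow.listeners import hookimpl

_executor = None


@hookimpl
def on_starting(component):
    # Called once when the Airflow component (e.g. the scheduler) starts up.
    global _executor
    _executor = ThreadPoolExecutor(max_workers=2)


@hookimpl
def before_stopping(component):
    # Called before the component shuts down; flush any outstanding work.
    global _executor
    if _executor is not None:
        _executor.shutdown(wait=True)
        _executor = None


def _notify_external_api(dag_run, msg):
    # Hypothetical slow call to an external service; runs on a worker thread
    # instead of inside the scheduler loop.
    pass


@hookimpl
def on_dag_run_failed(dag_run, msg):
    # Hand the slow work off to the executor so the scheduler loop is not blocked.
    if _executor is not None:
        _executor.submit(_notify_external_api, dag_run, msg)

The module holding these functions would then be registered through a plugin's listeners list, as in the examples earlier in this thread.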

vijayasarathib (Contributor) commented:

@potiuk @mobuchowski Can I work on this?

mobuchowski (Contributor) commented:

@vijayasarathib if you want to improve documentation, sure, go for it.

vijayasarathib added a commit to vijayasarathib/airflow that referenced this issue Jun 29, 2023

Documentation update for Plugin for listeners - on_dag_run_running hook ignored
potiuk pushed a commit that referenced this issue Jul 4, 2023
* Fixes: #31180 - Plugin for listeners - on_dag_run_running hook ignored

Documentation update for Plugin for listeners - on_dag_run_running hook ignored
Co-authored-by: Tzu-ping Chung <[email protected]>
ephraimbuddy pushed a commit that referenced this issue Jul 6, 2023
* Fixes: #31180 - Plugin for listeners - on_dag_run_running hook ignored

Documentation update for Plugin for listeners - on_dag_run_running hook ignored
Co-authored-by: Tzu-ping Chung <[email protected]>

(cherry picked from commit ab2c861)
5 participants