Triggerer crashes when a trigger function is changed #31743

Closed
1 of 2 tasks
pauloventurab opened this issue Jun 6, 2023 · 13 comments
@pauloventurab

pauloventurab commented Jun 6, 2023

Apache Airflow version

2.6.1

What happened

Hi guys,

I'm using the AwaitMessageTriggerFunctionSensor from the Kafka provider, following this example from your docs:
https://github.com/apache/airflow/blob/providers-apache-kafka/1.1.0/tests/system/providers/apache/kafka/example_dag_event_listener.py

The issue is with apply_function. When I change the name of my file and then use "my_file_name.await_function", it says:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 39, in import_string
    return getattr(module, class_name)
AttributeError: module 'my_file_name' has no attribute 'listen_function'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
    result = details["task"].result()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py", line 607, in run_trigger
    async for event in trigger.run():
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 99, in run
    processing_call = import_string(self.apply_function)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 41, in import_string
    raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
ImportError: Module "my_file_name" does not define a "await_function" attribute/class

Can you give a hand here please?

My file is inside the main DAG folder. I just copied the example and pasted it there, changing the name of the file and setting apply_function to "my_file_name.await_function".

Thanks
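
For readers following the traceback: the failing call resolves a dotted path string into an object. Below is a minimal sketch of that kind of lookup, reconstructed from the two exceptions shown above. The name `resolve_dotted_path` is hypothetical and this is not Airflow's actual source; it only illustrates why a missing attribute on an already-imported module surfaces as an `ImportError`.

```python
from importlib import import_module


def resolve_dotted_path(dotted_path: str):
    """Resolve "package.module.attr" to the object it names (simplified sketch)."""
    try:
        module_path, attr_name = dotted_path.rsplit(".", 1)
    except ValueError:
        raise ImportError(f"{dotted_path!r} doesn't look like a module path")

    # import_module returns the module already held in sys.modules if there is one.
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        # Same shape of message as in the traceback above.
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute/class'
        )


# Any importable attribute can be looked up by its dotted path.
sqrt = resolve_dotted_path("math.sqrt")
print(sqrt(9.0))  # 3.0
```

Because the `ImportError` is raised while the `AttributeError` is being handled, Python prints both, which matches the "During handling of the above exception" chain in the report.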

What you think should happen instead

I think it should work. It might also be worth simplifying apply_function so that it takes just the function name, without the module import: just a function inside the DAG, which could also be a task.

How to reproduce

I don't know how to help improve it.

Operating System

Kubernetes cluster - Linux

Versions of Apache Airflow Providers

apache-airflow-providers-apache-kafka==1.0.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@pauloventurab pauloventurab added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jun 6, 2023
@boring-cyborg

boring-cyborg bot commented Jun 6, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@zoid-w

zoid-w commented Jun 7, 2023

Hi,

I have the exact same issue.

However, I have some additional context that might be beneficial.
This seems to me like an issue with the triggerer not being in sync. This is further confirmed by my local instance (simple dockerized setup) not having this issue at all (just some folders being mounted to the containers instead of a gitsync container).

For example, if you change the name of the apply_function (without changing the file name), you will get the exact same issue. I have checked my instance, and the gitsync container for the triggerer is running without issues (at least according to the logs).

I investigated further with the following steps:

  1. Started with a working DAG that used the apply_function without issues.
  2. Changed the DAG to use a different apply_function; tested it, and it failed because it could not import the function (same error as above).
  3. Scaled the triggerer statefulset down to 0 and back up to 1 (effectively recreating the containers).
  4. Ran the previously failing DAG once again; it now runs without issues.

To me it seems that the DAG on the Triggerer side is not correctly being synced to be in line with what is on the web UI/worker. This is very weird as I have checked the trigger-gitsync container logs to see whether a refresh was done and it did log a refresh.

*edit : obviously it should not be necessary to recreate a container in order to get this updated.

@Lee-W
Member

Lee-W commented Jun 13, 2023

Hi, if no one's working on it, I'd like to take a look 🙂

@phanikumv phanikumv added provider:Apache area:providers and removed needs-triage label for new issues that we didn't triage yet area:core labels Jun 13, 2023
@pauloventurab
Author

Hello guys, anything I can do to help as well?

@Lee-W
Member

Lee-W commented Jun 19, 2023

Hi, I think I have a clue about how to fix it. It might be related to the trigger cache. I'm working on another PR but will have some time to look at this tomorrow. 💪

@Lee-W
Member

Lee-W commented Jun 20, 2023

I formatted the error message a bit to make it easier to read.

Traceback (most recent call last):

    File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py",
    line 39,
    in import_string
    return getattr(module, class_name)
AttributeError: module 'my_file_name' has no attribute 'listen_function'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
    File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py",
    line 529,
    in cleanup_finished_triggers
    result = details["task"].result()

    File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job_runner.py",
    line 607,
    in run_trigger
    async for event in trigger.run():

    File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/kafka/triggers/await_message.py",
    line 99,
    in run

    processing_call = import_string(self.apply_function)

    File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/module_loading.py",
    line 41,
    in import_string
    raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class')
ImportError: Module "my_file_name" does not define a "await_function" attribute/class

@Lee-W
Member

Lee-W commented Jun 20, 2023

Hi @zoid-w, thank you for the reproduction steps. They're really helpful for tracking down this issue.

Hi @pauloventurab, it appears that the root cause of the problem can be traced back to these lines. Restarting the Airflow triggerer is necessary to ensure that any changes to a trigger are properly picked up. I am not sure whether we want to reload triggers every time, but I have created a draft pull request #32032 to show my current idea and am waiting for feedback.

As for what you suggested in

What you think should happen instead

I think it should work, and maybe also simplify the apply_function to the function name without the module importation, just the function inside the dag, could be a task as well.

This might not be something we can do for now as we cannot pass a Callable into the trigger.
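
A small illustration of the constraint mentioned above, under the assumption that a trigger's arguments have to be serialized (for example as JSON) before they reach the triggerer process. That assumption is my reading of why a Callable cannot be passed; `await_function` and `can_be_stored` are hypothetical names used only for this sketch.

```python
import json


def await_function(message):
    """Hypothetical callback, standing in for the user's apply_function."""
    return message


def can_be_stored(kwargs: dict) -> bool:
    """Return True if the given trigger kwargs survive JSON encoding."""
    try:
        json.dumps(kwargs)
    except TypeError:
        return False
    return True


# A function object cannot be encoded ...
print(can_be_stored({"apply_function": await_function}))  # False
# ... but its dotted path is just a string, which can.
print(can_be_stored({"apply_function": "my_file_name.await_function"}))  # True
```

Under that assumption, the dotted path string is what travels to the triggerer, and the function is only imported there.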

@uranusjr
Member

In general, you need to restart services when you change things like this. If you modify a plugin, for example, you need to restart the scheduler and/or the webserver. This is no different; if you change a trigger, you need to restart the triggerer service. Airflow’s service processes are all built with this design in mind: restarting is an expected operation, should not cause any data breakage, and can be performed liberally.

Maybe some notes should be added to documentation about this, at the same place where we walk through how to implement a trigger. I think it’s generally accepted that plugin changes require rebooting the processes, but triggers may be greener and less understood.

I also wonder whether it is possible for Airflow to detect changes like this and automatically suggest restarting, instead of failing later. But this may be much more difficult since Python code changes are not that easy to detect in the first place. This topic is much more involved and should be explored separately if desired.

@zoid-w

zoid-w commented Jun 22, 2023

Can confirm that the caching mechanism is indeed the reason for this behavior. If you would like to bypass it, a practical workaround is to rename the file as well, since the new path to the class would not be cached yet (useful, for example, if you don't have CLI access to the triggerer service).

On a separate note:
Looking at the lines mentioned by @Lee-W, will the dictionary for the cached paths not grow indefinitely? As far as I can see there is no timed and/or triggered cleanup on that part. I assume this will never really get too much out of proportion, but since the triggerer is intended to run 1000 triggers, maybe it is something to consider.

My knowledge of async coding in Python is quite limited, but maybe a time-based release of cached paths might be a good idea.
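
Both observations above (renaming the file sidesteps the cache, and the cache only ever grows) follow from a cache keyed by the dotted path. Here is a minimal sketch with a hypothetical `ClasspathCache` class; it is not the triggerer's real code, and the `disk` dict merely stands in for the code currently on disk.

```python
class ClasspathCache:
    """Hypothetical cache mapping a dotted path to whatever was loaded for it."""

    def __init__(self, loader):
        self._loader = loader
        self._entries = {}

    def get(self, classpath):
        # Load only on the first request; later requests reuse the stored object.
        if classpath not in self._entries:
            self._entries[classpath] = self._loader(classpath)
        return self._entries[classpath]

    def __len__(self):
        return len(self._entries)


# Stand-in for the source files on disk, keyed by dotted path.
disk = {"my_file_name.listen_function": "version 1"}
cache = ClasspathCache(loader=disk.__getitem__)

cache.get("my_file_name.listen_function")          # first load
disk["my_file_name.listen_function"] = "version 2"  # code changes on disk
print(cache.get("my_file_name.listen_function"))    # version 1 (stale)

disk["renamed_file.listen_function"] = "version 2"  # same code under a new path
print(cache.get("renamed_file.listen_function"))    # version 2 (new key, fresh load)
print(len(cache))                                    # 2 (old entry is never evicted)
```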

@Lee-W
Member

Lee-W commented Jun 22, 2023

I'm not sure whether cleaning up the cache in trigger_cache makes sense, as the imported lib will still be there. Even if we rerun import_string, it won't reload the lib, and reloading is something we might want to avoid according to this #32032 (comment).
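
The point about the imported lib staying around can be reproduced with plain Python, independent of Airflow. The sketch below writes a throwaway module (the name `my_file_name_demo` is made up for the demo), imports it, renames its function on disk, and imports it again in the same process. Deleting the `sys.modules` entry at the end only imitates what a process restart gives you for free.

```python
import importlib
import sys
import tempfile
from pathlib import Path

MODULE = "my_file_name_demo"  # hypothetical module name used only for this demo


def demo() -> dict:
    previous_flag = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # always read the source, never a cached .pyc
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / f"{MODULE}.py"
        source.write_text("def listen_function():\n    return 'old'\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            first = importlib.import_module(MODULE)

            # Edit the file on disk: the function gets a new name.
            source.write_text("def await_function():\n    return 'new'\n")
            importlib.invalidate_caches()
            second = importlib.import_module(MODULE)  # served from sys.modules

            result = {
                "same_module_object": first is second,
                "old_name_still_present": hasattr(second, "listen_function"),
                "new_name_visible": hasattr(second, "await_function"),
            }

            # Imitate a restart: start again without the cached module.
            del sys.modules[MODULE]
            importlib.invalidate_caches()
            fresh = importlib.import_module(MODULE)
            result["new_name_after_fresh_import"] = hasattr(fresh, "await_function")
            return result
        finally:
            sys.path.remove(tmp)
            sys.modules.pop(MODULE, None)
            sys.dont_write_bytecode = previous_flag


print(demo())
```

In the long-running process the second import hands back the original module object, so the renamed function is not visible until the module is loaded from scratch.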

@Lee-W
Member

Lee-W commented Jun 26, 2023

I added a short note to this page through #32140.

@Lee-W
Member

Lee-W commented Jun 27, 2023

As #32140 has been merged, should we close this one? Or is there anything we could try to improve it? Thanks 🙂

@uranusjr uranusjr changed the title Kafka Sensor issue on module AwaitMessageTriggerFunctionSensor Triggerer crashes when a trigger function is changed Jun 27, 2023
@potiuk
Member

potiuk commented Jun 27, 2023

Yeah. I am closing this one, unless someone has an idea for how we could dynamically restart the Triggerer. Given how the async loop and the Python interpreter currently work, I agree with @uranusjr that detecting such a situation automatically is likely not something that can be done reliably, so we need to defer to human judgment on it. The note from #32140 and this discussion should be enough for anyone to find out what's going on and to implement automatic Triggerer restarts for when code they "know" might impact it changes.
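
One way a deployment could decide when such a restart is due is to fingerprint the Python sources the triggerer imports and compare the result with the value recorded at the last restart. This is only a sketch under that assumption: `source_fingerprint` and `needs_restart` are hypothetical helpers, not part of Airflow, and how the restart itself is carried out (for instance by rescaling the statefulset, as described earlier in the thread) depends on the deployment.

```python
import hashlib
from pathlib import Path


def source_fingerprint(folder: str) -> str:
    """Hash the relative path and contents of every .py file under folder."""
    digest = hashlib.sha256()
    for path in sorted(Path(folder).rglob("*.py")):
        digest.update(str(path.relative_to(folder)).encode())
        digest.update(path.read_bytes())
    return digest.hexdigest()


def needs_restart(folder: str, last_seen: str) -> bool:
    """Return True if the sources changed since the fingerprint last_seen was taken."""
    return source_fingerprint(folder) != last_seen
```

A wrapper script could store the fingerprint when the triggerer starts and check `needs_restart` after each sync. It flags every source change, including ones that do not affect any trigger, so it errs on the side of restarting.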

@potiuk potiuk closed this as completed Jun 27, 2023

6 participants