That is unlikely to happen because of security. Callbacks are executed in the context of the Worker or DagFileProcessor; the Scheduler is not supposed to execute any code provided by the user in the DAG. It's the scheduler that determines which executor can be used, and it sends the prepared task to the executor (sometimes based on the "queue" parameter). And as you mentioned, the Celery workers pick up tasks from the queue they are configured with, so by the time a task starts, its queue has already determined where it should run. The only real place where you can change the queue for a task is at DAG parsing time, which effectively means that once the task has been placed in the DAG structure, its queue has to be determined. You cannot dynamically change it in the scheduler.

The scheduler just schedules, using whatever is declared in code that comes "pre-installed" with Airflow. For example, custom triggers or custom timetables have to be pre-installed, and DAGs cannot define their logic; they can at most declare and configure which timetable/trigger will be used. So the only way this could be implemented is by defining some "customizable" mechanism of queue selection, rather than allowing the DAG writer to define it the way callbacks are defined. I will convert this into a discussion. Maybe it will be picked up by someone who would like a similar mechanism, but at the very least it would require extensive discussion on the devlist and an AIP (Airflow Improvement Proposal).
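One existing mechanism that goes in this direction is the cluster policy `task_instance_mutation_hook`, defined in `airflow_local_settings.py`, which is allowed to mutate task instance attributes such as `queue`. A minimal sketch, assuming an Airflow version where `TaskInstance` carries a `run_id` attribute (2.2+); verify the hook's exact timing against the version you run:

```python
# airflow_local_settings.py -- cluster policies are code "pre-installed"
# next to Airflow, not shipped inside a DAG, which fits the security
# model described above.
def task_instance_mutation_hook(task_instance):
    # Sketch: route everything except the bootstrap task to a per-run
    # queue. The "master" queue name is taken from the use case below
    # and is not an Airflow built-in.
    if task_instance.queue != "master":
        task_instance.queue = f"{task_instance.dag_id}-{task_instance.run_id}"
```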
Description
Right now the queue of a DAG/task is determined by the `queue` parameter in the DAG/task definition or by the config file. I want the `queue` parameter to also accept a function. If a function is given, Airflow should pass it the context variable and use the return value as the queue for the DAG/task.
Airflow DAGs and tasks support callback functions such as `on_success_callback`, which is passed the context variable and executed on success. I want a similar capability for determining the queue.
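To make this concrete, here is a hypothetical sketch of what the requested API could look like (a callable-valued `queue` is not supported by Airflow today; the names are illustrative only):

```python
from airflow.operators.bash import BashOperator


def pick_queue(context):
    # Hypothetical: Airflow would call this with the task instance
    # context; the return value would become the queue for this run.
    return f"{context['dag'].dag_id}-{context['run_id']}"


# Proposed, not currently supported: queue accepts a callable,
# analogous to how on_success_callback receives the context.
task = BashOperator(
    task_id="run_on_per_run_queue",
    bash_command="echo hello",
    queue=pick_queue,
)
```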
Use case/motivation
I use an independent EC2 instance as a Celery worker for every DAG run. The queue for any DAG run is `dag_id-run_id`. In all my DAGs the first task is always an operator running on the "master" queue; it sets up an EC2 instance and starts a worker listening on the custom queue name.
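A minimal sketch of that setup (the `start_ec2_worker` callable and the "master" queue are specific to my deployment, and the actual EC2 provisioning is stubbed out):

```python
from __future__ import annotations

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def start_ec2_worker(dag_run=None, **kwargs):
    # Stub: provision an EC2 instance (e.g. via boto3) and have its
    # bootstrap script start a Celery worker listening on the per-run
    # queue, e.g. "airflow celery worker --queues <dag_id>-<run_id>".
    queue_name = f"{dag_run.dag_id}-{dag_run.run_id}"
    print(f"Would start a worker for queue {queue_name}")


with DAG(
    dag_id="per_run_worker_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
) as dag:
    # The only task pinned to the always-on "master" queue; everything
    # downstream is meant to run on the per-run worker it starts.
    setup_worker = PythonOperator(
        task_id="start_worker",
        python_callable=start_ec2_worker,
        queue="master",
    )
```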
I modified a line in the `_enqueue_task_instances_with_queued_state` function in `scheduler_job.py`:

```python
queue = ti.queue if ti.queue == "master" else f"{ti.dag_id}-{ti.run_id}"
```
With this change, the queue for every task other than the EC2 operator is `dag_id-run_id`. Because the EC2 operator starts a Celery worker with that specific queue name, all tasks not assigned to the "master" queue (which has a worker running locally) are executed on the per-run Celery worker.
So my setup required a small modification to the Airflow code base. It would be helpful if the scheduler could determine the queue name through a user-defined function that takes the context variable as input.
Presently Airflow allows defining a queue only per DAG/task, not per DAG run, as the `run_id` is determined at runtime. Also, changing the queue name at the enqueue step is not reflected in the UI, because the change is visible only to the scheduler.
Related issues
No response
Are you willing to submit a PR?
Code of Conduct