[AIRFLOW-56] Airflow's scheduler can "lose" queued tasks #1378
Conversation
@abridgett please give this a shot regarding #1342
Coverage decreased (-0.02%) to 67.979% when pulling 8fa7fd487c59673c1163dee1b0631f4d49c6809e on jlowin:queued-tasks into 5d15d68 on airbnb:master.
It seems to have worked today, will keep an eye on it - many many thanks @jlowin
Worked today as well, FYI (last time it was updated in the middle of a run, effectively, so this was the first real test).
Great. @syvineckruyk any luck?
No rush, just wanted to make sure you had seen the issue.
@syvineckruyk just want to ping you on this, we may need to merge it to master shortly.
Coverage decreased (-0.03%) to 66.996% when pulling f592c65fb6b8d974ddb1ce477cb7aefc5c0fff5b on jlowin:queued-tasks into c1f485f on airbnb:master.
@jlowin gotcha ... starting to test now.
@jlowin so I am running your "queued-tasks" branch and seeing some weird issues around dag runs. I don't have an exact handle on it yet: failed dag runs are getting created, and I am now seeing dag runs created for 5 days prior to the start date.
I think the 5-days-before thing is a known issue with the scheduler logic: basically, if you have a non-scheduled execution, the scheduler picks up 5 days before it. We will have to deal with that separately.
@jlowin cool. I found the cause of the failing dag runs; it was an unrelated issue. Continuing to run this version today, so far so good.
@jlowin so far on this branch I have found a couple of issues. This may be helpful: the job was behaving as expected until a few minutes ago, when we crossed over to April 24th UTC; that is when the task instances that should not have been kicked off began running. The schedule of those tasks was not midnight, it was actually 12 hours later, at 12pm. So something about the new UTC day seems to be involved. I have also observed several instances of subtasks completing successfully, but the SubDagOperator (SDO) never notices and stays in a running state indefinitely.
@jlowin still trying to create a generic DAG to expose these bugs, but wanted to let you know that the SDOs that do not get updated appear to be the ones in dag runs where another task has failed.
Excellent, thanks so much for the feedback @syvineckruyk. I'm trying to figure out what in this code could have led to that behavior. I think it might be the extra …
@jlowin just a question: when I was running the last version you gave me, things were running well. What was the issue that required this area to need more work?
The issue you were experiencing was because Scheduler and Backfill were both trying to queue your SubDag tasks. The fix I put in place was that Scheduler kept track of tasks it submitted to the executor, and only tried to run those tasks if they became queued. That way, the queued subdag tasks were left alone and Backfill handled them without interference. The problem is that folks often run Scheduler with the `--num-runs` flag, so the Scheduler restarts and loses track of which tasks it submitted.
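To illustrate the bookkeeping described above, here is a minimal sketch (the class, method, and attribute names are made up for illustration; this is not the actual scheduler code): the scheduler remembers the keys of the task instances it handed to the executor, and when it later sees queued tasks it only re-submits its own, leaving Backfill's subdag tasks alone.

```python
# Illustrative sketch of the "track what I submitted" approach described
# above; names here are hypothetical, not Airflow's API.

class TrackingScheduler:
    def __init__(self, executor):
        self.executor = executor
        self.submitted_keys = set()  # keys of TIs this scheduler queued

    def submit(self, ti):
        """Queue a task instance and remember that we own it."""
        self.submitted_keys.add(ti.key)
        self.executor.queue(ti)  # hypothetical executor method

    def handle_queued(self, queued_tis):
        """Re-run only the queued TIs this scheduler itself submitted,
        leaving subdag tasks queued by Backfill untouched."""
        for ti in queued_tis:
            if ti.key in self.submitted_keys:
                self.executor.queue(ti)
```

The weakness is exactly the one mentioned above: if the scheduler restarts (for example because of `--num-runs`), this in-memory set is lost and the previously submitted tasks are orphaned.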
Coverage increased (+1.7%) to 68.704% when pulling 88a211d66894b8d02b8923378a97a3e5cce50373 on jlowin:queued-tasks into c1f485f on airbnb:master.
@jlowin were you able to identify any of the issues I referred to above? Thanks
@syvineckruyk not directly, but I removed some of the code that I suspect is causing it.
As a matter of fact, it looks like Travis is now failing with queued tasks, so I think I need to leave that code in. More thinking...
Coverage increased (+0.04%) to 67.066% when pulling 12b7b6a116a0eadcf3101b636ffbec6239437870 on jlowin:queued-tasks into c1f485f on airbnb:master.
@@ -1159,7 +1167,7 @@ def run(
     self.pool = pool or task.pool
     self.test_mode = test_mode
     self.force = force
-    self.refresh_from_db()
+    self.refresh_from_db(lock_for_update=True)
Please provide the session as well, to make sure the commit happens in the same session
Done.
I think that here: https://github.com/jlowin/airflow/blob/queued-tasks/airflow/jobs.py#L650 we need an extra session.commit() after the delete. Update: here we can actually lose a commit.
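For readers following the review thread, here is a minimal sketch of the locking pattern being discussed, assuming a SQLAlchemy-backed model (the model, columns, and helper below are illustrative, not Airflow's actual `TaskInstance` or `refresh_from_db`). The row lock taken by `with_for_update()` belongs to a session and is only released when that session commits, which is why the reviewer asks for the same session to be passed in.

```python
# Minimal sketch of the row-locking idea under review; the model and helper
# are illustrative, not Airflow's actual TaskInstance code.
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class TaskInstance(Base):
    __tablename__ = "task_instance"
    task_id = Column(String, primary_key=True)
    dag_id = Column(String, primary_key=True)
    state = Column(String)

def refresh_from_db(ti, session, lock_for_update=False):
    """Reload the row, optionally with SELECT ... FOR UPDATE."""
    query = session.query(TaskInstance).filter_by(
        task_id=ti.task_id, dag_id=ti.dag_id)
    if lock_for_update:
        # The row lock is held by this session until it commits or rolls
        # back, so the caller must commit the *same* session later on.
        query = query.with_for_update()
    return query.first()

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(TaskInstance(task_id="t1", dag_id="d1", state="queued"))
session.commit()

ti = session.query(TaskInstance).first()
locked = refresh_from_db(ti, session, lock_for_update=True)
locked.state = "running"
session.commit()  # releases the lock taken in this same session
```

Note that SQLite silently ignores FOR UPDATE; on MySQL or Postgres the lock keeps a second scheduler from grabbing the same row until the commit, which is the point of passing lock_for_update=True in run().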
@@ -86,10 +84,23 @@ def heartbeat(self):
     key=lambda x: x[1][1],
     reverse=True)
 for i in range(min((open_slots, len(self.queued_tasks)))):
-    key, (command, priority, queue) = sorted_queue.pop(0)
-    self.running[key] = command
+    key, (command, priority, queue, ti) = sorted_queue.pop(0)
Use `_` instead of `priority` (looks like it's unused).
+1!
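For context, a paraphrased sketch of the loop under review (a standalone function, not the exact BaseExecutor code): the queue is sorted by priority once up front, so the priority value is not needed again when unpacking, hence the suggestion to use `_`.

```python
# Paraphrased sketch of the heartbeat loop under review; this is a
# standalone function, not the exact executor code.
def drain_queue(open_slots, queued_tasks, running):
    # queued_tasks maps key -> (command, priority, queue, ti)
    sorted_queue = sorted(
        queued_tasks.items(),
        key=lambda x: x[1][1],  # highest priority first
        reverse=True)
    for _ in range(min(open_slots, len(queued_tasks))):
        # priority was only needed for sorting, so "_" is clearer here
        key, (command, _, queue, ti) = sorted_queue.pop(0)
        running[key] = command
```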
When Scheduler is run with `--num-runs`, there can be multiple Schedulers and Executors all trying to run tasks. For queued tasks, Scheduler was previously only trying to run tasks that it itself had queued, but that doesn't work if the Scheduler is restarting. This PR reverts that behavior and adds two types of "best effort" executions: before running a TI, executors check whether it is already running, and before ending, executors call sync() one last time.
The scheduler can encounter a queued task twice before the task actually starts to run; locking the task avoids that condition.
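As a rough illustration of the two "best effort" safeguards summarized above (a hypothetical executor class, not the PR's actual diff): skip a task instance whose current state already says it is running, and call sync() one final time before shutting down.

```python
# Hypothetical sketch of the two "best effort" safeguards described above;
# class and callback names are made up, not Airflow's executor API.
RUNNING = "running"

class BestEffortExecutor:
    def __init__(self, get_current_state):
        # get_current_state: callable(ti) -> current state, e.g. a DB lookup
        self.get_current_state = get_current_state
        self.queued_tasks = {}  # key -> (command, ti)
        self.running = {}       # key -> command

    def heartbeat(self):
        for key, (command, ti) in list(self.queued_tasks.items()):
            del self.queued_tasks[key]
            # Safeguard 1: before running a TI, check whether another
            # scheduler or backfill already started it.
            if self.get_current_state(ti) == RUNNING:
                continue
            self.running[key] = command
            # ... hand the command off to a worker here ...

    def sync(self):
        """Reconcile self.running against what the workers report."""
        # ... poll workers and drop finished keys from self.running ...

    def end(self):
        # Safeguard 2: sync one last time before exiting, so results that
        # arrive after the final heartbeat are not lost (e.g. when the
        # scheduler exits because of --num-runs).
        self.sync()
```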
LGTM
We have been blocked on this issue in 1.7.0. Thank you very much.
1.7.1 is in much better condition than 1.7.0, so I would definitely use that one.
To be clear, 1.7.1.2 is the version you should be using. This version is quite stable.
I have tested 1.7.1.2.
@griffinqiu can you open up JIRAs for the known issues?
Closes https://issues.apache.org/jira/browse/AIRFLOW-56