Make next_dagrun_info take a data interval #18088

Merged
4 changes: 3 additions & 1 deletion airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -257,10 +257,12 @@ def post_dag_run(dag_id, session):
         .first()
     )
     if not dagrun_instance:
-        dag_run = current_app.dag_bag.get_dag(dag_id).create_dagrun(
+        dag = current_app.dag_bag.get_dag(dag_id)
+        dag_run = dag.create_dagrun(
             run_type=DagRunType.MANUAL,
             run_id=run_id,
             execution_date=logical_date,
+            data_interval=dag.timetable.infer_manual_data_interval(run_after=logical_date),
             state=State.QUEUED,
             conf=post_body.get("conf"),
             external_trigger=True,
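
For context on the new `data_interval` argument: a minimal sketch, assuming the Airflow 2.2-era timetable API this PR builds on, of how a cron timetable infers an interval for a manual trigger (the DAG id, schedule, and dates below are made up):

```python
# Sketch: deriving a data interval for a manually triggered run.
# In-memory DAG only; the id, schedule, and dates are hypothetical.
import pendulum

from airflow.models.dag import DAG

dag = DAG(
    dag_id="example",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
)

# A manual run has no scheduled slot; the timetable infers the last
# complete interval before the requested logical date.
logical_date = pendulum.datetime(2021, 9, 9, 12, 30, tz="UTC")
interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
print(interval.start, interval.end)  # 2021-09-08T00:00 / 2021-09-09T00:00 UTC
```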
47 changes: 30 additions & 17 deletions airflow/cli/commands/dag_command.py
@@ -23,8 +23,10 @@
 import signal
 import subprocess
 import sys
+from typing import Optional
 
 from graphviz.dot import Dot
+from sqlalchemy.sql.functions import func
 
 from airflow import settings
 from airflow.api.client import get_current_api_client
@@ -255,26 +257,37 @@ def dag_next_execution(args):
     if dag.get_is_paused():
         print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
 
-    latest_execution_date = dag.get_latest_execution_date()
-    if latest_execution_date:
-        next_execution_dttm = dag.following_schedule(latest_execution_date)
-
-        if next_execution_dttm is None:
-            print(
-                "[WARN] No following schedule can be found. "
-                + "This DAG may have schedule interval '@once' or `None`.",
-                file=sys.stderr,
-            )
-            print(None)
-        else:
-            print(next_execution_dttm.isoformat())
-
-            for _ in range(1, args.num_executions):
-                next_execution_dttm = dag.following_schedule(next_execution_dttm)
-                print(next_execution_dttm.isoformat())
-    else:
-        print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
+    with create_session() as session:
+        max_date_subq = (
+            session.query(func.max(DagRun.execution_date).label("max_date"))
+            .filter(DagRun.dag_id == dag.dag_id)
+            .subquery()
+        )
+        max_date_run: Optional[DagRun] = (
+            session.query(DagRun)
+            .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date == max_date_subq.c.max_date)
+            .one_or_none()
+        )
+
+        if max_date_run is None:
+            print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
+            print(None)
+            return
+
+        next_info = dag.next_dagrun_info(dag.get_run_data_interval(max_date_run), restricted=False)
+        if next_info is None:
+            print(
+                "[WARN] No following schedule can be found. "
+                "This DAG may have schedule interval '@once' or `None`.",
+                file=sys.stderr,
+            )
+            print(None)
+            return
+
+        print(next_info.logical_date.isoformat())
+        for _ in range(1, args.num_executions):
+            next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
+            print(next_info.logical_date.isoformat())
 
 
 @cli_utils.action_logging
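
The rewritten command chains `next_dagrun_info` calls: each returned `DagRunInfo` carries the `data_interval` that seeds the next call. A rough standalone sketch of that chaining, assuming the same post-PR API (DAG id, schedule, and dates are made up):

```python
# Sketch: chaining next_dagrun_info to list upcoming logical dates,
# mirroring `airflow dags next-execution`. In-memory DAG; dates made up.
import pendulum

from airflow.models.dag import DAG

dag = DAG(
    dag_id="example",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
)

# First run: no previous data interval to anchor on.
next_info = dag.next_dagrun_info(None)
for _ in range(3):
    if next_info is None:  # e.g. '@once' stops producing runs
        break
    print(next_info.logical_date.isoformat())
    # Feed the interval back in to step to the following run.
    next_info = dag.next_dagrun_info(next_info.data_interval)
```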
25 changes: 17 additions & 8 deletions airflow/dag_processing/processor.py
@@ -28,12 +28,14 @@
 
 from setproctitle import setproctitle
 from sqlalchemy import func, or_
+from sqlalchemy.orm import eagerload
 from sqlalchemy.orm.session import Session
 
 from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, TaskNotFound
-from airflow.models import DAG, DagModel, SlaMiss, errors
+from airflow.models import SlaMiss, errors
+from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.stats import Stats
 from airflow.utils import timezone
@@ -391,6 +393,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
 
         max_tis: Iterator[TI] = (
             session.query(TI)
+            .options(eagerload(TI.dag_run))
             .join(TI.dag_run)
             .filter(
                 TI.dag_id == dag.dag_id,
@@ -411,14 +414,20 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
                     f"{type(task.sla)} in {task.dag_id}:{task.task_id}"
                 )
 
-            dttm = dag.following_schedule(ti.execution_date)
-            while dttm < ts:
-                following_schedule = dag.following_schedule(dttm)
-                if following_schedule + task.sla < ts:
-                    session.merge(
-                        SlaMiss(task_id=ti.task_id, dag_id=ti.dag_id, execution_date=dttm, timestamp=ts)
-                    )
-                dttm = dag.following_schedule(dttm)
+            sla_misses = []
+            next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
+            while next_info.logical_date < ts:
+                next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)
+                if next_info.logical_date + task.sla < ts:
+                    sla_miss = SlaMiss(
+                        task_id=ti.task_id,
+                        dag_id=ti.dag_id,
+                        execution_date=next_info.logical_date,
+                        timestamp=ts,
+                    )
+                    sla_misses.append(sla_miss)
+            if sla_misses:
+                session.add_all(sla_misses)
         session.commit()
 
         slas: List[SlaMiss] = (
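
The new loop walks scheduled run times forward from the task instance's run and records a miss whenever `logical_date + sla` has already passed. A simplified sketch of the same walk, with plain datetimes standing in for the DAG and timetable (the hourly schedule, SLA, and dates are made up):

```python
# Sketch: the SLA-miss walk in isolation, with plain datetimes standing in
# for the DAG and its timetable. Hourly schedule and 5-minute SLA are made up.
from datetime import datetime, timedelta, timezone

sla = timedelta(minutes=5)
ts = datetime(2021, 9, 9, 3, 2, tzinfo=timezone.utc)  # "now"

def next_logical_date(previous: datetime) -> datetime:
    """Stand-in for dag.next_dagrun_info(...): a fixed hourly schedule."""
    return previous + timedelta(hours=1)

missed = []
logical_date = datetime(2021, 9, 9, 0, 0, tzinfo=timezone.utc)  # latest run
while logical_date < ts:
    logical_date = next_logical_date(logical_date)
    if logical_date + sla < ts:  # deadline has already passed: SLA miss
        missed.append(logical_date)

print(missed)  # the 01:00 and 02:00 runs missed their 5-minute SLA
```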
25 changes: 14 additions & 11 deletions airflow/jobs/scheduler_job.py
@@ -849,6 +849,8 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
                 self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
                 continue
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
+
+            data_interval = dag.get_next_data_interval(dag_model)
             # Explicitly check if the DagRun already exists. This is an edge case
             # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
             # are not updated.
@@ -858,19 +860,18 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
             # create a new one. This is so that in the next Scheduling loop we try to create new runs
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
-
                 dag.create_dagrun(
                     run_type=DagRunType.SCHEDULED,
                     execution_date=dag_model.next_dagrun,
                     state=State.QUEUED,
-                    data_interval=dag_model.next_dagrun_data_interval,
+                    data_interval=data_interval,
                     external_trigger=False,
                     session=session,
                     dag_hash=dag_hash,
                     creating_job_id=self.id,
                 )
                 queued_runs_of_dags[dag_model.dag_id] += 1
-            dag_model.calculate_dagrun_date_fields(dag, dag_model.next_dagrun)
+            dag_model.calculate_dagrun_date_fields(dag, data_interval)
 
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
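
`get_next_data_interval` reads the interval the scheduler pre-computed onto the `DagModel` row, so `create_dagrun` and `calculate_dagrun_date_fields` now consume one value instead of deriving it twice. A minimal sketch against in-memory objects, assuming the post-PR accessors (the DAG id and dates are made up; no metadata database involved):

```python
# Sketch: reading the pre-computed next data interval off a DagModel row,
# as _create_dag_runs now does. In-memory objects; id and dates are made up.
import pendulum

from airflow.models.dag import DAG, DagModel

dag = DAG(
    dag_id="example",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
)

dag_model = DagModel(dag_id="example")
dag_model.next_dagrun = pendulum.datetime(2021, 9, 8, tz="UTC")
dag_model.next_dagrun_data_interval_start = pendulum.datetime(2021, 9, 8, tz="UTC")
dag_model.next_dagrun_data_interval_end = pendulum.datetime(2021, 9, 9, tz="UTC")

# One read, shared by create_dagrun(data_interval=...) and
# calculate_dagrun_date_fields(...), so the two cannot drift apart.
data_interval = dag.get_next_data_interval(dag_model)
print(data_interval.start, data_interval.end)
```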
@@ -894,16 +895,18 @@ def _start_queued_dagruns(
             .all(),
         )
 
-        def _update_state(dag_run):
+        def _update_state(dag: DAG, dag_run: DagRun):
             dag_run.state = State.RUNNING
             dag_run.start_date = timezone.utcnow()
-            expected_start_date = dag.following_schedule(dag_run.execution_date)
-            if expected_start_date:
+            if dag.timetable.periodic:
+                # TODO: Logically, this should be DagRunInfo.run_after, but the
+                # information is not stored on a DagRun, only before the actual
+                # execution on DagModel.next_dagrun_create_after. We should add
+                # a field on DagRun for this instead of relying on the run
+                # always happening immediately after the data interval.
+                expected_start_date = dag.get_run_data_interval(dag_run).end
                 schedule_delay = dag_run.start_date - expected_start_date
-                Stats.timing(
-                    f'dagrun.schedule_delay.{dag.dag_id}',
-                    schedule_delay,
-                )
+                Stats.timing(f'dagrun.schedule_delay.{dag.dag_id}', schedule_delay)
 
         for dag_run in dag_runs:
 
@@ -923,7 +926,7 @@ def _update_state(dag_run):
                 )
             else:
                 active_runs_of_dags[dag_run.dag_id] += 1
-                _update_state(dag_run)
+                _update_state(dag, dag_run)
 
     def _schedule_dag_run(
         self,
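
With this change, the expected start of a periodic run is the end of its own stored data interval rather than a freshly computed `following_schedule`. A self-contained sketch of the `schedule_delay` computation, assuming the post-PR `get_run_data_interval` (the DAG id, run, and dates are made up):

```python
# Sketch: computing dagrun.schedule_delay from a run's stored data interval.
# In-memory DAG and DagRun; the id, run, and dates are hypothetical.
import pendulum

from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

dag = DAG(
    dag_id="example",
    start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
    schedule_interval="@daily",
)

run = DagRun(
    dag_id="example",
    run_id="scheduled__2021-09-08T00:00:00+00:00",
    execution_date=pendulum.datetime(2021, 9, 8, tz="UTC"),
    data_interval=(
        pendulum.datetime(2021, 9, 8, tz="UTC"),
        pendulum.datetime(2021, 9, 9, tz="UTC"),
    ),
)
run.start_date = pendulum.datetime(2021, 9, 9, 0, 0, 42, tz="UTC")

if dag.timetable.periodic:  # '@once' and None schedules have no expectation
    # The run is expected to start when its data interval ends.
    expected_start_date = dag.get_run_data_interval(run).end
    schedule_delay = run.start_date - expected_start_date
    print(schedule_delay.total_seconds())  # 42.0
```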