Skip to content

Commit

Permalink
Rename execution_date to logical_date across codebase (#43902)
Browse files Browse the repository at this point in the history
Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
sunank200 and uranusjr authored Nov 15, 2024
1 parent 36e267d commit 123dadd
Show file tree
Hide file tree
Showing 329 changed files with 3,722 additions and 3,400 deletions.
4 changes: 2 additions & 2 deletions airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ def __init__(self, auth=None, session: httpx.Client | None = None):
self._session.auth = auth

def trigger_dag(
self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True
self, dag_id, run_id=None, conf=None, logical_date=None, replace_microseconds=True
) -> dict | None:
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
triggered_by=DagRunTriggeredByType.CLI,
run_id=run_id,
conf=conf,
execution_date=execution_date,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
)
if dag_run:
Expand Down
122 changes: 61 additions & 61 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ def _create_dagruns(
:param dag: The DAG to create runs for.
:param infos: List of logical dates and data intervals to evaluate.
:param state: The state to set the dag run to
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{execution_date}``.
:return: Newly created and existing dag runs for the execution dates supplied.
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date}``.
:return: Newly created and existing dag runs for the logical dates supplied.
"""
# Find out existing DAG runs that we don't need to create.
dag_runs = {
run.logical_date: run
for run in DagRun.find(dag_id=dag.dag_id, execution_date=[info.logical_date for info in infos])
for run in DagRun.find(dag_id=dag.dag_id, logical_date=[info.logical_date for info in infos])
}

for info in infos:
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
logical_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
Expand All @@ -87,7 +87,7 @@ def set_state(
*,
tasks: Collection[Operator | tuple[Operator, int]],
run_id: str | None = None,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
Expand All @@ -107,11 +107,11 @@ def set_state(
:param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
``task.dag`` needs to be set
:param run_id: the run_id of the dagrun to start looking from
:param execution_date: the execution date from which to start looking (deprecated)
:param logical_date: the logical date from which to start looking (deprecated)
:param upstream: Mark all parents (upstream tasks)
:param downstream: Mark all siblings (downstream tasks) of task_id
:param future: Mark all future tasks on the interval of the dag up until
last execution date.
last logical date.
:param past: Retroactively mark all tasks starting from start_date of the DAG
:param state: State to which the tasks need to be set
:param commit: Commit tasks to be altered to the database
Expand All @@ -121,11 +121,11 @@ def set_state(
if not tasks:
return []

if not exactly_one(execution_date, run_id):
raise ValueError("Exactly one of dag_run_id and execution_date must be set")
if not exactly_one(logical_date, run_id):
raise ValueError("Exactly one of dag_run_id and logical_date must be set")

if execution_date and not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
if logical_date and not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")

task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
if len(task_dags) > 1:
Expand All @@ -134,8 +134,8 @@ def set_state(
if dag is None:
raise ValueError("Received tasks with no DAG")

if execution_date:
run_id = dag.get_dagrun(execution_date=execution_date, session=session).run_id
if logical_date:
run_id = dag.get_dagrun(logical_date=logical_date, session=session).run_id
if not run_id:
raise ValueError("Received tasks with no run_id")

Expand Down Expand Up @@ -200,26 +200,26 @@ def find_task_relatives(tasks, downstream, upstream):


@provide_session
def get_execution_dates(
dag: DAG, execution_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
def get_logical_dates(
dag: DAG, logical_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
) -> list[datetime]:
"""Return DAG execution dates."""
latest_execution_date = dag.get_latest_execution_date(session=session)
if latest_execution_date is None:
raise ValueError(f"Received non-localized date {execution_date}")
execution_date = timezone.coerce_datetime(execution_date)
"""Return DAG logical dates."""
latest_logical_date = dag.get_latest_logical_date(session=session)
if latest_logical_date is None:
raise ValueError(f"Received non-localized date {logical_date}")
logical_date = timezone.coerce_datetime(logical_date)
# determine date range of dag runs and tasks to consider
end_date = latest_execution_date if future else execution_date
end_date = latest_logical_date if future else logical_date
if dag.start_date:
start_date = dag.start_date
else:
start_date = execution_date
start_date = execution_date if not past else start_date
start_date = logical_date
start_date = logical_date if not past else start_date
if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
dates = sorted({d.logical_date for d in dag_runs})
elif not dag.timetable.periodic:
dates = [start_date]
else:
Expand All @@ -235,7 +235,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
first_dagrun = session.scalar(
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date.asc()).limit(1)
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
)

if last_dagrun is None:
Expand All @@ -255,7 +255,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
dates = [
info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, execution_date=dates, session=session)]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
return run_ids


Expand All @@ -279,37 +279,37 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
def set_dag_run_state_to_success(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to success.
Set for a specific execution date and its task instances to success.
Set for a specific logical date and its task instances to success.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param logical_date: the logical date from which to start looking(deprecated)
:param run_id: the run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: ValueError if dag or execution_date is invalid
:raises: ValueError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return []

if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
Expand All @@ -333,36 +333,36 @@ def set_dag_run_state_to_success(
def set_dag_run_state_to_failed(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to failed.
Set for a specific execution date and its task instances to failed.
Set for a specific logical date and its task instances to failed.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param logical_date: the logical date from which to start looking(deprecated)
:param run_id: the DAG run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or execution_date is invalid
:raises: AssertionError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return []
if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id

if not run_id:
Expand Down Expand Up @@ -429,16 +429,16 @@ def __set_dag_run_state_to_running_or_queued(
*,
new_state: DagRunState,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession,
) -> list[TaskInstance]:
"""
Set the dag run for a specific execution date to running.
Set the dag run for a specific logical date to running.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param logical_date: the logical date from which to start looking
:param run_id: the id of the DagRun
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
Expand All @@ -447,18 +447,18 @@ def __set_dag_run_state_to_running_or_queued(
"""
res: list[TaskInstance] = []

if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return res

if not dag:
return res

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"DagRun with run_id: {run_id} not found")
Expand All @@ -474,20 +474,20 @@ def __set_dag_run_state_to_running_or_queued(
def set_dag_run_state_to_running(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to running.
Set for a specific execution date and its task instances to running.
Set for a specific logical date and its task instances to running.
"""
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
execution_date=execution_date,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand All @@ -498,20 +498,20 @@ def set_dag_run_state_to_running(
def set_dag_run_state_to_queued(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to queued.
Set for a specific execution date and its task instances to queued.
Set for a specific logical date and its task instances to queued.
"""
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.QUEUED,
dag=dag,
execution_date=execution_date,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand Down
Loading

0 comments on commit 123dadd

Please sign in to comment.