Commit

Add docs for AIP 39: Timetables (#17552)

uranusjr authored Sep 21, 2021
1 parent d856b79 commit 060345c
Showing 31 changed files with 719 additions and 170 deletions.
16 changes: 16 additions & 0 deletions airflow/example_dags/plugins/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
90 changes: 90 additions & 0 deletions airflow/example_dags/plugins/workday.py
@@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Plugin to demostrate timetable registration and accomdate example DAGs."""

# [START howto_timetable]
from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):

    # [START howto_timetable_infer_data_interval]
    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    # [END howto_timetable_infer_data_interval]

    # [START howto_timetable_next_dagrun_info]
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

    # [END howto_timetable_next_dagrun_info]


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


# [END howto_timetable]
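
A minimal sketch (not part of this commit) of how a DAG might adopt the timetable above, assuming AfterWorkdayTimetable is importable from the new plugin module; the dag_id, start date, and task are illustrative only:

# Hypothetical usage sketch -- dag_id, start_date, and task are illustrative only.
from datetime import datetime

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_after_workday",
    start_date=datetime(2021, 9, 1),
    timetable=AfterWorkdayTimetable(),  # replaces schedule_interval for this DAG
    tags=["example"],
) as dag:
    DummyOperator(task_id="run_after_workday")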
26 changes: 19 additions & 7 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -101,25 +101,37 @@ def __init__(
         self.context_set = False
 
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
+        dag_run = ti.dag_run
+
         if self.json_format:
-            execution_date = self._clean_execution_date(ti.execution_date)
+            data_interval_start = self._clean_date(dag_run.data_interval_start)
+            data_interval_end = self._clean_date(dag_run.data_interval_end)
+            execution_date = self._clean_date(dag_run.execution_date)
         else:
-            execution_date = ti.execution_date.isoformat()
+            data_interval_start = dag_run.data_interval_start.isoformat()
+            data_interval_end = dag_run.data_interval_end.isoformat()
+            execution_date = dag_run.execution_date.isoformat()
 
         return self.log_id_template.format(
-            dag_id=ti.dag_id, task_id=ti.task_id, execution_date=execution_date, try_number=try_number
+            dag_id=ti.dag_id,
+            task_id=ti.task_id,
+            run_id=ti.run_id,
+            data_interval_start=data_interval_start,
+            data_interval_end=data_interval_end,
+            execution_date=execution_date,
+            try_number=try_number,
         )
 
     @staticmethod
-    def _clean_execution_date(execution_date: datetime) -> str:
+    def _clean_date(value: datetime) -> str:
         """
-        Clean up an execution date so that it is safe to query in elasticsearch
+        Clean up a date value so that it is safe to query in elasticsearch
         by removing reserved characters.
         # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
-        :param execution_date: execution date of the dag run.
         """
-        return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
+        return value.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
     def _group_logs_by_host(self, logs):
         grouped_logs = defaultdict(list)
@@ -270,7 +282,7 @@ def set_context(self, ti: TaskInstance) -> None:
                 extras={
                     'dag_id': str(ti.dag_id),
                     'task_id': str(ti.task_id),
-                    'execution_date': self._clean_execution_date(ti.execution_date),
+                    'execution_date': self._clean_date(ti.execution_date),
                     'try_number': str(ti.try_number),
                     'log_id': self._render_log_id(ti, ti.try_number),
                 },
14 changes: 10 additions & 4 deletions airflow/sentry.py
@@ -59,7 +59,10 @@ def flush(self):
 class ConfiguredSentry(DummySentry):
     """Configure Sentry SDK."""
 
-    SCOPE_TAGS = frozenset(("task_id", "dag_id", "execution_date", "operator", "try_number"))
+    SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
+    SCOPE_TASK_TAGS = frozenset(("operator",))
+    SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
+    SCOPE_TAGS = SCOPE_DAG_RUN_TAGS | SCOPE_TASK_TAGS | SCOPE_TASK_INSTANCE_TAGS
     SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
 
     UNSUPPORTED_SENTRY_OPTIONS = frozenset(
@@ -116,14 +119,17 @@ def __init__(self):
 
     def add_tagging(self, task_instance):
         """Function to add tagging for a task_instance."""
+        dag_run = task_instance.dag_run
         task = task_instance.task
 
         with sentry_sdk.configure_scope() as scope:
-            for tag_name in self.SCOPE_TAGS:
+            for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
                 attribute = getattr(task_instance, tag_name)
-                if tag_name == "operator":
-                    attribute = task.__class__.__name__
                 scope.set_tag(tag_name, attribute)
+            for tag_name in self.SCOPE_DAG_RUN_TAGS:
+                attribute = getattr(dag_run, tag_name)
+                scope.set_tag(tag_name, attribute)
+            scope.set_tag("operator", task.__class__.__name__)
 
     @provide_session
     def add_breadcrumbs(self, task_instance, session=None):
2 changes: 1 addition & 1 deletion airflow/timetables/interval.py
@@ -114,7 +114,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
     a five/six-segment representation, or one of ``cron_presets``.
 
     The implementation extends on croniter to add timezone awareness. This is
-    because crontier works only with naive timestamps, and cannot consider DST
+    because croniter works only with naive timestamps, and cannot consider DST
     when determining the next/previous time.
 
     Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
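
The croniter limitation described in that docstring can be seen with a standalone snippet (illustrative only, not part of this commit; croniter is an existing Airflow dependency):

# Illustration: croniter only sees naive timestamps, so DST shifts are invisible to it.
from datetime import datetime

from croniter import croniter

it = croniter("0 6 * * *", datetime(2021, 3, 27, 6, 0))  # naive start time
print(it.get_next(datetime))  # 2021-03-28 06:00:00 -- naive; any DST change that day is ignored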
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-elasticsearch/logging/index.rst
@@ -38,7 +38,7 @@ First, to use the handler, ``airflow.cfg`` must be configured as follows:
 
     [elasticsearch]
     host = <host>:<port>
-    log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
+    log_id_template = {dag_id}-{task_id}-{run_id}-{try_number}
     end_of_log_mark = end_of_log
     write_stdout =
     json_fields =
@@ -56,7 +56,7 @@ To output task logs to stdout in JSON format, the following config could be used
 
    [elasticsearch]
    host = <host>:<port>
-    log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
+    log_id_template = {dag_id}-{task_id}-{run_id}-{try_number}
    end_of_log_mark = end_of_log
    write_stdout = True
    json_format = True
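
For orientation (not part of this commit), a small sketch of what the new template renders to; the run_id value below mimics the shape of Airflow's default scheduled run IDs and is purely illustrative:

# Hypothetical values, for illustration only.
log_id_template = "{dag_id}-{task_id}-{run_id}-{try_number}"
log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="extract",
    run_id="scheduled__2021-09-21T00:00:00+00:00",  # illustrative scheduled run_id
    try_number=1,
)
print(log_id)  # example_dag-extract-scheduled__2021-09-21T00:00:00+00:00-1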
7 changes: 3 additions & 4 deletions docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -58,11 +58,10 @@ GCSTimeSpanFileTransformOperator
 
 Use the
 :class:`~airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator`
-to transform files that were modified in a specific time span. The time span is defined
-by the DAG instance logical execution timestamp (``execution_date``, start of time span)
-and the timestamp when the next DAG instance execution is scheduled (end of time span). If a DAG
+to transform files that were modified in a specific time span (the data interval).
+The time span is defined by the time span's start and end timestamps. If a DAG
 does not have a *next* DAG instance scheduled, the time span end is infinite, meaning the operator
-processes all files older than ``execution_date``.
+processes all files older than ``data_interval_start``.
 
 .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
     :language: python