Skip to content

Commit

Permalink
Type-annotate SkipMixin and BaseXCom (#20011)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6dd0a0d)
  • Loading branch information
uranusjr authored and jedcunningham committed Feb 17, 2022
1 parent dda8f43 commit 016929f
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 118 deletions.
15 changes: 6 additions & 9 deletions airflow/models/skipmixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.

import warnings
from typing import TYPE_CHECKING, Iterable, Union
from typing import TYPE_CHECKING, Iterable, Optional, Sequence, Union

from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand All @@ -26,6 +26,7 @@
from airflow.utils.state import State

if TYPE_CHECKING:
from pendulum import DateTime
from sqlalchemy import Session

from airflow.models import DagRun
Expand Down Expand Up @@ -66,9 +67,9 @@ def _set_state_to_skipped(self, dag_run: "DagRun", tasks: "Iterable[BaseOperator
def skip(
self,
dag_run: "DagRun",
execution_date: "timezone.DateTime",
tasks: "Iterable[BaseOperator]",
session: "Session" = None,
execution_date: "DateTime",
tasks: Sequence["BaseOperator"],
session: "Session",
):
"""
Sets tasks instances to skipped from the same dag run.
Expand Down Expand Up @@ -114,11 +115,7 @@ def skip(
session.commit()

# SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
try:
task_id = self.task_id
except AttributeError:
task_id = None

task_id: Optional[str] = getattr(self, "task_id", None)
if task_id is not None:
from airflow.models.xcom import XCom

Expand Down
Loading

0 comments on commit 016929f

Please sign in to comment.