Skip to content

Commit

Permalink
Add support of Pendulum 3 (#36281)
Browse files Browse the repository at this point in the history
* Add support of Pendulum 3

* Add backcompat to pendulum 2

* Update airflow/serialization/serialized_objects.py

Co-authored-by: Tzu-ping Chung <[email protected]>

* Add newsfragments

---------

Co-authored-by: Tzu-ping Chung <[email protected]>
(cherry picked from commit 2ffa6e4)
  • Loading branch information
Taragolis authored and potiuk committed Jan 13, 2024
1 parent 405f871 commit 0c93669
Show file tree
Hide file tree
Showing 41 changed files with 540 additions and 153 deletions.
92 changes: 90 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,11 +1169,61 @@ jobs:
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_failure
if: failure()
tests-postgres-pendulum-2:
timeout-minutes: 130
name: >
DB:Postgres${{needs.build-info.outputs.default-postgres-version}},
Pendulum2,Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "postgres"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_VERSION: "${needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
DOWNGRADE_PENDULUM: "true"
JOB_ID: >
postgres-pendulum-2-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Pendulum2"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Pendulum2"
uses: ./.github/actions/post_tests_failure
if: failure()
Expand Down Expand Up @@ -1616,6 +1666,44 @@ jobs:
uses: ./.github/actions/post_tests_failure
if: failure()

tests-no-db-pendulum-2:
timeout-minutes: 60
name: >
Non-DB: Pendulum2, Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
JOB_ID: "quarantined-${{needs.build-info.outputs.default-python-version}}"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
DOWNGRADE_PENDULUM: "true"
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: "Tests: ${{matrix.python-version}}:Non-DB-Pendulum2"
run: >
breeze testing non-db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: "Post Tests success: Non-DB-Pendulum2"
uses: ./.github/actions/post_tests_success
if: success()
- name: "Post Tests failure: Non-DB-Pendulum2"
uses: ./.github/actions/post_tests_failure
if: failure()

summarize-warnings:
timeout-minutes: 15
name: "Summarize warnings"
Expand Down
13 changes: 13 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,18 @@ function check_download_sqlalchemy() {
pip check
}

function check_download_pendulum() {
if [[ ${DOWNGRADE_PENDULUM=} != "true" ]]; then
return
fi
min_pendulum_version=$(grep "\"pendulum>=" pyproject.toml | sed "s/.*>=\([0-9\.]*\).*/\1/" | xargs)
echo
echo "${COLOR_BLUE}Downgrading pendulum to minimum supported version: ${min_pendulum_version}${COLOR_RESET}"
echo
pip install --root-user-action ignore "pendulum==${min_pendulum_version}"
pip check
}

function check_run_tests() {
if [[ ${RUN_TESTS=} != "true" ]]; then
return
Expand Down Expand Up @@ -937,6 +949,7 @@ determine_airflow_to_use
environment_initialization
check_boto_upgrade
check_download_sqlalchemy
check_download_pendulum
check_run_tests "${@}"

exec /bin/bash "${@}"
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@
if TYPE_CHECKING:
from types import ModuleType

from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session

Expand Down Expand Up @@ -213,7 +213,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is NOTSET:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
Expand Down Expand Up @@ -529,7 +529,7 @@ def __init__(

tzinfo = None if date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(date, tz=tzinfo).timezone
self.timezone: Timezone = tz or settings.TIMEZONE
self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

# Apply the timezone we settled on to end_date if it wasn't supplied
if "end_date" in self.default_args and self.default_args["end_date"]:
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import math
import time
import warnings
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import pendulum
import tenacity
Expand Down Expand Up @@ -148,13 +148,13 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
"""
if get_logs:
read_logs_since_sec = None
last_log_time = None
last_log_time: pendulum.DateTime | None = None
while True:
logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message = self.parse_log_line(line.decode("utf-8"))
if timestamp:
last_log_time = pendulum.parse(timestamp)
last_log_time = cast(pendulum.DateTime, pendulum.parse(timestamp))
self.log.info(message)
time.sleep(1)

Expand Down
9 changes: 5 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,7 +145,7 @@ def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
return relativedelta.relativedelta(**var)


def encode_timezone(var: Timezone) -> str | int:
def encode_timezone(var: Timezone | FixedTimezone) -> str | int:
"""
Encode a Pendulum Timezone for serialization.
Expand All @@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int:
)


def decode_timezone(var: str | int) -> Timezone:
def decode_timezone(var: str | int) -> Timezone | FixedTimezone:
"""Decode a previously serialized Pendulum Timezone."""
return pendulum.tz.timezone(var)
return parse_timezone(var)


def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
Expand Down Expand Up @@ -607,7 +608,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone
_deserialize_timezone = parse_timezone

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
14 changes: 7 additions & 7 deletions airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
serialize as serialize_timezone,
)
from airflow.utils.module_loading import qualname
from airflow.utils.timezone import parse_timezone

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -62,23 +63,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
import datetime

from pendulum import DateTime
from pendulum.tz import fixed_timezone, timezone

tz: datetime.tzinfo | None = None
if isinstance(data, dict) and TIMEZONE in data:
if version == 1:
# try to deserialize unsupported timezones
timezone_mapping = {
"EDT": fixed_timezone(-4 * 3600),
"CDT": fixed_timezone(-5 * 3600),
"MDT": fixed_timezone(-6 * 3600),
"PDT": fixed_timezone(-7 * 3600),
"CEST": timezone("CET"),
"EDT": parse_timezone(-4 * 3600),
"CDT": parse_timezone(-5 * 3600),
"MDT": parse_timezone(-6 * 3600),
"PDT": parse_timezone(-7 * 3600),
"CEST": parse_timezone("CET"),
}
if data[TIMEZONE] in timezone_mapping:
tz = timezone_mapping[data[TIMEZONE]]
else:
tz = timezone(data[TIMEZONE])
tz = parse_timezone(data[TIMEZONE])
else:
tz = (
deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
Expand Down
7 changes: 2 additions & 5 deletions airflow/serialization/serializers/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ def serialize(o: object) -> tuple[U, str, int, bool]:


def deserialize(classname: str, version: int, data: object) -> Any:
from pendulum.tz import fixed_timezone, timezone
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if isinstance(data, int):
return fixed_timezone(data)

if "zoneinfo.ZoneInfo" in classname:
try:
from zoneinfo import ZoneInfo
Expand All @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:

return ZoneInfo(data)

return timezone(data)
return parse_timezone(data)


# ported from pendulum.tz.timezone._get_tzinfo_name
Expand Down
11 changes: 5 additions & 6 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import warnings
from typing import TYPE_CHECKING, Any, Callable

import pendulum
import pluggy
import sqlalchemy
from sqlalchemy import create_engine, exc, text
Expand All @@ -40,6 +39,7 @@
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand All @@ -50,13 +50,12 @@
log = logging.getLogger(__name__)

try:
tz = conf.get_mandatory_value("core", "default_timezone")
if tz == "system":
TIMEZONE = pendulum.tz.local_timezone()
if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
TIMEZONE = parse_timezone(tz)
else:
TIMEZONE = pendulum.tz.timezone(tz)
TIMEZONE = local_timezone()
except Exception:
TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

Expand Down
9 changes: 4 additions & 5 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
import datetime
from typing import TYPE_CHECKING, Any

import pendulum
from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone

if TYPE_CHECKING:
from pendulum import DateTime
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone


def _covers_every_hour(cron: croniter) -> bool:
Expand Down Expand Up @@ -63,11 +62,11 @@ def _covers_every_hour(cron: croniter) -> bool:
class CronMixin:
"""Mixin to provide interface to work with croniter."""

def __init__(self, cron: str, timezone: str | Timezone) -> None:
def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
self._expression = cron_presets.get(cron, cron)

if isinstance(timezone, str):
timezone = pendulum.tz.timezone(timezone)
timezone = parse_timezone(timezone)
self._timezone = timezone

try:
Expand Down
Loading

0 comments on commit 0c93669

Please sign in to comment.