Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of Pendulum 3 #36281

Merged
merged 4 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 90 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1166,11 +1166,61 @@ jobs:
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Boto"
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:MinSQLAlchemy"
uses: ./.github/actions/post_tests_failure
if: failure()

tests-postgres-pendulum-2:
Taragolis marked this conversation as resolved.
Show resolved Hide resolved
timeout-minutes: 130
name: >
DB:Postgres${{needs.build-info.outputs.default-postgres-version}},
Pendulum2,Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
BACKEND: "postgres"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
PYTHON_VERSION: "${needs.build-info.outputs.default-python-version}}"
POSTGRES_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
DOWNGRADE_PENDULUM: "true"
JOB_ID: >
postgres-pendulum-2-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: >
Tests: ${{matrix.python-version}}:${{needs.build-info.outputs.parallel-test-types-list-as-string}}
run: >
breeze testing db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: >
Post Tests success: ${{needs.build-info.outputs.default-python-version}}:Pendulum2"
uses: ./.github/actions/post_tests_success
if: success()
- name: >
Post Tests failure: ${{needs.build-info.outputs.default-python-version}}:Pendulum2"
uses: ./.github/actions/post_tests_failure
if: failure()

Expand Down Expand Up @@ -1542,6 +1592,44 @@ jobs:
uses: ./.github/actions/post_tests_failure
if: failure()

tests-no-db-pendulum-2:
timeout-minutes: 60
name: >
Non-DB: Pendulum2, Py${{needs.build-info.outputs.default-python-version}}:
${{needs.build-info.outputs.parallel-test-types-list-as-string}}
runs-on: ${{fromJSON(needs.build-info.outputs.runs-on)}}
needs: [build-info, wait-for-ci-images]
env:
RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
PYTHON_MAJOR_MINOR_VERSION: "${{needs.build-info.outputs.default-python-version}}"
DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
JOB_ID: "quarantined-${{needs.build-info.outputs.default-python-version}}"
ENABLE_COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
DOWNGRADE_PENDULUM: "true"
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: Cleanup repo
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v4
with:
persist-credentials: false
- name: >
Prepare breeze & CI image: ${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
uses: ./.github/actions/prepare_breeze_and_image
- name: "Tests: ${{matrix.python-version}}:Non-DB-Pendulum2"
run: >
breeze testing non-db-tests
--parallel-test-types "${{needs.build-info.outputs.parallel-test-types-list-as-string}}"
- name: "Post Tests success: Non-DB-Pendulum2"
uses: ./.github/actions/post_tests_success
if: success()
- name: "Post Tests failure: Non-DB-Pendulum2"
uses: ./.github/actions/post_tests_failure
if: failure()

summarize-warnings:
timeout-minutes: 15
name: "Summarize warnings"
Expand Down
13 changes: 13 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,18 @@ function check_download_sqlalchemy() {
pip check
}

function check_download_pendulum() {
if [[ ${DOWNGRADE_PENDULUM=} != "true" ]]; then
return
fi
min_pendulum_version=$(grep "\"pendulum>=" pyproject.toml | sed "s/.*>=\([0-9\.]*\).*/\1/" | xargs)
echo
echo "${COLOR_BLUE}Downgrading pendulum to minimum supported version: ${min_pendulum_version}${COLOR_RESET}"
echo
pip install --root-user-action ignore "pendulum==${min_pendulum_version}"
pip check
}

function check_run_tests() {
if [[ ${RUN_TESTS=} != "true" ]]; then
return
Expand Down Expand Up @@ -938,6 +950,7 @@ determine_airflow_to_use
environment_initialization
check_boto_upgrade
check_download_sqlalchemy
check_download_pendulum
check_run_tests "${@}"

exec /bin/bash "${@}"
Expand Down
6 changes: 3 additions & 3 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
if TYPE_CHECKING:
from types import ModuleType

from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session

Expand Down Expand Up @@ -214,7 +214,7 @@ def _get_model_data_interval(
return DataInterval(start, end)


def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone) -> Timetable:
def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
"""Create a Timetable instance from a ``schedule_interval`` argument."""
if interval is NOTSET:
return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
Expand Down Expand Up @@ -533,7 +533,7 @@ def __init__(

tzinfo = None if date.tzinfo else settings.TIMEZONE
tz = pendulum.instance(date, tz=tzinfo).timezone
self.timezone: Timezone = tz or settings.TIMEZONE
self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

# Apply the timezone we settled on to end_date if it wasn't supplied
if "end_date" in self.default_args and self.default_args["end_date"]:
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/cncf/kubernetes/pod_launcher_deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import math
import time
import warnings
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import pendulum
import tenacity
Expand Down Expand Up @@ -148,13 +148,13 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> tuple[State, str | None]:
"""
if get_logs:
read_logs_since_sec = None
last_log_time = None
last_log_time: pendulum.DateTime | None = None
while True:
logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message = self.parse_log_line(line.decode("utf-8"))
if timestamp:
last_log_time = pendulum.parse(timestamp)
last_log_time = cast(pendulum.DateTime, pendulum.parse(timestamp))
Taragolis marked this conversation as resolved.
Show resolved Hide resolved
self.log.info(message)
time.sleep(1)

Expand Down
9 changes: 5 additions & 4 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
from airflow.utils.module_loading import import_string, qualname
from airflow.utils.operator_resources import Resources
from airflow.utils.task_group import MappedTaskGroup, TaskGroup
from airflow.utils.timezone import parse_timezone
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
Expand Down Expand Up @@ -144,7 +145,7 @@ def decode_relativedelta(var: dict[str, Any]) -> relativedelta.relativedelta:
return relativedelta.relativedelta(**var)


def encode_timezone(var: Timezone) -> str | int:
def encode_timezone(var: Timezone | FixedTimezone) -> str | int:
"""
Encode a Pendulum Timezone for serialization.

Expand All @@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int:
)


def decode_timezone(var: str | int) -> Timezone:
def decode_timezone(var: str | int) -> Timezone | FixedTimezone:
"""Decode a previously serialized Pendulum Timezone."""
return pendulum.tz.timezone(var)
return parse_timezone(var)


def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
Expand Down Expand Up @@ -607,7 +608,7 @@ def deserialize(cls, encoded_var: Any, use_pydantic_models=False) -> Any:
raise TypeError(f"Invalid type {type_!s} in deserialization.")

_deserialize_datetime = pendulum.from_timestamp
_deserialize_timezone = pendulum.tz.timezone
_deserialize_timezone = parse_timezone

@classmethod
def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
Expand Down
14 changes: 7 additions & 7 deletions airflow/serialization/serializers/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
serialize as serialize_timezone,
)
from airflow.utils.module_loading import qualname
from airflow.utils.timezone import parse_timezone

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -62,23 +63,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
import datetime

from pendulum import DateTime
from pendulum.tz import fixed_timezone, timezone

tz: datetime.tzinfo | None = None
if isinstance(data, dict) and TIMEZONE in data:
if version == 1:
# try to deserialize unsupported timezones
timezone_mapping = {
"EDT": fixed_timezone(-4 * 3600),
"CDT": fixed_timezone(-5 * 3600),
"MDT": fixed_timezone(-6 * 3600),
"PDT": fixed_timezone(-7 * 3600),
"CEST": timezone("CET"),
"EDT": parse_timezone(-4 * 3600),
"CDT": parse_timezone(-5 * 3600),
"MDT": parse_timezone(-6 * 3600),
"PDT": parse_timezone(-7 * 3600),
"CEST": parse_timezone("CET"),
}
if data[TIMEZONE] in timezone_mapping:
tz = timezone_mapping[data[TIMEZONE]]
else:
tz = timezone(data[TIMEZONE])
tz = parse_timezone(data[TIMEZONE])
else:
tz = (
deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
Expand Down
7 changes: 2 additions & 5 deletions airflow/serialization/serializers/timezone.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,14 @@ def serialize(o: object) -> tuple[U, str, int, bool]:


def deserialize(classname: str, version: int, data: object) -> Any:
from pendulum.tz import fixed_timezone, timezone
from airflow.utils.timezone import parse_timezone

if not isinstance(data, (str, int)):
raise TypeError(f"{data} is not of type int or str but of {type(data)}")

if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")

if isinstance(data, int):
return fixed_timezone(data)

if "zoneinfo.ZoneInfo" in classname:
try:
from zoneinfo import ZoneInfo
Expand All @@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:

return ZoneInfo(data)

return timezone(data)
return parse_timezone(data)


# ported from pendulum.tz.timezone._get_tzinfo_name
Expand Down
11 changes: 5 additions & 6 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import warnings
from typing import TYPE_CHECKING, Any, Callable

import pendulum
import pluggy
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
Expand All @@ -39,6 +38,7 @@
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
Expand All @@ -49,13 +49,12 @@
log = logging.getLogger(__name__)

try:
tz = conf.get_mandatory_value("core", "default_timezone")
if tz == "system":
TIMEZONE = pendulum.tz.local_timezone()
if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
TIMEZONE = parse_timezone(tz)
else:
TIMEZONE = pendulum.tz.timezone(tz)
TIMEZONE = local_timezone()
except Exception:
TIMEZONE = pendulum.tz.timezone("UTC")
TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

Expand Down
9 changes: 4 additions & 5 deletions airflow/timetables/_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@
import datetime
from typing import TYPE_CHECKING, Any

import pendulum
from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
from croniter import CroniterBadCronError, CroniterBadDateError, croniter

from airflow.exceptions import AirflowTimetableInvalid
from airflow.utils.dates import cron_presets
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone

if TYPE_CHECKING:
from pendulum import DateTime
from pendulum.tz.timezone import Timezone
from pendulum.tz.timezone import FixedTimezone, Timezone


def _covers_every_hour(cron: croniter) -> bool:
Expand Down Expand Up @@ -63,11 +62,11 @@ def _covers_every_hour(cron: croniter) -> bool:
class CronMixin:
"""Mixin to provide interface to work with croniter."""

def __init__(self, cron: str, timezone: str | Timezone) -> None:
def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
self._expression = cron_presets.get(cron, cron)

if isinstance(timezone, str):
timezone = pendulum.tz.timezone(timezone)
timezone = parse_timezone(timezone)
self._timezone = timezone

try:
Expand Down
Loading