diff --git a/pglookout/common.py b/pglookout/common.py index 4d1f032..6975650 100644 --- a/pglookout/common.py +++ b/pglookout/common.py @@ -4,40 +4,51 @@ Copyright (c) 2015 Ohmu Ltd See LICENSE for details """ -import datetime +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Final + import re -def convert_xlog_location_to_offset(wal_location): +def convert_xlog_location_to_offset(wal_location: str) -> int: log_id, offset = wal_location.split("/") return int(log_id, 16) << 32 | int(offset, 16) -ISO_EXT_RE = re.compile( +ISO_EXT_RE: Final[re.Pattern[str]] = re.compile( r"(?P\d{4})-(?P\d\d)-(?P\d\d)(T(?P\d\d):(?P\d\d)" r"(:(?P\d\d)(.(?P\d{6}))?)?Z)?$" ) -ISO_BASIC_RE = re.compile( +ISO_BASIC_RE: Final[re.Pattern[str]] = re.compile( r"(?P\d{4})(?P\d\d)(?P\d\d)(T(?P\d\d)(?P\d\d)" r"((?P\d\d)((?P\d{6}))?)?Z)?$" ) +ISO_GROUP_NAMES: Final[tuple[str, ...]] = ( + "year", + "month", + "day", + "hour", + "minute", + "second", + "microsecond", +) -def parse_iso_datetime(value): +def parse_iso_datetime(value: str) -> datetime: match = ISO_EXT_RE.match(value) if not match: match = ISO_BASIC_RE.match(value) if not match: raise ValueError(f"Invalid ISO timestamp {value!r}") - parts = dict( - (key, int(match.group(key) or "0")) for key in ("year", "month", "day", "hour", "minute", "second", "microsecond") - ) - return datetime.datetime(tzinfo=None, **parts) + parts = {key: int(match.group(key) or "0") for key in ISO_GROUP_NAMES} + return datetime(tzinfo=None, **parts) -def get_iso_timestamp(fetch_time=None): +def get_iso_timestamp(fetch_time: datetime | None = None) -> str: if not fetch_time: - fetch_time = datetime.datetime.utcnow() - elif fetch_time.tzinfo: - fetch_time = fetch_time.replace(tzinfo=None) - datetime.timedelta(seconds=fetch_time.utcoffset().seconds) + fetch_time = datetime.utcnow() + elif (offset := fetch_time.utcoffset()) is not None: + fetch_time = fetch_time.replace(tzinfo=None) - timedelta(seconds=offset.seconds) return fetch_time.isoformat() + "Z" diff --git a/pyproject.toml b/pyproject.toml index 16ce87c..8b60e40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,6 @@ exclude = [ # Implementation. 'pglookout/__main__.py', 'pglookout/cluster_monitor.py', - 'pglookout/common.py', 'pglookout/current_master.py', 'pglookout/logutil.py', 'pglookout/pglookout.py', @@ -43,7 +42,6 @@ exclude = [ # Tests. 'test/conftest.py', 'test/test_cluster_monitor.py', - 'test/test_common.py', 'test/test_lookout.py', 'test/test_pgutil.py', 'test/test_webserver.py', diff --git a/test/test_common.py b/test/test_common.py index 8139d6d..6df4b55 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -4,33 +4,77 @@ Copyright (c) 2015 Ohmu Ltd See LICENSE for details """ - +from datetime import datetime from pglookout.common import convert_xlog_location_to_offset, get_iso_timestamp, ISO_EXT_RE, parse_iso_datetime -from pytest import raises -import datetime +import pytest -def test_convert_xlog_location_to_offset(): +def test_convert_xlog_location_to_offset() -> None: assert convert_xlog_location_to_offset("1/00000000") == 1 << 32 assert convert_xlog_location_to_offset("F/AAAAAAAA") == (0xF << 32) | 0xAAAAAAAA - with raises(ValueError): + with pytest.raises(ValueError): convert_xlog_location_to_offset("x") - with raises(ValueError): + with pytest.raises(ValueError): convert_xlog_location_to_offset("x/y") -def test_parse_iso_datetime(): - date = datetime.datetime.utcnow() +def test_parse_iso_datetime() -> None: + date = datetime.utcnow() date.replace(microsecond=0) assert date == parse_iso_datetime(date.isoformat() + "Z") - with raises(ValueError): + with pytest.raises(ValueError): parse_iso_datetime("foobar") -def test_get_iso_timestamp(): +def test_get_iso_timestamp() -> None: v = get_iso_timestamp() assert ISO_EXT_RE.match(v) - ts = datetime.datetime.now() + ts = datetime.now() v = get_iso_timestamp(ts) assert parse_iso_datetime(v) == ts + + +@pytest.mark.parametrize( + "timestamp", + [ + datetime(2021, 1, 1, 23, 42, 11, 123456), + datetime(2021, 1, 1, 23, 42, 11), + datetime(2021, 1, 1, 23, 42), + datetime(2021, 1, 1, 23), + datetime(2021, 1, 1), + ], +) +def test_roundtrip(timestamp: datetime) -> None: + ts2 = parse_iso_datetime(get_iso_timestamp(timestamp)) + + assert ts2 == timestamp + + +@pytest.mark.parametrize( + ("value", "normalized_value"), + # fmt: off + [ + # Extended format + ("2021-01-01T00:00:00.000000Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("2021-01-01T23:42:11.123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241 + ("2021-01-01T00:00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("2021-01-01T23:42:11Z", "2021-01-01T23:42:11Z"), # noqa: E241 + ("2021-01-01T00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("2021-01-01T23:42Z", "2021-01-01T23:42:00Z"), # noqa: E241 + ("2021-01-01", "2021-01-01T00:00:00Z"), # noqa: E241 + # Basic format + ("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("20210101T234211123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241 + ("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("20210101T234211Z", "2021-01-01T23:42:11Z"), # noqa: E241 + ("20210101T0000Z", "2021-01-01T00:00:00Z"), # noqa: E241 + ("20210101T2342Z", "2021-01-01T23:42:00Z"), # noqa: E241 + ("20210101", "2021-01-01T00:00:00Z"), # noqa: E241 + ], + # fmt: on +) +def test_reverse_roundtrip(value: str, normalized_value: str) -> None: + v2 = get_iso_timestamp(parse_iso_datetime(value)) + + assert v2 == normalized_value