Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mypy: pglookout/common.py [BF-1560] #96

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions pglookout/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,51 @@
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""
import datetime
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Final

import re


def convert_xlog_location_to_offset(wal_location):
def convert_xlog_location_to_offset(wal_location: str) -> int:
log_id, offset = wal_location.split("/")
return int(log_id, 16) << 32 | int(offset, 16)


ISO_EXT_RE = re.compile(
ISO_EXT_RE: Final[re.Pattern[str]] = re.compile(
r"(?P<year>\d{4})-(?P<month>\d\d)-(?P<day>\d\d)(T(?P<hour>\d\d):(?P<minute>\d\d)"
r"(:(?P<second>\d\d)(.(?P<microsecond>\d{6}))?)?Z)?$"
)
ISO_BASIC_RE = re.compile(
ISO_BASIC_RE: Final[re.Pattern[str]] = re.compile(
r"(?P<year>\d{4})(?P<month>\d\d)(?P<day>\d\d)(T(?P<hour>\d\d)(?P<minute>\d\d)"
r"((?P<second>\d\d)((?P<microsecond>\d{6}))?)?Z)?$"
)
ISO_GROUP_NAMES: Final[tuple[str, ...]] = (
"year",
"month",
"day",
"hour",
"minute",
"second",
"microsecond",
)


def parse_iso_datetime(value):
def parse_iso_datetime(value: str) -> datetime:
match = ISO_EXT_RE.match(value)
if not match:
match = ISO_BASIC_RE.match(value)
if not match:
raise ValueError(f"Invalid ISO timestamp {value!r}")
parts = dict(
(key, int(match.group(key) or "0")) for key in ("year", "month", "day", "hour", "minute", "second", "microsecond")
)
return datetime.datetime(tzinfo=None, **parts)
parts = {key: int(match.group(key) or "0") for key in ISO_GROUP_NAMES}
return datetime(tzinfo=None, **parts)


def get_iso_timestamp(fetch_time=None):
def get_iso_timestamp(fetch_time: datetime | None = None) -> str:
if not fetch_time:
fetch_time = datetime.datetime.utcnow()
elif fetch_time.tzinfo:
fetch_time = fetch_time.replace(tzinfo=None) - datetime.timedelta(seconds=fetch_time.utcoffset().seconds)
fetch_time = datetime.utcnow()
elif (offset := fetch_time.utcoffset()) is not None:
fetch_time = fetch_time.replace(tzinfo=None) - timedelta(seconds=offset.seconds)
return fetch_time.isoformat() + "Z"
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ exclude = [
# Implementation.
'pglookout/__main__.py',
'pglookout/cluster_monitor.py',
'pglookout/common.py',
'pglookout/current_master.py',
'pglookout/logutil.py',
'pglookout/pglookout.py',
Expand All @@ -43,7 +42,6 @@ exclude = [
# Tests.
'test/conftest.py',
'test/test_cluster_monitor.py',
'test/test_common.py',
'test/test_lookout.py',
'test/test_pgutil.py',
'test/test_webserver.py',
Expand Down
66 changes: 55 additions & 11 deletions test/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,77 @@
Copyright (c) 2015 Ohmu Ltd
See LICENSE for details
"""

from datetime import datetime
from pglookout.common import convert_xlog_location_to_offset, get_iso_timestamp, ISO_EXT_RE, parse_iso_datetime
from pytest import raises

import datetime
import pytest


def test_convert_xlog_location_to_offset():
def test_convert_xlog_location_to_offset() -> None:
assert convert_xlog_location_to_offset("1/00000000") == 1 << 32
assert convert_xlog_location_to_offset("F/AAAAAAAA") == (0xF << 32) | 0xAAAAAAAA
with raises(ValueError):
with pytest.raises(ValueError):
convert_xlog_location_to_offset("x")
with raises(ValueError):
with pytest.raises(ValueError):
convert_xlog_location_to_offset("x/y")


def test_parse_iso_datetime():
date = datetime.datetime.utcnow()
def test_parse_iso_datetime() -> None:
date = datetime.utcnow()
date.replace(microsecond=0)
assert date == parse_iso_datetime(date.isoformat() + "Z")
with raises(ValueError):
with pytest.raises(ValueError):
parse_iso_datetime("foobar")


def test_get_iso_timestamp():
def test_get_iso_timestamp() -> None:
v = get_iso_timestamp()
assert ISO_EXT_RE.match(v)
ts = datetime.datetime.now()
ts = datetime.now()
v = get_iso_timestamp(ts)
assert parse_iso_datetime(v) == ts


@pytest.mark.parametrize(
"timestamp",
[
datetime(2021, 1, 1, 23, 42, 11, 123456),
datetime(2021, 1, 1, 23, 42, 11),
datetime(2021, 1, 1, 23, 42),
datetime(2021, 1, 1, 23),
datetime(2021, 1, 1),
],
)
def test_roundtrip(timestamp: datetime) -> None:
ts2 = parse_iso_datetime(get_iso_timestamp(timestamp))

assert ts2 == timestamp


@pytest.mark.parametrize(
("value", "normalized_value"),
# fmt: off
[
# Extended format
("2021-01-01T00:00:00.000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42:11.123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241
("2021-01-01T00:00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42:11Z", "2021-01-01T23:42:11Z"), # noqa: E241
("2021-01-01T00:00Z", "2021-01-01T00:00:00Z"), # noqa: E241
("2021-01-01T23:42Z", "2021-01-01T23:42:00Z"), # noqa: E241
("2021-01-01", "2021-01-01T00:00:00Z"), # noqa: E241
# Basic format
("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T234211123456Z", "2021-01-01T23:42:11.123456Z"), # noqa: E241
("20210101T000000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T234211Z", "2021-01-01T23:42:11Z"), # noqa: E241
("20210101T0000Z", "2021-01-01T00:00:00Z"), # noqa: E241
("20210101T2342Z", "2021-01-01T23:42:00Z"), # noqa: E241
("20210101", "2021-01-01T00:00:00Z"), # noqa: E241
],
# fmt: on
)
def test_reverse_roundtrip(value: str, normalized_value: str) -> None:
v2 = get_iso_timestamp(parse_iso_datetime(value))

assert v2 == normalized_value