Skip to content

Commit

Permalink
py register sqlite3 timestamp converter
Browse files Browse the repository at this point in the history
  • Loading branch information
bozokopic committed Mar 8, 2024
1 parent 992668a commit f87afda
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "hat-util"
version = "0.6.11"
version = "0.6.12"
description = "Hat utility library"
readme = "README.rst"
requires-python = ">=3.10"
Expand All @@ -16,10 +16,10 @@ Repository = "https://github.com/hat-open/hat-util.git"
Documentation = "http://hat-util.hat-open.com"

[project.optional-dependencies]
dev = ["hat-doit ~=0.15.5"]
dev = ["hat-doit ~=0.15.8"]

[build-system]
requires = ["hat-doit ~=0.15.5"]
requires = ["hat-doit ~=0.15.8"]
build-backend = "hat.doit.pep517"

[tool.hat-doit]
Expand Down
41 changes: 41 additions & 0 deletions src_py/hat/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import collections
import contextlib
import datetime
import inspect
import socket
import sqlite3
import typing
import warnings

Expand Down Expand Up @@ -221,3 +223,42 @@ def clear(self) -> int:
self._data.clear()
data_len, self._data_len = self._data_len, 0
return data_len


def register_sqlite3_timestamp_converter():
"""Register modified timestamp converter
This converter is modification of standard library convertor taking into
account possible timezone info.
"""

def convert_timestamp(val: bytes) -> datetime.datetime:
datepart, timetzpart = val.split(b" ")
if b"+" in timetzpart:
tzsign = 1
timepart, tzpart = timetzpart.split(b"+")
elif b"-" in timetzpart:
tzsign = -1
timepart, tzpart = timetzpart.split(b"-")
else:
timepart, tzpart = timetzpart, None
year, month, day = map(int, datepart.split(b"-"))
timepart_full = timepart.split(b".")
hours, minutes, seconds = map(int, timepart_full[0].split(b":"))
if len(timepart_full) == 2:
microseconds = int('{:0<6.6}'.format(timepart_full[1].decode()))
else:
microseconds = 0
if tzpart:
tzhours, tzminutes = map(int, tzpart.split(b":"))
tz = datetime.timezone(
tzsign * datetime.timedelta(hours=tzhours, minutes=tzminutes))
else:
tz = None

val = datetime.datetime(year, month, day, hours, minutes, seconds,
microseconds, tz)
return val

sqlite3.register_converter("timestamp", convert_timestamp)
32 changes: 32 additions & 0 deletions test_pytest/test_util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import datetime
import sqlite3

import pytest

from hat import util


@pytest.fixture(scope='session', autouse=True)
def register_sqlite3_timestamp_converter():
util.register_sqlite3_timestamp_converter()


def test_first():
x = [1, 2, 3]
assert util.first(x) == 1
Expand Down Expand Up @@ -149,3 +157,27 @@ def test_bytes_buffer():
assert len(buff) == 3
assert buff.clear() == 3
assert len(buff) == 0


@pytest.mark.parametrize("t", [
datetime.datetime.now(),
datetime.datetime(2000, 1, 1),
datetime.datetime(2000, 1, 2, 3, 4, 5, 123456),
datetime.datetime(2000, 1, 2, 3, 4, 5, 123456,
tzinfo=datetime.timezone.utc),
datetime.datetime(2000, 1, 2, 3, 4, 5, 123456,
tzinfo=datetime.timezone(datetime.timedelta(hours=1,
minutes=2))),
datetime.datetime(2000, 1, 2, 3, 4, 5, 123456,
tzinfo=datetime.timezone(-datetime.timedelta(hours=1,
minutes=2)))
])
def test_sqlite3_timestamp_converter(t):
with sqlite3.connect(':memory:',
isolation_level=None,
detect_types=sqlite3.PARSE_DECLTYPES) as conn:
conn.execute("CREATE TABLE test (t TIMESTAMP)")
conn.execute("INSERT INTO test VALUES (:t)", {'t': t})

result = conn.execute("SELECT t FROM test").fetchone()[0]
assert result == t

0 comments on commit f87afda

Please sign in to comment.