Skip to content

Commit

Permalink
Update libtime to accept time before 1970 (#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
DeborahOoi96 authored Feb 29, 2024
1 parent 205dc3b commit 18cb6e2
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 145 deletions.
31 changes: 14 additions & 17 deletions generated/nidaqmx/_grpc_time.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import timezone
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from hightime import timedelta as ht_timedelta
from typing import Optional, Union
from nidaqmx._time import _convert_to_desired_timezone

from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp

# 66 years, 17 leap days = 24107 days = 2082844800 seconds
_BIAS_FROM_1970_EPOCH = 2082844800

_NS_PER_S = 10**9
_NS_PER_US = 10**3
Expand All @@ -13,10 +17,13 @@
_YS_PER_NS = 10**15
_YS_PER_FS = 10**9

_EPOCH_1970 = ht_datetime(1970, 1, 1, tzinfo=timezone.utc)

def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional[GrpcTimestamp] = None) -> GrpcTimestamp:
utc_dt = dt.astimezone(tz=timezone.utc)
seconds = int(utc_dt.timestamp())
seconds_since_1970 = int((dt - _EPOCH_1970).total_seconds())
# We need to add one more negative second if applicable to compensate for a non-zero microsecond.
if dt.microsecond and (dt < _EPOCH_1970):
seconds_since_1970 -=1
if ts is None:
ts = GrpcTimestamp()

Expand All @@ -29,25 +36,15 @@ def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional
if remainder_yoctoseconds >= _YS_PER_NS / 2:
nanos += 1
else:
nanos = utc_dt.microsecond * _NS_PER_US
nanos = dt.microsecond * _NS_PER_US

ts.FromNanoseconds(seconds * _NS_PER_S + nanos)
ts.FromNanoseconds(seconds_since_1970 * _NS_PER_S + nanos)
return ts


def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: Optional[timezone] = None) -> ht_datetime:
total_nanos = ts.ToNanoseconds()
seconds, nanos = divmod(total_nanos, _NS_PER_S)

# Convert the nanoseconds to micro, femto, and yoctorseconds.
# Convert the nanoseconds to yoctoseconds.
total_yoctoseconds = int(round(_YS_PER_NS * nanos))
microsecond, remainder_yoctoseconds = divmod(total_yoctoseconds, _YS_PER_US)
femtosecond, remainder_yoctoseconds = divmod(remainder_yoctoseconds, _YS_PER_FS)
yoctosecond = remainder_yoctoseconds

# Start with UTC
dt = ht_datetime.fromtimestamp(seconds, timezone.utc)
# Add in precision
dt = dt.replace(microsecond=microsecond, femtosecond=femtosecond, yoctosecond=yoctosecond)
# Then convert to requested timezone
return dt.astimezone(tz=tzinfo)
dt = _EPOCH_1970 + ht_timedelta(seconds = seconds) + ht_timedelta(yoctoseconds=total_yoctoseconds)
return _convert_to_desired_timezone(dt, tzinfo)
49 changes: 11 additions & 38 deletions generated/nidaqmx/_lib_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from datetime import timezone
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from hightime import timedelta as ht_timedelta
from typing import Optional, Union

from nidaqmx._time import _convert_to_desired_timezone

@functools.total_ordering
class AbsoluteTime(ctypes.Structure):
Expand All @@ -29,21 +30,14 @@ class AbsoluteTime(ctypes.Structure):
MAX_FS = 10**9
MAX_YS = 10**9

@classmethod
def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
utc_dt = dt.astimezone(tz=timezone.utc)
_EPOCH_1904 = ht_datetime(1904, 1, 1, tzinfo=timezone.utc)

# First, calculate whole seconds by converting from the 1970 to 1904 epoch.
timestamp_1970_epoch = utc_dt.timestamp()
was_negative = timestamp_1970_epoch < 0
timestamp_1904_epoch = int(timestamp_1970_epoch + AbsoluteTime._BIAS_FROM_1970_EPOCH)

# Our bias is positive, so our sign should only change if we were previously negative.
is_negative = timestamp_1904_epoch < 0
if is_negative != was_negative and not was_negative:
raise OverflowError(f"Can't represent {dt.isoformat()} in AbsoluteTime (1904 epoch)")
@classmethod
def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
seconds_since_1904 = int((dt - AbsoluteTime._EPOCH_1904).total_seconds())

# Finally, convert the subseconds.
# Convert the subseconds.
if isinstance(dt, ht_datetime):
total_yoctoseconds = dt.yoctosecond
total_yoctoseconds += dt.femtosecond * AbsoluteTime._YS_PER_FS
Expand All @@ -53,38 +47,17 @@ def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
)
else:
lsb = int(
round(AbsoluteTime._NUM_SUBSECONDS * utc_dt.microsecond / AbsoluteTime._US_PER_S)
round(AbsoluteTime._NUM_SUBSECONDS * dt.microsecond / AbsoluteTime._US_PER_S)
)

return AbsoluteTime(lsb=lsb, msb=timestamp_1904_epoch)
return AbsoluteTime(lsb=lsb, msb=seconds_since_1904)

def to_datetime(self, tzinfo: Optional[timezone] = None) -> ht_datetime:
# First, calculate whole seconds by converting from the 1904 to 1970 epoch.
timestamp_1904_epoch = self.msb
was_positive = timestamp_1904_epoch > 0
timestamp_1970_epoch = int(timestamp_1904_epoch - AbsoluteTime._BIAS_FROM_1970_EPOCH)

# Our bias is negative, so our sign should only change if we were previously positive.
is_positive = timestamp_1970_epoch > 0
if is_positive != was_positive and not was_positive:
raise OverflowError(f"Can't represent {str(self)} in datetime (1970 epoch)")

# Finally, convert the subseconds to micro, femto, and yoctoseconds.
total_yoctoseconds = int(
round(AbsoluteTime._YS_PER_S * self.lsb / AbsoluteTime._NUM_SUBSECONDS)
)
microsecond, remainder_yoctoseconds = divmod(total_yoctoseconds, AbsoluteTime._YS_PER_US)
femtosecond, remainder_yoctoseconds = divmod(
remainder_yoctoseconds, AbsoluteTime._YS_PER_FS
)
yoctosecond = remainder_yoctoseconds

# Start with UTC
dt = ht_datetime.fromtimestamp(timestamp_1970_epoch, timezone.utc)
# Add in precision
dt = dt.replace(microsecond=microsecond, femtosecond=femtosecond, yoctosecond=yoctosecond)
# Then convert to requested timezone
return dt.astimezone(tz=tzinfo)
dt = AbsoluteTime._EPOCH_1904 + ht_timedelta(seconds=self.msb) + ht_timedelta(yoctoseconds=total_yoctoseconds)
return _convert_to_desired_timezone(dt,tzinfo)

def __str__(self) -> str:
return f"AbsoluteTime(lsb=0x{self.lsb:x}, msb=0x{self.msb:x})"
Expand Down
40 changes: 40 additions & 0 deletions generated/nidaqmx/_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from tzlocal import get_localzone
from datetime import timezone
from datetime import tzinfo as dt_tzinfo
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from typing import Optional, Union
try:
import zoneinfo
except ImportError:
from backports import zoneinfo

# theoretically the same as astimezone(), but with support for dates before 1970
def _convert_to_desired_timezone(expected_time_utc: Union[std_datetime, ht_datetime], tzinfo: Optional[dt_tzinfo] = None) -> Union[std_datetime, ht_datetime]:
# if timezone matches, no need to do conversion
if expected_time_utc.tzinfo is tzinfo:
return expected_time_utc

# if timezone is not defined, use system timezone
if tzinfo is None:
tzinfo = get_localzone()

# use ZoneInfo here to account for daylight savings
if isinstance(tzinfo, zoneinfo.ZoneInfo):
localized_time = expected_time_utc.replace(tzinfo=tzinfo)
desired_expected_time = tzinfo.fromutc(localized_time)
return(desired_expected_time)

# if the tzinfo passed in is a timedelta function, then we don't need to consider daylight savings
elif tzinfo.utcoffset(None) is not None:
current_time_utc = ht_datetime.now(timezone.utc)
desired_timezone_offset = current_time_utc.astimezone(tz=tzinfo).utcoffset()
desired_expected_time = expected_time_utc + desired_timezone_offset
new_datetime = desired_expected_time.replace(tzinfo=tzinfo)
return new_datetime

# if the tzinfo passed in is none of the above, fall back to original astimezone()
else:
return expected_time_utc.astimezone(tzinfo)
31 changes: 14 additions & 17 deletions src/handwritten/_grpc_time.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from datetime import timezone
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from hightime import timedelta as ht_timedelta
from typing import Optional, Union
from nidaqmx._time import _convert_to_desired_timezone

from google.protobuf.timestamp_pb2 import Timestamp as GrpcTimestamp

# 66 years, 17 leap days = 24107 days = 2082844800 seconds
_BIAS_FROM_1970_EPOCH = 2082844800

_NS_PER_S = 10**9
_NS_PER_US = 10**3
Expand All @@ -13,10 +17,13 @@
_YS_PER_NS = 10**15
_YS_PER_FS = 10**9

_EPOCH_1970 = ht_datetime(1970, 1, 1, tzinfo=timezone.utc)

def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional[GrpcTimestamp] = None) -> GrpcTimestamp:
utc_dt = dt.astimezone(tz=timezone.utc)
seconds = int(utc_dt.timestamp())
seconds_since_1970 = int((dt - _EPOCH_1970).total_seconds())
# We need to add one more negative second if applicable to compensate for a non-zero microsecond.
if dt.microsecond and (dt < _EPOCH_1970):
seconds_since_1970 -=1
if ts is None:
ts = GrpcTimestamp()

Expand All @@ -29,25 +36,15 @@ def convert_time_to_timestamp(dt: Union[std_datetime, ht_datetime], ts: Optional
if remainder_yoctoseconds >= _YS_PER_NS / 2:
nanos += 1
else:
nanos = utc_dt.microsecond * _NS_PER_US
nanos = dt.microsecond * _NS_PER_US

ts.FromNanoseconds(seconds * _NS_PER_S + nanos)
ts.FromNanoseconds(seconds_since_1970 * _NS_PER_S + nanos)
return ts


def convert_timestamp_to_time(ts: GrpcTimestamp, tzinfo: Optional[timezone] = None) -> ht_datetime:
total_nanos = ts.ToNanoseconds()
seconds, nanos = divmod(total_nanos, _NS_PER_S)

# Convert the nanoseconds to micro, femto, and yoctorseconds.
# Convert the nanoseconds to yoctoseconds.
total_yoctoseconds = int(round(_YS_PER_NS * nanos))
microsecond, remainder_yoctoseconds = divmod(total_yoctoseconds, _YS_PER_US)
femtosecond, remainder_yoctoseconds = divmod(remainder_yoctoseconds, _YS_PER_FS)
yoctosecond = remainder_yoctoseconds

# Start with UTC
dt = ht_datetime.fromtimestamp(seconds, timezone.utc)
# Add in precision
dt = dt.replace(microsecond=microsecond, femtosecond=femtosecond, yoctosecond=yoctosecond)
# Then convert to requested timezone
return dt.astimezone(tz=tzinfo)
dt = _EPOCH_1970 + ht_timedelta(seconds = seconds) + ht_timedelta(yoctoseconds=total_yoctoseconds)
return _convert_to_desired_timezone(dt, tzinfo)
49 changes: 11 additions & 38 deletions src/handwritten/_lib_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from datetime import timezone
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from hightime import timedelta as ht_timedelta
from typing import Optional, Union

from nidaqmx._time import _convert_to_desired_timezone

@functools.total_ordering
class AbsoluteTime(ctypes.Structure):
Expand All @@ -29,21 +30,14 @@ class AbsoluteTime(ctypes.Structure):
MAX_FS = 10**9
MAX_YS = 10**9

@classmethod
def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
utc_dt = dt.astimezone(tz=timezone.utc)
_EPOCH_1904 = ht_datetime(1904, 1, 1, tzinfo=timezone.utc)

# First, calculate whole seconds by converting from the 1970 to 1904 epoch.
timestamp_1970_epoch = utc_dt.timestamp()
was_negative = timestamp_1970_epoch < 0
timestamp_1904_epoch = int(timestamp_1970_epoch + AbsoluteTime._BIAS_FROM_1970_EPOCH)

# Our bias is positive, so our sign should only change if we were previously negative.
is_negative = timestamp_1904_epoch < 0
if is_negative != was_negative and not was_negative:
raise OverflowError(f"Can't represent {dt.isoformat()} in AbsoluteTime (1904 epoch)")
@classmethod
def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
seconds_since_1904 = int((dt - AbsoluteTime._EPOCH_1904).total_seconds())

# Finally, convert the subseconds.
# Convert the subseconds.
if isinstance(dt, ht_datetime):
total_yoctoseconds = dt.yoctosecond
total_yoctoseconds += dt.femtosecond * AbsoluteTime._YS_PER_FS
Expand All @@ -53,38 +47,17 @@ def from_datetime(cls, dt: Union[std_datetime, ht_datetime]) -> AbsoluteTime:
)
else:
lsb = int(
round(AbsoluteTime._NUM_SUBSECONDS * utc_dt.microsecond / AbsoluteTime._US_PER_S)
round(AbsoluteTime._NUM_SUBSECONDS * dt.microsecond / AbsoluteTime._US_PER_S)
)

return AbsoluteTime(lsb=lsb, msb=timestamp_1904_epoch)
return AbsoluteTime(lsb=lsb, msb=seconds_since_1904)

def to_datetime(self, tzinfo: Optional[timezone] = None) -> ht_datetime:
# First, calculate whole seconds by converting from the 1904 to 1970 epoch.
timestamp_1904_epoch = self.msb
was_positive = timestamp_1904_epoch > 0
timestamp_1970_epoch = int(timestamp_1904_epoch - AbsoluteTime._BIAS_FROM_1970_EPOCH)

# Our bias is negative, so our sign should only change if we were previously positive.
is_positive = timestamp_1970_epoch > 0
if is_positive != was_positive and not was_positive:
raise OverflowError(f"Can't represent {str(self)} in datetime (1970 epoch)")

# Finally, convert the subseconds to micro, femto, and yoctoseconds.
total_yoctoseconds = int(
round(AbsoluteTime._YS_PER_S * self.lsb / AbsoluteTime._NUM_SUBSECONDS)
)
microsecond, remainder_yoctoseconds = divmod(total_yoctoseconds, AbsoluteTime._YS_PER_US)
femtosecond, remainder_yoctoseconds = divmod(
remainder_yoctoseconds, AbsoluteTime._YS_PER_FS
)
yoctosecond = remainder_yoctoseconds

# Start with UTC
dt = ht_datetime.fromtimestamp(timestamp_1970_epoch, timezone.utc)
# Add in precision
dt = dt.replace(microsecond=microsecond, femtosecond=femtosecond, yoctosecond=yoctosecond)
# Then convert to requested timezone
return dt.astimezone(tz=tzinfo)
dt = AbsoluteTime._EPOCH_1904 + ht_timedelta(seconds=self.msb) + ht_timedelta(yoctoseconds=total_yoctoseconds)
return _convert_to_desired_timezone(dt,tzinfo)

def __str__(self) -> str:
return f"AbsoluteTime(lsb=0x{self.lsb:x}, msb=0x{self.msb:x})"
Expand Down
40 changes: 40 additions & 0 deletions src/handwritten/_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from tzlocal import get_localzone
from datetime import timezone
from datetime import tzinfo as dt_tzinfo
from datetime import datetime as std_datetime
from hightime import datetime as ht_datetime
from typing import Optional, Union
try:
import zoneinfo
except ImportError:
from backports import zoneinfo

# theoretically the same as astimezone(), but with support for dates before 1970
def _convert_to_desired_timezone(expected_time_utc: Union[std_datetime, ht_datetime], tzinfo: Optional[dt_tzinfo] = None) -> Union[std_datetime, ht_datetime]:
# if timezone matches, no need to do conversion
if expected_time_utc.tzinfo is tzinfo:
return expected_time_utc

# if timezone is not defined, use system timezone
if tzinfo is None:
tzinfo = get_localzone()

# use ZoneInfo here to account for daylight savings
if isinstance(tzinfo, zoneinfo.ZoneInfo):
localized_time = expected_time_utc.replace(tzinfo=tzinfo)
desired_expected_time = tzinfo.fromutc(localized_time)
return(desired_expected_time)

# if the tzinfo passed in is a timedelta function, then we don't need to consider daylight savings
elif tzinfo.utcoffset(None) is not None:
current_time_utc = ht_datetime.now(timezone.utc)
desired_timezone_offset = current_time_utc.astimezone(tz=tzinfo).utcoffset()
desired_expected_time = expected_time_utc + desired_timezone_offset
new_datetime = desired_expected_time.replace(tzinfo=tzinfo)
return new_datetime

# if the tzinfo passed in is none of the above, fall back to original astimezone()
else:
return expected_time_utc.astimezone(tzinfo)
Loading

0 comments on commit 18cb6e2

Please sign in to comment.