diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py
index 7fc17e57d..02c0f30f2 100644
--- a/singer_sdk/_singerlib/messages.py
+++ b/singer_sdk/_singerlib/messages.py
@@ -6,9 +6,10 @@
 import sys
 import typing as t
 from dataclasses import asdict, dataclass, field
+from datetime import timezone
 
-import pytz
 import simplejson as json
+from dateutil.parser import parse
 
 if t.TYPE_CHECKING:
     from datetime import datetime
@@ -84,6 +85,27 @@ class RecordMessage(Message):
     time_extracted: datetime | None = None
     """The time the record was extracted."""
 
+    @classmethod
+    def from_dict(cls: type[RecordMessage], data: dict[str, t.Any]) -> RecordMessage:
+        """Create a record message from a dictionary.
+
+        This overrides the default conversion logic, since it uses unnecessary
+        deep copying and is very slow.
+
+        Args:
+            data: The dictionary to create the message from.
+
+        Returns:
+            The created message.
+        """
+        time_extracted = data.get("time_extracted")
+        return cls(
+            stream=data["stream"],
+            record=data["record"],
+            version=data.get("version"),
+            time_extracted=parse(time_extracted) if time_extracted else None,
+        )
+
     def to_dict(self) -> dict[str, t.Any]:
         """Return a dictionary representation of the message.
 
@@ -119,7 +141,7 @@ def __post_init__(self) -> None:
             raise ValueError(msg)
 
         if self.time_extracted:
-            self.time_extracted = self.time_extracted.astimezone(pytz.utc)
+            self.time_extracted = self.time_extracted.astimezone(timezone.utc)
 
 
 @dataclass
diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py
index 47a36aca6..b4a33db44 100644
--- a/tests/_singerlib/test_messages.py
+++ b/tests/_singerlib/test_messages.py
@@ -1,15 +1,17 @@
 from __future__ import annotations
 
+import datetime
 import io
 from contextlib import redirect_stdout
-from datetime import datetime
 
 import pytest
-from pytz import UTC, timezone
+from pytz import timezone
 
 import singer_sdk._singerlib as singer
 from singer_sdk._singerlib.messages import format_message
 
+UTC = datetime.timezone.utc
+
 
 def test_exclude_null_dict():
     pairs = [("a", 1), ("b", None), ("c", 3)]
@@ -55,19 +57,33 @@ def test_record_message():
     assert singer.RecordMessage.from_dict(record.to_dict()) == record
 
 
+def test_record_message_parse_time_extracted():
+    message_dic = {
+        "type": "RECORD",
+        "stream": "test",
+        "record": {"id": 1, "name": "test"},
+        "time_extracted": "2021-01-01T00:00:00Z",
+    }
+    record = singer.RecordMessage.from_dict(message_dic)
+    assert record.type == "RECORD"
+    assert record.stream == "test"
+    assert record.record == {"id": 1, "name": "test"}
+    assert record.time_extracted == datetime.datetime(2021, 1, 1, 0, 0, 0, tzinfo=UTC)
+
+
 def test_record_message_naive_time_extracted():
     """Check that record message' time_extracted must be timezone-aware."""
     with pytest.raises(ValueError, match="must be either None or an aware datetime"):
         singer.RecordMessage(
             stream="test",
             record={"id": 1, "name": "test"},
-            time_extracted=datetime(2021, 1, 1),  # noqa: DTZ001
+            time_extracted=datetime.datetime(2021, 1, 1),  # noqa: DTZ001
         )
 
 
 def test_record_message_time_extracted_to_utc():
     """Check that record message's time_extracted is converted to UTC."""
-    naive = datetime(2021, 1, 1, 12)  # noqa: DTZ001
+    naive = datetime.datetime(2021, 1, 1, 12)  # noqa: DTZ001
     nairobi = timezone("Africa/Nairobi")
 
     record = singer.RecordMessage(
@@ -75,7 +91,7 @@ def test_record_message_time_extracted_to_utc():
         record={"id": 1, "name": "test"},
         time_extracted=nairobi.localize(naive),
     )
-    assert record.time_extracted == datetime(2021, 1, 1, 9, tzinfo=UTC)
+    assert record.time_extracted == datetime.datetime(2021, 1, 1, 9, tzinfo=UTC)
 
 
 def test_schema_message():