Skip to content

Commit

Permalink
feat: Dates can now be parsed only as strings, also migrated to the n…
Browse files Browse the repository at this point in the history
…on dep… (#247)

Closes #1 , #246

---------

Co-authored-by: Edgar Ramírez Mondragón <[email protected]>
  • Loading branch information
visch and edgarrmondragon authored Sep 27, 2023
1 parent ec4a38f commit e174f27
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 13 deletions.
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ repos:
exclude: tests
additional_dependencies:
- types-paramiko
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
| dates_as_string | False | 0 | Defaults to false; if true, date and timestamp fields will be strings. If you see `ValueError: Year is out of range`, try setting this to True. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details. |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion server, this is the host we'll connect to via ssh. |
Expand Down
52 changes: 45 additions & 7 deletions tap_postgres/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datetime
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type, Union

import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
Expand Down Expand Up @@ -44,20 +45,53 @@ def patched_conform(
class PostgresConnector(SQLConnector):
"""Connects to the Postgres SQL source."""

@staticmethod
def __init__(
    self,
    config: dict | None = None,
    sqlalchemy_url: str | None = None,
) -> None:
    """Initialize the SQL connector.

    Args:
        config: The parent tap or target object's config.
        sqlalchemy_url: Optional URL for the connection.
    """
    # Dates in postgres don't all convert to python datetime objects, so we
    # need to register a custom type caster to convert these to a string.
    # See https://www.psycopg.org/psycopg3/docs/advanced/adapt.html#example-handling-infinity-date # noqa: E501
    # for more information.
    # Use .get() instead of config["dates_as_string"]: the default for this
    # setting is applied by the tap's config JSON schema, so a config dict
    # passed directly to the connector may not contain the key at all and
    # subscripting would raise KeyError.
    if config is not None and config.get("dates_as_string") is True:
        # OIDs 1082/1114/1184 correspond to date/timestamp/timestamptz.
        string_dates = psycopg2.extensions.new_type(
            (1082, 1114, 1184), "STRING_DATES", psycopg2.STRING
        )
        # OIDs 1182/1115/1188 are the array forms of the same types.
        string_date_arrays = psycopg2.extensions.new_array_type(
            (1182, 1115, 1188), "STRING_DATE_ARRAYS[]", psycopg2.STRING
        )
        # register_type with no second argument applies process-wide, to all
        # connections subsequently created by psycopg2.
        psycopg2.extensions.register_type(string_dates)
        psycopg2.extensions.register_type(string_date_arrays)

    super().__init__(config=config, sqlalchemy_url=sqlalchemy_url)

# Note super is static, we can get away with this because this is called once
# and is luckily referenced via the instance of the class
def to_jsonschema_type(
self,
sql_type: Union[
str,
sqlalchemy.types.TypeEngine,
Type[sqlalchemy.types.TypeEngine],
postgresql.ARRAY,
Any,
]
],
) -> dict:
"""Return a JSON Schema representation of the provided type.
Overidden from SQLConnector to correctly handle JSONB and Arrays.
Also Overridden in order to call our instance method `sdk_typing_object()`
instead of the static version
By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.
Expand Down Expand Up @@ -89,12 +123,12 @@ def to_jsonschema_type(
and isinstance(sql_type, sqlalchemy.dialects.postgresql.ARRAY)
and type_name == "ARRAY"
):
array_type = PostgresConnector.sdk_typing_object(sql_type.item_type)
array_type = self.sdk_typing_object(sql_type.item_type)
return th.ArrayType(array_type).type_dict
return PostgresConnector.sdk_typing_object(sql_type).type_dict
return self.sdk_typing_object(sql_type).type_dict

@staticmethod
def sdk_typing_object(
self,
from_type: str
| sqlalchemy.types.TypeEngine
| type[sqlalchemy.types.TypeEngine],
Expand Down Expand Up @@ -148,6 +182,9 @@ def sdk_typing_object(
"bool": th.BooleanType(),
"variant": th.StringType(),
}
if self.config["dates_as_string"] is True:
sqltype_lookup["date"] = th.StringType()
sqltype_lookup["datetime"] = th.StringType()
if isinstance(from_type, str):
type_name = from_type
elif isinstance(from_type, sqlalchemy.types.TypeEngine):
Expand Down Expand Up @@ -235,5 +272,6 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
if start_val:
query = query.filter(replication_key_col >= start_val)

for row in self.connector.connection.execute(query):
yield dict(row)
with self.connector._connect() as con:
for row in con.execute(query):
yield dict(row)
14 changes: 12 additions & 2 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import signal
from functools import cached_property
from os import chmod, path
from typing import Any, Mapping, cast
from typing import Any, Dict, cast

import paramiko
from singer_sdk import SQLTap, Stream
Expand Down Expand Up @@ -137,6 +137,16 @@ def __init__(
"tap automatically determines ALL available Postgres schemas."
),
),
th.Property(
"dates_as_string",
th.BooleanType,
description=(
"Defaults to false, if true, date, and timestamp fields will be "
"Strings. If you see ValueError: Year is out of range, "
"try setting this to True."
),
default=False,
),
th.Property(
"ssh_tunnel",
th.ObjectType(
Expand Down Expand Up @@ -274,7 +284,7 @@ def __init__(
),
).to_dict()

def get_sqlalchemy_url(self, config: Mapping[str, Any]) -> str:
def get_sqlalchemy_url(self, config: Dict[Any, Any]) -> str:
"""Generate a SQLAlchemy URL.
Args:
Expand Down
98 changes: 94 additions & 4 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def test_temporal_datatypes():
This test checks that dates are being parsed correctly, and additionally implements
schema checks, and performs similar tests on times and timestamps.
"""
table_name = "test_date"
table_name = "test_temporal_datatypes"
engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

metadata_obj = MetaData()
Expand Down Expand Up @@ -237,11 +237,17 @@ def test_jsonb_json():
"stream" in schema_message
and schema_message["stream"] == altered_table_name
):
assert "object" in schema_message["schema"]["properties"]["column_jsonb"]["type"]
assert "object" in schema_message["schema"]["properties"]["column_json"]["type"]
assert (
"object"
in schema_message["schema"]["properties"]["column_jsonb"]["type"]
)
assert (
"object"
in schema_message["schema"]["properties"]["column_json"]["type"]
)
assert test_runner.records[altered_table_name][0] == {
"column_jsonb": {"foo": "bar"},
"column_json": {"baz": "foo"}
"column_json": {"baz": "foo"},
}


Expand Down Expand Up @@ -323,3 +329,87 @@ def run_sync_dry_run(self) -> bool:
new_tap = self.new_tap()
new_tap.sync_all()
return True


def _select_only_stream(tap_catalog: dict, stream_name: str) -> None:
    """Mark only ``stream_name`` as selected (FULL_TABLE) in the catalog."""
    for stream in tap_catalog["streams"]:
        if stream.get("stream") and stream_name not in stream["stream"]:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = False
        else:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = True
                if metadata["breadcrumb"] == []:
                    metadata["metadata"]["replication-method"] = "FULL_TABLE"


def test_invalid_python_dates():
    """Some dates are invalid in python, but valid in Postgres.

    Check out https://www.psycopg.org/psycopg3/docs/advanced/adapt.html#example-handling-infinity-date
    for more information.

    First sync with the default config and assert the out-of-range dates raise
    ValueError; then sync with ``dates_as_string`` enabled and assert the same
    values come through as plain strings.
    """
    table_name = "test_invalid_python_dates"
    engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

    metadata_obj = MetaData()
    table = Table(
        table_name,
        metadata_obj,
        Column("date", DATE),
        Column("datetime", DateTime),
    )
    with engine.connect() as conn:
        if table.exists(conn):
            table.drop(conn)
        metadata_obj.create_all(conn)
        # BC dates are valid in Postgres but out of range for Python datetime.
        insert = table.insert().values(
            date="4713-04-03 BC",
            datetime="4712-10-19 10:23:54 BC",
        )
        conn.execute(insert)

    altered_table_name = f"public-{table_name}"

    # Default config: the BC dates cannot be converted to python datetimes,
    # so the sync must fail with ValueError.
    tap = TapPostgres(config=SAMPLE_CONFIG)
    tap_catalog = json.loads(tap.catalog_json_text)
    _select_only_stream(tap_catalog, altered_table_name)

    test_runner = PostgresTestRunner(
        tap_class=TapPostgres, config=SAMPLE_CONFIG, catalog=tap_catalog
    )
    with pytest.raises(ValueError):
        test_runner.sync_all()

    # With dates_as_string enabled the same data should pass through.
    copied_config = copy.deepcopy(SAMPLE_CONFIG)
    copied_config["dates_as_string"] = True
    tap = TapPostgres(config=copied_config)
    tap_catalog = json.loads(tap.catalog_json_text)
    _select_only_stream(tap_catalog, altered_table_name)

    # BUGFIX: the runner must receive the altered config (copied_config), not
    # SAMPLE_CONFIG. Previously this only passed because psycopg2's
    # register_type() calls made by the copied-config tap above are
    # process-global, masking the wrong config.
    test_runner = PostgresTestRunner(
        tap_class=TapPostgres, config=copied_config, catalog=tap_catalog
    )
    test_runner.sync_all()

    for schema_message in test_runner.schema_messages:
        if (
            "stream" in schema_message
            and schema_message["stream"] == altered_table_name
        ):
            assert ["string", "null"] == schema_message["schema"]["properties"]["date"][
                "type"
            ]
            assert ["string", "null"] == schema_message["schema"]["properties"][
                "datetime"
            ]["type"]
    assert test_runner.records[altered_table_name][0] == {
        "date": "4713-04-03 BC",
        "datetime": "4712-10-19 10:23:54 BC",
    }

0 comments on commit e174f27

Please sign in to comment.