Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Break out host, user, port, etc. into individual config values #121

Merged
merged 12 commits into from
Jun 23, 2023
92 changes: 86 additions & 6 deletions tap_postgres/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,91 @@
import io
import signal
from functools import cached_property
from typing import TYPE_CHECKING, Any
from typing import Any, Mapping, cast

import paramiko
from singer_sdk import SQLTap, Stream
from singer_sdk import typing as th # JSON schema typing helpers
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
from sshtunnel import SSHTunnelForwarder

from tap_postgres.client import PostgresConnector, PostgresStream

if TYPE_CHECKING:
from sqlalchemy.engine.url import URL


class TapPostgres(SQLTap):
"""Singer tap for Postgres."""

name = "tap-postgres"
default_stream_class = PostgresStream

def __init__(
self,
*args,
**kwargs,
) -> None:
"""Constructor.

Should use JSON Schema instead
See https://github.com/MeltanoLabs/tap-postgres/issues/141
"""
super().__init__(*args, **kwargs)
assert (self.config.get("sqlalchemy_url") is not None) or (
self.config.get("host") is not None
and self.config.get("port") is not None
and self.config.get("user") is not None
and self.config.get("password") is not None
), (
"Need either the sqlalchemy_url to be set or host, port, user,"
+ " and password to be set"
)

config_jsonschema = th.PropertiesList(
th.Property(
"host",
th.StringType,
description=(
"Hostname for postgres instance. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"port",
th.IntegerType,
default=5432,
description=(
"The port on which postgres is awaiting connection. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"user",
th.StringType,
description=(
"User name used to authenticate. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"password",
th.StringType,
visch marked this conversation as resolved.
Show resolved Hide resolved
secret=True,
description=(
"Password used to authenticate. "
"Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"database",
th.StringType,
description=(
"Database name. "
+ "Note if sqlalchemy_url is set this will be ignored."
),
),
th.Property(
"sqlalchemy_url",
th.StringType,
required=True,
secret=True,
description=(
"Example postgresql://[username]:[password]@localhost:5432/[db_name]"
Expand Down Expand Up @@ -93,6 +153,26 @@ class TapPostgres(SQLTap):
),
).to_dict()

def get_sqlalchemy_url(self, config: Mapping[str, Any]) -> str:
"""Generate a SQLAlchemy URL.

Args:
config: The configuration for the connector.
"""
if config.get("sqlalchemy_url"):
return cast(str, config["sqlalchemy_url"])

else:
sqlalchemy_url = URL.create(
drivername="postgresql+psycopg2",
username=config["user"],
password=config["password"],
host=config["host"],
port=config["port"],
database=config["database"],
)
return cast(str, sqlalchemy_url)

@cached_property
def connector(self) -> PostgresConnector:
"""Get a configured connector for this Tap.
Expand All @@ -101,7 +181,7 @@ def connector(self) -> PostgresConnector:

"""
# We mutate this url to use the ssh tunnel if enabled
url = make_url(self.config["sqlalchemy_url"])
url = make_url(self.get_sqlalchemy_url(config=self.config))
ssh_config = self.config.get("ssh_tunnel", {})

if ssh_config.get("enable", False):
Expand Down
26 changes: 26 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
"sqlalchemy_url": "postgresql://postgres:postgres@localhost:5432/postgres",
}

NO_SQLALCHEMY_CONFIG = {
"start_date": pendulum.datetime(2022, 11, 1).to_iso8601_string(),
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "postgres",
"database": "postgres",
}


def setup_test_table(table_name, sqlalchemy_url):
"""setup any state specific to the execution of the given module."""
Expand Down Expand Up @@ -61,6 +70,13 @@ def teardown_test_table(table_name, sqlalchemy_url):
custom_suites=[custom_test_replication_key],
)

TapPostgresTestNOSQLALCHEMY = get_tap_test_class(
tap_class=TapPostgres,
config=NO_SQLALCHEMY_CONFIG,
catalog="tests/resources/data.json",
custom_suites=[custom_test_replication_key],
)


class TestTapPostgres(TapPostgresTest):

Expand All @@ -73,6 +89,16 @@ def resource(self):
yield
teardown_test_table(self.table_name, self.sqlalchemy_url)

class TestTapPostgres_NOSQLALCHMY(TapPostgresTestNOSQLALCHEMY):

table_name = TABLE_NAME
sqlalchemy_url = SAMPLE_CONFIG["sqlalchemy_url"]

@pytest.fixture(scope="class")
def resource(self):
setup_test_table(self.table_name, self.sqlalchemy_url)
yield
teardown_test_table(self.table_name, self.sqlalchemy_url)

def test_jsonb():
"""JSONB Objects weren't being selected, make sure they are now"""
Expand Down