Skip to content

Commit

Permalink
Fix lock file
Browse files Browse the repository at this point in the history
  • Loading branch information
visch committed Oct 20, 2023
2 parents 49b377a + 420efc5 commit d247545
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 252 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ci:

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.5.0
hooks:
- id: check-json
- id: check-toml
Expand Down Expand Up @@ -40,7 +40,7 @@ repos:
- flake8-docstrings==1.6.0

- repo: https://github.com/asottile/pyupgrade
rev: v3.13.0
rev: v3.15.0
hooks:
- id: pyupgrade
args: [--py37-plus]
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# `target-postgres`

![PyPI - Version](https://img.shields.io/pypi/v/meltanolabs-target-postgres)
![PyPI - Downloads](https://img.shields.io/pypi/dm/meltanolabs-target-postgres)
![PyPI - License](https://img.shields.io/pypi/l/meltanolabs-target-postgres)
![Test target-postgres](https://github.com/meltanolabs/target-postgres/actions/workflows/ci_workflow.yml/badge.svg)

Target for Postgres.

Built with the [Meltano SDK](https://sdk.meltano.com) for Singer Taps and Targets.
Expand Down
432 changes: 236 additions & 196 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ packages = [
python = "<3.12,>=3.7.1"
requests = "^2.25.1"
singer-sdk = ">=0.28,<0.33"
psycopg2-binary = "2.9.8"
psycopg2-binary = "2.9.9"
sqlalchemy = ">=2.0,<3.0"
sshtunnel = "0.4.0"

Expand Down
12 changes: 8 additions & 4 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
DATETIME,
DECIMAL,
INTEGER,
TEXT,
TIME,
TIMESTAMP,
VARCHAR,
Expand Down Expand Up @@ -268,7 +269,10 @@ def pick_individual_type(jsonschema_type: dict):
return ARRAY(JSONB())
if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
return th.to_sql_type(jsonschema_type)
individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
return individual_type

@staticmethod
def pick_best_sql_type(sql_type_array: list):
Expand All @@ -283,7 +287,7 @@ def pick_best_sql_type(sql_type_array: list):
precedence_order = [
ARRAY,
JSONB,
VARCHAR,
TEXT,
TIMESTAMP,
DATETIME,
DATE,
Expand All @@ -299,7 +303,7 @@ def pick_best_sql_type(sql_type_array: list):
for obj in sql_type_array:
if isinstance(obj, sql_type):
return obj
return VARCHAR()
return TEXT()

def create_empty_table(
self,
Expand Down Expand Up @@ -775,7 +779,7 @@ def column_exists(
class NOTYPE(TypeDecorator):
"""Type to use when none is provided in the schema."""

impl = VARCHAR
impl = TEXT
cache_ok = True

def process_bind_param(self, value, dialect):
Expand Down
24 changes: 8 additions & 16 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,14 @@ def bulk_insert_records(

if self.append_only is False:
insert_records: Dict[str, Dict] = {} # pk : record
try:
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
primary_key_value = "".join(
[str(record[key]) for key in primary_keys]
)
insert_records[primary_key_value] = insert_record
except KeyError:
raise RuntimeError(
"Primary key not found in record. "
f"full_table_name: {table.name}. "
f"schema: {table.schema}. "
f"primary_keys: {primary_keys}."
)
for record in records:
insert_record = {}
for column in columns:
insert_record[column.name] = record.get(column.name)
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_value = "".join([str(record[key]) for key in primary_keys])
insert_records[primary_key_value] = insert_record
data_to_insert = list(insert_records.values())
else:
for record in records:
Expand Down
18 changes: 0 additions & 18 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from pathlib import PurePath

import jsonschema
from singer_sdk import typing as th
from singer_sdk.target_base import SQLTarget

Expand Down Expand Up @@ -320,20 +319,3 @@ def max_parallelism(self) -> int:
"""
# https://github.com/MeltanoLabs/target-postgres/issues/3
return 1

def _process_record_message(self, message_dict: dict) -> None:
"""Process a RECORD message.
Args:
message_dict: TODO
"""
stream_name = message_dict["stream"]
if self.mapper.stream_maps.get(stream_name) is None:
raise Exception(f"Schema message has not been sent for {stream_name}")
try:
super()._process_record_message(message_dict)
except jsonschema.exceptions.ValidationError as e:
self.logger.error(
f"Exception is being thrown for stream_name: {stream_name}"
)
raise e
21 changes: 6 additions & 15 deletions target_postgres/tests/test_standard_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import jsonschema
import pytest
import sqlalchemy
from singer_sdk.exceptions import MissingKeyPropertiesError
from singer_sdk.testing import sync_end_to_end
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.types import TIMESTAMP, VARCHAR
from sqlalchemy.types import TEXT, TIMESTAMP

from target_postgres.connector import PostgresConnector
from target_postgres.target import TargetPostgres
Expand Down Expand Up @@ -197,16 +198,6 @@ def test_aapl_to_postgres(postgres_config):
sync_end_to_end(tap, target)


def test_record_before_schema(postgres_target):
with pytest.raises(Exception) as e:
file_name = "record_before_schema.singer"
singer_file_to_target(file_name, postgres_target)

assert (
str(e.value) == "Schema message has not been sent for test_record_before_schema"
)


def test_invalid_schema(postgres_target):
with pytest.raises(Exception) as e:
file_name = "invalid_schema.singer"
Expand All @@ -217,7 +208,7 @@ def test_invalid_schema(postgres_target):


def test_record_missing_key_property(postgres_target):
with pytest.raises(Exception) as e:
with pytest.raises(MissingKeyPropertiesError) as e:
file_name = "record_missing_key_property.singer"
singer_file_to_target(file_name, postgres_target)
assert "Record is missing one or more key_properties." in str(e.value)
Expand Down Expand Up @@ -443,7 +434,7 @@ def test_anyof(postgres_target):
for column in table.c:
# {"type":"string"}
if column.name == "id":
assert isinstance(column.type, VARCHAR)
assert isinstance(column.type, TEXT)

# Any of nullable date-time.
# Note that postgres timestamp is equivalent to jsonschema date-time.
Expand All @@ -459,12 +450,12 @@ def test_anyof(postgres_target):
# Any of nullable string.
# {"anyOf":[{"type":"string"},{"type":"null"}]}
if column.name == "commit_message":
assert isinstance(column.type, VARCHAR)
assert isinstance(column.type, TEXT)

# Any of nullable string or integer.
# {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]}
if column.name == "legacy_id":
assert isinstance(column.type, VARCHAR)
assert isinstance(column.type, TEXT)


def test_new_array_column(postgres_target):
Expand Down

0 comments on commit d247545

Please sign in to comment.