Skip to content

Commit

Permalink
feat: Use SQLAlchemy core objects where possible and append only added (
Browse files Browse the repository at this point in the history
#138)

Closes #121, #22, #64, #119, #21, #54
  • Loading branch information
visch authored Jun 15, 2023
1 parent bfb80d3 commit 4f677d3
Show file tree
Hide file tree
Showing 10 changed files with 440 additions and 177 deletions.
139 changes: 139 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,142 @@ smoke-test
.tox/**
.vscode/**
output/**
.env
plugins/**

# Secrets and internal config files
.secrets/*

# Ignore meltano internal cache and sqlite systemdb

.meltano/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other info into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
48 changes: 32 additions & 16 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -1,33 +1,49 @@
version: 1
send_anonymous_usage_stats: true
default_environment: dev
project_id: "target-postgres"
project_id: target-postgres
plugins:
extractors:
- name: "tap-smoke-test"
namespace: "tap_smoke_test"
executable: "tap-smoke-test"
pip_url: "git+https://github.com/meltano/tap-smoke-test.git"
- name: tap-smoke-test
namespace: tap_smoke_test
pip_url: git+https://github.com/meltano/tap-smoke-test.git
executable: tap-smoke-test
config:
streams:
- stream_name: animals
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl"
input_filename: https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/animals-data.jsonl
- stream_name: page_views
"input_filename": "https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/pageviews-data.jsonl"
input_filename: https://gitlab.com/meltano/tap-smoke-test/-/raw/main/demo-data/pageviews-data.jsonl
stream_maps:
"animals":
"__key_properties__": ["id"]
"page_views":
"__key_properties__": ["vistor_id"]
animals:
__key_properties__: [id]
page_views:
__key_properties__: [vistor_id]
- name: tap-github
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/tap-github.git
config:
repositories:
- sbalnojan/meltano-lightdash
start_date: '2022-01-01'
select:
- commits.url
- commits.sha
- commits.commit_timestamp
loaders:
- name: "target-postgres"
namespace: "target_postgres"
- name: target-postgres
namespace: target_postgres
pip_url: -e .
config:
sqlalchemy_url: "postgresql://postgres:postgres@localhost:5432/postgres"
target_schema: test
settings:
- name: sqlalchemy_url
kind: password
config:
host: localhost
port: 5432
user: postgres
password: postgres
database: postgres
target_schema: test
add_record_metadata: true
environments:
- name: dev
102 changes: 75 additions & 27 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,43 @@ class PostgresConnector(SQLConnector):
allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

def prepare_table(
    self,
    full_table_name: str,
    schema: dict,
    primary_keys: list[str],
    partition_keys: list[str] | None = None,
    as_temp_table: bool = False,
) -> sqlalchemy.Table:
    """Adapt the target table to the provided schema, creating it if absent.

    Args:
        full_table_name: the target table name.
        schema: the JSON Schema for the table.
        primary_keys: list of key properties.
        partition_keys: list of partition keys.
        as_temp_table: True to create a temp table.

    Returns:
        The SQLAlchemy ``Table`` reflecting the prepared target table.
    """
    _, schema_name, table_name = self.parse_full_table_name(full_table_name)
    meta = sqlalchemy.MetaData(bind=self._engine, schema=schema_name)

    # Fresh table: create it from the schema and hand it straight back.
    if not self.table_exists(full_table_name=full_table_name):
        return self.create_empty_table(
            table_name=table_name,
            meta=meta,
            schema=schema,
            primary_keys=primary_keys,
            partition_keys=partition_keys,
            as_temp_table=as_temp_table,
        )

    # Existing table: align each declared property's column with the schema.
    for name, definition in schema["properties"].items():
        self.prepare_column(full_table_name, name, self.to_sql_type(definition))

    # Re-read the (possibly altered) definition from the database so the
    # returned Table matches what is actually there now.
    meta.reflect(only=[table_name])
    return meta.tables[full_table_name]

def create_sqlalchemy_connection(self) -> sqlalchemy.engine.Connection:
"""Return a new SQLAlchemy connection using the provided config.
Expand Down Expand Up @@ -53,22 +90,30 @@ def get_sqlalchemy_url(self, config: dict) -> str:
)
return cast(str, sqlalchemy_url)

def truncate_table(self, name):
"""Clear table data."""
self.connection.execute(f"TRUNCATE TABLE {name}")

def drop_table(self, name):
def drop_table(self, table: sqlalchemy.Table):
"""Drop table data."""
self.connection.execute(f"DROP TABLE {name}")

def create_temp_table_from_table(self, from_table_name, temp_table_name):
"""Temp table from another table."""
ddl = sqlalchemy.DDL(
"CREATE TEMP TABLE %(temp_table_name)s AS "
"SELECT * FROM %(from_table_name)s LIMIT 0",
{"temp_table_name": temp_table_name, "from_table_name": from_table_name},
)
self.connection.execute(ddl)
table.drop(bind=self.connection)

def clone_table(
    self, new_table_name, table, metadata, connection, temp_table
) -> sqlalchemy.Table:
    """Create an empty structural copy of *table* named *new_table_name*.

    Only each column's name and type are carried over; constraints,
    defaults, and indexes are not copied. When *temp_table* is True the
    clone is created with the TEMPORARY prefix.

    Returns:
        The newly created SQLAlchemy ``Table``.
    """
    cloned_columns = [
        sqlalchemy.Column(source_column.name, source_column.type)
        for source_column in table.columns
    ]
    # TEMPORARY prefix only when explicitly requested with True,
    # mirroring the strict identity check used elsewhere.
    table_kwargs = {"prefixes": ["TEMPORARY"]} if temp_table is True else {}
    new_table = sqlalchemy.Table(
        new_table_name, metadata, *cloned_columns, **table_kwargs
    )
    new_table.create(bind=connection)
    return new_table

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
Expand Down Expand Up @@ -99,12 +144,13 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:

def create_empty_table(
self,
full_table_name: str,
table_name: str,
meta: sqlalchemy.MetaData,
schema: dict,
primary_keys: list[str] | None = None,
partition_keys: list[str] | None = None,
as_temp_table: bool = False,
) -> None:
) -> sqlalchemy.Table:
"""Create an empty target table.
Args:
Expand All @@ -118,21 +164,16 @@ def create_empty_table(
NotImplementedError: if temp tables are unsupported and as_temp_table=True.
RuntimeError: if a variant schema is passed with no properties defined.
"""
if as_temp_table:
raise NotImplementedError("Temporary tables are not supported.")

_ = partition_keys # Not supported in generic implementation.

_, schema_name, table_name = self.parse_full_table_name(full_table_name)
meta = sqlalchemy.MetaData(schema=schema_name)
columns: list[sqlalchemy.Column] = []
primary_keys = primary_keys or []
try:
properties: dict = schema["properties"]
except KeyError:
raise RuntimeError(
f"Schema for '{full_table_name}' does not define properties: {schema}"
f"Schema for table_name: '{table_name}'"
f"does not define properties: {schema}"
)

for property_name, property_jsonschema in properties.items():
is_primary_key = property_name in primary_keys
columns.append(
Expand All @@ -142,9 +183,16 @@ def create_empty_table(
primary_key=is_primary_key,
)
)
if as_temp_table:
new_table = sqlalchemy.Table(
table_name, meta, *columns, prefixes=["TEMPORARY"]
)
new_table.create(bind=self.connection)
return new_table

_ = sqlalchemy.Table(table_name, meta, *columns)
meta.create_all(self._engine)
new_table = sqlalchemy.Table(table_name, meta, *columns)
new_table.create(bind=self.connection)
return new_table

def get_column_add_ddl(
self,
Expand Down
Loading

0 comments on commit 4f677d3

Please sign in to comment.