diff --git a/CHANGES.md b/CHANGES.md index 560b6674..0a881545 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,8 @@ - IO: Added the `if-exists` query parameter by updating to influxio 0.4.0. - Rockset: Added CrateDB Rockset Adapter, a HTTP API emulation layer - MongoDB: Added adapter amalgamating PyMongo to use CrateDB as backend +- SQLAlchemy: Clean up and refactor SQLAlchemy polyfills + to `cratedb_toolkit.util.sqlalchemy` ## 2024/06/18 v0.0.14 - Add `ctk cfr` and `ctk wtf` diagnostics programs diff --git a/cratedb_toolkit/adapter/pymongo/api.py b/cratedb_toolkit/adapter/pymongo/api.py index 24bea82d..2e71fb0d 100644 --- a/cratedb_toolkit/adapter/pymongo/api.py +++ b/cratedb_toolkit/adapter/pymongo/api.py @@ -3,9 +3,9 @@ import pymongo.collection from cratedb_toolkit.adapter.pymongo.collection import collection_factory -from cratedb_toolkit.sqlalchemy.patch import patch_types_map from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.pandas import patch_pandas_sqltable_with_extended_mapping +from cratedb_toolkit.util.sqlalchemy import patch_types_map class PyMongoCrateDBAdapter: diff --git a/cratedb_toolkit/adapter/pymongo/backlog.md b/cratedb_toolkit/adapter/pymongo/backlog.md index 346083b6..c75b98d1 100644 --- a/cratedb_toolkit/adapter/pymongo/backlog.md +++ b/cratedb_toolkit/adapter/pymongo/backlog.md @@ -2,7 +2,7 @@ ## Iteration +1 - Upstream / converge patches. - - `cratedb_toolkit/sqlalchemy/patch.py` + - `cratedb_toolkit/util/sqlalchemy.py` - `cratedb_toolkit/util/pandas.py` - `cratedb_toolkit/adapter/pymongo/api.py::adjust_sqlalchemy` - `cratedb_toolkit/adapter/pymongo/collection.py::insert_returning_id` diff --git a/cratedb_toolkit/cfr/systable.py b/cratedb_toolkit/cfr/systable.py index b4a7b70b..28f36aaa 100644 --- a/cratedb_toolkit/cfr/systable.py +++ b/cratedb_toolkit/cfr/systable.py @@ -27,9 +27,9 @@ import sqlalchemy as sa from tqdm import tqdm -from cratedb_toolkit.sqlalchemy.patch import patch_encoder from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.cli import error_logger +from cratedb_toolkit.util.sqlalchemy import patch_encoder from cratedb_toolkit.wtf.core import InfoContainer logger = logging.getLogger(__name__) diff --git a/cratedb_toolkit/sqlalchemy/__init__.py b/cratedb_toolkit/sqlalchemy/__init__.py deleted file mode 100644 index 6b9e2cde..00000000 --- a/cratedb_toolkit/sqlalchemy/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .polyfill import check_uniqueness_factory, polyfill_autoincrement, polyfill_refresh_after_dml, refresh_table diff --git a/cratedb_toolkit/sqlalchemy/polyfill.py b/cratedb_toolkit/sqlalchemy/polyfill.py deleted file mode 100644 index e8b3abfc..00000000 --- a/cratedb_toolkit/sqlalchemy/polyfill.py +++ /dev/null @@ -1,113 +0,0 @@ -import itertools - -import sqlalchemy as sa -from sqlalchemy.event import listen - - -def polyfill_autoincrement(): - """ - Configure SQLAlchemy model columns with an alternative to `autoincrement=True`. - - In this case, use a random identifier: Nagamani19, a short, unique, - non-sequential identifier based on Hashids. - - TODO: Submit patch to `crate-python`, to be enabled by a - dialect parameter `crate_polyfill_autoincrement` or such. - """ - import sqlalchemy.sql.schema as schema - from sqlalchemy import func - - init_dist = schema.Column.__init__ - - def __init__(self, *args, **kwargs): - if "autoincrement" in kwargs: - del kwargs["autoincrement"] - if "default" not in kwargs: - kwargs["default"] = func.now() - init_dist(self, *args, **kwargs) - - schema.Column.__init__ = __init__ # type: ignore[method-assign] - - -def check_uniqueness_factory(sa_entity, *attribute_names): - """ - Run a manual column value uniqueness check on a table, and raise an IntegrityError if applicable. - - CrateDB does not support the UNIQUE constraint on columns. This attempts to emulate it. - - TODO: Submit patch to `crate-python`, to be enabled by a - dialect parameter `crate_polyfill_unique` or such. - """ - - # Synthesize a canonical "name" for the constraint, - # composed of all column names involved. - constraint_name: str = "-".join(attribute_names) - - def check_uniqueness(mapper, connection, target): - from sqlalchemy.exc import IntegrityError - - if isinstance(target, sa_entity): - # TODO: How to use `session.query(SqlExperiment)` here? - stmt = mapper.selectable.select() - for attribute_name in attribute_names: - stmt = stmt.filter(getattr(sa_entity, attribute_name) == getattr(target, attribute_name)) - stmt = stmt.compile(bind=connection.engine) - results = connection.execute(stmt) - if results.rowcount > 0: - raise IntegrityError( - statement=stmt, - params=[], - orig=Exception( - f"DuplicateKeyException in table '{target.__tablename__}' " f"on constraint '{constraint_name}'" - ), - ) - - return check_uniqueness - - -def polyfill_refresh_after_dml(session): - """ - Run `REFRESH TABLE ` after each INSERT, UPDATE, and DELETE operation. - - CrateDB is eventually consistent, i.e. write operations are not flushed to - disk immediately, so readers may see stale data. In a traditional OLTP-like - application, this is not applicable. - - This SQLAlchemy extension makes sure that data is synchronized after each - operation manipulating data. - - > `after_{insert,update,delete}` events only apply to the session flush operation - > and do not apply to the ORM DML operations described at ORM-Enabled INSERT, - > UPDATE, and DELETE statements. To intercept ORM DML events, use - > `SessionEvents.do_orm_execute().` - > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.MapperEvents.after_insert - - > Intercept statement executions that occur on behalf of an ORM Session object. - > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.do_orm_execute - - > Execute after flush has completed, but before commit has been called. - > -- https://docs.sqlalchemy.org/en/20/orm/events.html#sqlalchemy.orm.SessionEvents.after_flush - - TODO: Submit patch to `crate-python`, to be enabled by a - dialect parameter `crate_dml_refresh` or such. - """ # noqa: E501 - listen(session, "after_flush", do_flush) - - -def do_flush(session, flush_context): - """ - SQLAlchemy event handler for the 'after_flush' event, - invoking `REFRESH TABLE` on each table which has been modified. - """ - dirty_entities = itertools.chain(session.new, session.dirty, session.deleted) - dirty_classes = {entity.__class__ for entity in dirty_entities} - for class_ in dirty_classes: - refresh_table(session, class_) - - -def refresh_table(connection, target): - """ - Invoke a `REFRESH TABLE` statement. - """ - sql = f"REFRESH TABLE {target.__tablename__}" - connection.execute(sa.text(sql)) diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/util/sqlalchemy.py similarity index 93% rename from cratedb_toolkit/sqlalchemy/patch.py rename to cratedb_toolkit/util/sqlalchemy.py index a440e22a..6d591ca4 100644 --- a/cratedb_toolkit/sqlalchemy/patch.py +++ b/cratedb_toolkit/util/sqlalchemy.py @@ -1,3 +1,9 @@ +""" +Patches and polyfills, mostly for SQLAlchemy. + +TODO: Refactor to `crate` or `sqlalchemy-cratedb` packages. +""" + import calendar import datetime as dt import json diff --git a/tests/sqlalchemy/test_patch.py b/tests/sqlalchemy/test_patch.py index d6539869..0d44eb29 100644 --- a/tests/sqlalchemy/test_patch.py +++ b/tests/sqlalchemy/test_patch.py @@ -4,7 +4,7 @@ import pytest import sqlalchemy as sa -from cratedb_toolkit.sqlalchemy.patch import CrateJsonEncoderWithNumPy +from cratedb_toolkit.util.sqlalchemy import CrateJsonEncoderWithNumPy from tests.conftest import TESTDRIVE_DATA_SCHEMA diff --git a/tests/sqlalchemy/test_polyfill.py b/tests/sqlalchemy/test_polyfill.py deleted file mode 100644 index 50c57f90..00000000 --- a/tests/sqlalchemy/test_polyfill.py +++ /dev/null @@ -1,151 +0,0 @@ -import re - -import pytest -import sqlalchemy as sa - -from cratedb_toolkit.sqlalchemy import check_uniqueness_factory, polyfill_autoincrement, polyfill_refresh_after_dml - - -def get_autoincrement_model(): - """ - Provide a minimal SQLAlchemy model including an AUTOINCREMENT primary key. - """ - Base = sa.orm.declarative_base() - - class FooBar(Base): - """ - Minimal SQLAlchemy model with autoincrement primary key. - """ - - __tablename__ = "foobar" - identifier = sa.Column(sa.BigInteger, primary_key=True, autoincrement=True) - foo = sa.Column(sa.String) - - return FooBar - - -def get_unique_model_single(): - """ - Provide a minimal SQLAlchemy model including a column with UNIQUE constraint. - """ - Base = sa.orm.declarative_base() - - class FooBarSingle(Base): - """ - Minimal SQLAlchemy model with UNIQUE constraint. - """ - - __tablename__ = "foobar_unique_single" - identifier = sa.Column(sa.BigInteger, primary_key=True, default=sa.func.now()) - name = sa.Column(sa.String, unique=True, nullable=False) - - return FooBarSingle - - -def get_unique_model_composite(): - """ - Provide a minimal SQLAlchemy model using a composite UNIQUE constraint. - """ - Base = sa.orm.declarative_base() - - class FooBarComposite(Base): - """ - Minimal SQLAlchemy model with UNIQUE constraint. - """ - - __tablename__ = "foobar_unique_composite" - identifier = sa.Column(sa.BigInteger, primary_key=True, default=sa.func.now()) - name = sa.Column(sa.String, nullable=False) - user_id = sa.Column(sa.Integer, nullable=False) - __table_args__ = (sa.UniqueConstraint("name", "user_id", name="unique_name_user"),) - - return FooBarComposite - - -def test_autoincrement_vanilla(database, needs_sqlalchemy2): - """ - When using a model including an autoincrement column, and not assigning a value, CrateDB will fail. - """ - FooBar = get_autoincrement_model() - FooBar.metadata.create_all(database.engine) - with sa.orm.Session(database.engine) as session: - session.add(FooBar(foo="bar")) - with pytest.raises(sa.exc.ProgrammingError) as ex: - session.commit() - assert ex.match( - re.escape("SQLParseException[Column `identifier` is required but is missing from the insert statement]") - ) - - -def test_autoincrement_polyfill(database, needs_sqlalchemy2): - """ - When using a model including an autoincrement column, and the corresponding polyfill - is installed, the procedure will succeed. - """ - polyfill_autoincrement() - - FooBar = get_autoincrement_model() - FooBar.metadata.create_all(database.engine) - with sa.orm.Session(database.engine) as session: - session.add(FooBar(foo="bar")) - session.commit() - - -def test_unique_patched(database): - """ - When using a model including a column with UNIQUE constraint, the SQLAlchemy dialect will ignore it. - """ - FooBar = get_unique_model_single() - FooBar.metadata.create_all(database.engine) - - with sa.orm.Session(database.engine) as session: - session.add(FooBar(name="name-1")) - session.commit() - session.add(FooBar(name="name-1")) - session.commit() - - -def test_unique_patched_and_active_single(database): - """ - When using a model including a column with UNIQUE constraint, enabling the patch, - and activating the uniqueness check, SQLAlchemy will raise `DuplicateKeyException` - errors if uniqueness constraints don't hold. - """ - FooBar = get_unique_model_single() - FooBar.metadata.create_all(database.engine) - - # For uniqueness checks to take place, installing an event handler is needed. - # TODO: Maybe add to some helper function? - # TODO: Maybe derive from the model definition itself? - sa.event.listen(FooBar, "before_insert", check_uniqueness_factory(FooBar, "name")) - - with sa.orm.Session(database.engine) as session: - polyfill_refresh_after_dml(session) - session.add(FooBar(name="name-1")) - session.commit() - session.add(FooBar(name="name-1")) - with pytest.raises(sa.exc.IntegrityError) as ex: - session.commit() - assert ex.match("DuplicateKeyException in table 'foobar_unique_single' on constraint 'name'") - - -def test_unique_patched_and_active_composite(database): - """ - Similar to the _single variant, verify emulated **composite** UNIQUE constraints. - """ - FooBar = get_unique_model_composite() - FooBar.metadata.create_all(database.engine) - - # For uniqueness checks to take place, installing an event handler is needed. - # TODO: Maybe add to some helper function? - # TODO: Maybe derive from the model definition itself? - sa.event.listen(FooBar, "before_insert", check_uniqueness_factory(FooBar, "name", "user_id")) - - with sa.orm.Session(database.engine) as session: - polyfill_refresh_after_dml(session) - session.add(FooBar(name="name-1", user_id=1)) - session.commit() - session.add(FooBar(name="name-1", user_id=1)) - with pytest.raises(sa.exc.IntegrityError) as ex: - session.commit() - assert ex.match("DuplicateKeyException in table 'foobar_unique_composite' on constraint 'name-user_id'")