Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Incremental where clause generation from triggering TypeError #1688

Merged
9 changes: 1 addition & 8 deletions singer_sdk/streams/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import abc
import typing as t

import sqlalchemy

import singer_sdk.helpers._catalog as catalog
from singer_sdk._singerlib import CatalogEntry, MetadataMapping
from singer_sdk.connectors import SQLConnector
Expand Down Expand Up @@ -195,12 +193,7 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:

start_val = self.get_starting_replication_key_value(context)
if start_val:
query = query.where(
sqlalchemy.text(":replication_key >= :start_val").bindparams(
replication_key=replication_key_col,
start_val=start_val,
),
)
query = query.where(replication_key_col >= start_val)

if self.ABORT_AT_RECORD_COUNT is not None:
# Limit record count to one greater than the abort threshold. This ensures
Expand Down
22 changes: 20 additions & 2 deletions tests/samples/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ def _sqlite_sample_db(sqlite_connector):


@pytest.fixture
def sqlite_sample_tap(_sqlite_sample_db, sqlite_sample_db_config) -> SQLiteTap:
def sqlite_sample_tap(
_sqlite_sample_db,
sqlite_sample_db_config,
sqlite_sample_db_state,
) -> SQLiteTap:
_ = _sqlite_sample_db
catalog_obj = Catalog.from_dict(
_get_tap_catalog(SQLiteTap, config=sqlite_sample_db_config, select_all=True),
Expand All @@ -51,7 +55,11 @@ def sqlite_sample_tap(_sqlite_sample_db, sqlite_sample_db_config) -> SQLiteTap:
t2.key_properties = ["c1"]
t2.replication_key = "c1"
t2.replication_method = "INCREMENTAL"
return SQLiteTap(config=sqlite_sample_db_config, catalog=catalog_obj.to_dict())
return SQLiteTap(
config=sqlite_sample_db_config,
catalog=catalog_obj.to_dict(),
state=sqlite_sample_db_state,
)


@pytest.fixture
Expand All @@ -68,3 +76,13 @@ def path_to_sample_data_db(tmp_path: Path) -> Path:
def sqlite_sample_db_config(path_to_sample_data_db: str) -> dict:
"""Get configuration dictionary for target-csv."""
return {"path_to_db": str(path_to_sample_data_db)}


@pytest.fixture
def sqlite_sample_db_state() -> dict:
"""Get configuration dictionary for target-csv."""
return {
"bookmarks": {
"main-t0": {"replication_key": "c1", "replication_key_value": 55},
},
}