Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(targets): Handle missing record properties in SQL sinks #1865

14 changes: 12 additions & 2 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,20 @@ def bulk_insert_records(
insert_sql = sqlalchemy.text(insert_sql)

conformed_records = [self.conform_record(record) for record in records]
property_names = list(self.conform_schema(schema)["properties"].keys())

# Create new record dicts with missing properties filled in with None
new_records = [
{name: record.get(name, None) for name in property_names}
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
for record in conformed_records
]

self.logger.info("Inserting with SQL: %s", insert_sql)

with self.connector._connect() as conn, conn.begin():
conn.execute(insert_sql, conformed_records)
return len(conformed_records) if isinstance(conformed_records, list) else None
result = conn.execute(insert_sql, new_records)

return result.rowcount

def merge_upsert_from_table(
self,
Expand Down
29 changes: 29 additions & 0 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,35 @@ def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget):
target_sync_test(sqlite_sample_target, input=StringIO(tap_output_b), finalize=True)


def test_record_with_missing_properties(
sqlite_sample_target: SQLTarget,
):
"""Test handling of records with missing properties."""
tap_output = "\n".join(
json.dumps(msg)
for msg in [
{
"type": "SCHEMA",
"stream": "test_stream",
"schema": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"name": {"type": "string"},
},
},
"key_properties": ["id"],
},
{
"type": "RECORD",
"stream": "test_stream",
"record": {"id": 1},
},
]
)
target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True)


@pytest.mark.parametrize(
"stream_name,schema,key_properties,expected_dml",
[
Expand Down