Skip to content

Commit

Permalink
fixup! DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlin…
Browse files Browse the repository at this point in the history
…ing values
  • Loading branch information
amotl committed Aug 26, 2024
1 parent 6ec6db7 commit 16df9f6
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 10 deletions.
5 changes: 5 additions & 0 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ def handler(event, context):
# Process record.
operation = cdc.to_sql(record_data)
connection.execute(sa.text(operation.statement), parameters=operation.parameters)

# Processing alternating CDC events requires write synchronization.
# FIXME: Needs proper table name quoting.
connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}"))

connection.commit()

# Bookkeeping.
Expand Down
13 changes: 3 additions & 10 deletions tests/io/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,25 +135,18 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker
# Define two CDC events: INSERT and UPDATE.
# They have to be conveyed separately because CrateDB needs a
# `REFRESH TABLE` operation between them.
event1 = {
event = {
"Records": [
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
]
}
event2 = {
"Records": [
wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED),
]
}

# Run transfer command.
handler(event1, None)
cratedb.database.refresh_table(table_name)
handler(event2, None)
cratedb.database.refresh_table(table_name)
handler(event, None)

# Verify outcome of processor, per validating log output.
assert "Successfully processed 1 records" in caplog.messages
assert "Successfully processed 2 records" in caplog.messages

# Verify data in target database.
assert cratedb.database.count_records(table_name) == 1
Expand Down

0 comments on commit 16df9f6

Please sign in to comment.