From 16df9f6bf1cc00e0aceb9c8cdd89c4f6d2e8e14b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 26 Aug 2024 16:52:19 +0200 Subject: [PATCH] fixup! DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values --- cratedb_toolkit/io/processor/kinesis_lambda.py | 5 +++++ tests/io/test_processor.py | 13 +++---------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py index 524daa01..1f10805e 100644 --- a/cratedb_toolkit/io/processor/kinesis_lambda.py +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -125,6 +125,11 @@ def handler(event, context): # Process record. operation = cdc.to_sql(record_data) connection.execute(sa.text(operation.statement), parameters=operation.parameters) + + # Processing alternating CDC events requires write synchronization. + # FIXME: Needs proper table name quoting. + connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}")) + connection.commit() # Bookkeeping. diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index 3c3ce7d1..6de1bad0 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -135,25 +135,18 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker # Define two CDC events: INSERT and UPDATE. # They have to be conveyed separately because CrateDB needs a # `REFRESH TABLE` operation between them. - event1 = { + event = { "Records": [ wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), - ] - } - event2 = { - "Records": [ wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), ] } # Run transfer command. - handler(event1, None) - cratedb.database.refresh_table(table_name) - handler(event2, None) - cratedb.database.refresh_table(table_name) + handler(event, None) # Verify outcome of processor, per validating log output. - assert "Successfully processed 1 records" in caplog.messages + assert "Successfully processed 2 records" in caplog.messages # Verify data in target database. assert cratedb.database.count_records(table_name) == 1