Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DMS/DynamoDB/MongoDB: Use SQL with parameters instead of inlining values #243

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

## Unreleased
- MongoDB: Fix and verify Zyp transformations
- DMS/DynamoDB/MongoDB I/O: Use SQL with parameters instead of inlining values

## 2024/08/21 v0.0.18
- Dependencies: Unpin commons-codec, to always use the latest version
Expand Down
44 changes: 10 additions & 34 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from tqdm import tqdm
from yarl import URL

Expand Down Expand Up @@ -34,7 +33,7 @@ def __init__(
self.dynamodb_table = self.dynamodb_url.path.lstrip("/")
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBCrateDBTranslator(table_name=self.cratedb_table)
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.progress = progress

Expand All @@ -53,45 +52,22 @@ def start(self):
progress_bar = tqdm(total=records_in)
result = self.dynamodb_adapter.scan(table_name=self.dynamodb_table)
records_out = 0
for sql in self.items_to_sql(result["Items"]):
if sql:
try:
connection.execute(sa.text(sql))
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
for operation in self.items_to_operations(result["Items"]):
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += 1
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
progress_bar.update()
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out < records_in:
logger.warning("No data has been copied")

def items_to_sql(self, items):
def items_to_operations(self, items):
    """
    Convert data for record items to INSERT statements.

    Yields one operation per item, as produced by the configured translator.
    # NOTE(review): judging by the caller, each yielded operation carries a
    # `.statement` (SQL text) and `.parameters` (bound values) — confirm
    # against the commons-codec translator API.
    """
    for item in items:
        yield self.translator.to_sql(item)


class DynamoDBCrateDBTranslator(DynamoCDCTranslatorCrateDB):
    # NOTE(review): This subclass builds SQL by string interpolation; the
    # surrounding change set replaces it with parameterized statements.

    @property
    def sql_ddl(self):
        """
        Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events.
        """
        return f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));"

    def to_sql(self, record: t.Dict[str, t.Any]) -> str:
        """
        Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record.
        """
        # Interpolating the rendered values clause directly into the SQL text
        # inlines data into the statement (S608); parameterized execution is
        # the safer alternative.
        values_clause = self.image_to_values(record)
        sql = f"INSERT INTO {self.table_name} ({self.DATA_COLUMN}) VALUES ('{values_clause}');"
        return sql

    @staticmethod
    def quote_table_name(name: str):
        # TODO @ Upstream: Quoting table names should be the responsibility of the caller.
        return name
6 changes: 3 additions & 3 deletions cratedb_toolkit/io/mongodb/cdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ def start(self):
# FIXME: Note that the function does not perform any sensible error handling yet.
with self.cratedb_adapter.engine.connect() as connection:
connection.execute(sa.text(self.cdc.sql_ddl))
for sql in self.cdc_to_sql():
if sql:
connection.execute(sa.text(sql))
for operation in self.cdc_to_sql():
if operation:
connection.execute(sa.text(operation.statement), operation.parameters)

def cdc_to_sql(self):
"""
Expand Down
17 changes: 11 additions & 6 deletions cratedb_toolkit/io/processor/kinesis_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "commons-codec",
# "commons-codec>=0.0.12",
# "sqlalchemy-cratedb==0.38.0",
# ]
# ///
Expand All @@ -40,7 +40,7 @@
from commons_codec.exception import UnknownOperationError
from commons_codec.model import ColumnTypeMapStore
from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
from sqlalchemy.util import asbool

LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
Expand Down Expand Up @@ -78,11 +78,11 @@

# TODO: Automatically create destination table.
# TODO: Propagate mapping definitions and other settings.
cdc: t.Union[DMSTranslatorCrateDB, DynamoCDCTranslatorCrateDB]
cdc: t.Union[DMSTranslatorCrateDB, DynamoDBCDCTranslator]
if MESSAGE_FORMAT == "dms":
cdc = DMSTranslatorCrateDB(column_types=column_types)
elif MESSAGE_FORMAT == "dynamodb":
cdc = DynamoCDCTranslatorCrateDB(table_name=CRATEDB_TABLE)
cdc = DynamoDBCDCTranslator(table_name=CRATEDB_TABLE)

# Create the database connection outside the handler to allow
# connections to be re-used by subsequent function invocations.
Expand Down Expand Up @@ -123,8 +123,13 @@ def handler(event, context):
logger.debug(f"Record Data: {record_data}")

# Process record.
sql = cdc.to_sql(record_data)
connection.execute(sa.text(sql))
operation = cdc.to_sql(record_data)
connection.execute(sa.text(operation.statement), parameters=operation.parameters)

# Processing alternating CDC events requires write synchronization.
# FIXME: Needs proper table name quoting.
connection.execute(sa.text(f"REFRESH TABLE {CRATEDB_TABLE}"))

Comment on lines 125 to +132
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, I am adding a REFRESH TABLE operation again. Please share any objections you may have.

/cc @wierdvanderhaar, @hlcianfagna, @hammerhead, @proddata, @widmogrod

connection.commit()

# Bookkeeping.
Expand Down
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ docs = [
]
dynamodb = [
"boto3",
"commons-codec",
"commons-codec>=0.0.12",
]
full = [
"cratedb-toolkit[cfr,cloud,datasets,io,service]",
Expand All @@ -155,10 +155,11 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"commons-codec>=0.0.12",
"lorrystream[carabas]",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.4",
"commons-codec[mongodb,zyp]>=0.0.12",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<5,>=3.10.1",
Expand Down
1 change: 1 addition & 0 deletions tests/io/mongodb/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
pytestmark = pytest.mark.mongodb

pymongo = pytest.importorskip("pymongo", reason="Skipping tests because pymongo is not installed")
pytest.importorskip("bsonjs", reason="Skipping tests because bsonjs is not installed")
pytest.importorskip("rich", reason="Skipping tests because rich is not installed")

from cratedb_toolkit.io.mongodb.api import mongodb_copy # noqa: E402
Expand Down
136 changes: 134 additions & 2 deletions tests/io/test_processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import base64
import json
import os
import sys

Expand All @@ -7,6 +9,80 @@

pytest.importorskip("commons_codec", reason="Only works with commons-codec installed")

from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402

# Synthetic DynamoDB CDC "INSERT" event, shaped like a Kinesis-delivered
# record: a nested NewImage exercising maps (M), sets (SS/NS/BS), and
# scalar attribute types.
DYNAMODB_CDC_INSERT_NESTED = {
    "awsRegion": "us-east-1",
    "eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740",
    "eventName": "INSERT",
    "userIdentity": None,
    "recordFormat": "application/json",
    "tableName": "table-testdrive-nested",
    "dynamodb": {
        "ApproximateCreationDateTime": 1720800199717446,
        "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
        "NewImage": {
            "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
            "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}},
            "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}},
            "string_set": {"SS": ["location_1"]},
            "number_set": {"NS": [1, 2, 3, 0.34]},
            "binary_set": {"BS": ["U3Vubnk="]},
            "somemap": {
                "M": {
                    "test": {"N": 1},
                    "test2": {"N": 2},
                }
            },
        },
        "SizeBytes": 156,
        "ApproximateCreationDateTimePrecision": "MICROSECOND",
    },
    "eventSource": "aws:dynamodb",
}

# Synthetic DynamoDB CDC "MODIFY" event for the same primary key as the
# INSERT fixture above, adding lists (L), empty containers, and a
# list-of-objects attribute asserted on by the test below.
# NOTE: The OldImage deliberately does not match the INSERT record.
DYNAMODB_CDC_MODIFY_NESTED = {
    "awsRegion": "us-east-1",
    "eventID": "24757579-ebfd-480a-956d-a1287d2ef707",
    "eventName": "MODIFY",
    "userIdentity": None,
    "recordFormat": "application/json",
    "tableName": "foo",
    "dynamodb": {
        "ApproximateCreationDateTime": 1720742302233719,
        "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}},
        "NewImage": {
            "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"},
            "device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}},
            "tags": {"L": [{"S": "foo"}, {"S": "bar"}]},
            "empty_map": {"M": {}},
            "empty_list": {"L": []},
            "timestamp": {"S": "2024-07-12T01:17:42"},
            "string_set": {"SS": ["location_1"]},
            "number_set": {"NS": [1, 2, 3, 0.34]},
            "binary_set": {"BS": ["U3Vubnk="]},
            "somemap": {
                "M": {
                    "test": {"N": 1},
                    "test2": {"N": 2},
                }
            },
            "list_of_objects": {"L": [{"M": {"foo": {"S": "bar"}}}, {"M": {"baz": {"S": "qux"}}}]},
        },
        "OldImage": {
            "NOTE": "This event does not match the INSERT record",
            "humidity": {"N": "84.84"},
            "temperature": {"N": "42.42"},
            "location": {"S": "Sydney"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
            "device": {"M": {"id": {"S": "bar"}, "serial": {"N": 12345}}},
        },
        "SizeBytes": 161,
        "ApproximateCreationDateTimePrecision": "MICROSECOND",
    },
    "eventSource": "aws:dynamodb",
}


@pytest.fixture
def reset_handler():
Expand All @@ -16,9 +92,9 @@ def reset_handler():
pass


def test_processor_invoke_no_records(reset_handler, mocker, caplog):
def test_processor_kinesis_dms_no_records(reset_handler, mocker, caplog):
"""
Roughly verify that the unified Lambda handler works.
Roughly verify that the unified Lambda handler works with AWS DMS.
"""

# Configure environment variables.
Expand All @@ -33,3 +109,59 @@ def test_processor_invoke_no_records(reset_handler, mocker, caplog):
handler(event, None)

assert "Successfully processed 0 records" in caplog.messages


def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker, caplog):
    """
    Roughly verify that the unified Lambda handler works with AWS DynamoDB.

    Feeds an INSERT followed by a MODIFY CDC event through the Kinesis
    Lambda processor and checks that exactly one record ends up in CrateDB
    with the updated payload.
    # NOTE(review): `cratedb` is presumably a test-container fixture exposing
    # a database adapter and connection URL — confirm against conftest.
    """

    # Define target table name.
    table_name = '"testdrive"."demo"'

    # Create target table.
    cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)

    # Configure Lambda processor per environment variables.
    handler_environment = {
        "CRATEDB_SQLALCHEMY_URL": cratedb.get_connection_url(),
        "MESSAGE_FORMAT": "dynamodb",
        "CRATEDB_TABLE": table_name,
    }
    mocker.patch.dict(os.environ, handler_environment)

    # The import must happen *after* patching the environment, because the
    # processor module reads its configuration at import time.
    from cratedb_toolkit.io.processor.kinesis_lambda import handler

    # Define two CDC events: INSERT and UPDATE.
    # They have to be conveyed separately because CrateDB needs a
    # `REFRESH TABLE` operation between them.
    event = {
        "Records": [
            wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
            wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED),
        ]
    }

    # Run transfer command.
    handler(event, None)

    # Verify outcome of processor, per validating log output.
    assert "Successfully processed 2 records" in caplog.messages

    # Verify data in target database.
    assert cratedb.database.count_records(table_name) == 1
    results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True)  # noqa: S608
    assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]


def wrap_kinesis(data):
    """
    Envelope a CDC event in a Kinesis record structure.

    Mimics the payload the Lambda runtime delivers, so the processor's
    interface is satisfied: the event is JSON-serialized, UTF-8 encoded,
    and Base64-wrapped under the ``kinesis.data`` key.
    """
    payload = base64.b64encode(json.dumps(data).encode("utf-8"))
    sequence_number = "49590338271490256608559692538361571095921575989136588898"
    return {
        "eventID": f"shardId-000000000006:{sequence_number}",
        "kinesis": {
            "sequenceNumber": sequence_number,
            "data": payload,
        },
    }