Skip to content

Commit

Permalink
DynamoDB: Add software test for decoding list of objects
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 28, 2024
1 parent 0b54e0c commit 2295b91
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
29 changes: 29 additions & 0 deletions tests/io/dynamodb/manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import typing as t
from pathlib import Path

from yarl import URL
Expand Down Expand Up @@ -37,3 +38,31 @@ def load_product_catalog(self):
self.adapter.dynamodb_client.batch_write_item(RequestItems=data)
table.load()
return table

def load_records(self, table_name: str, records: t.List[t.Dict[str, t.Any]]):
table = self.adapter.dynamodb_resource.Table(table_name)
try:
table.delete()
except Exception: # noqa: S110
pass

table = self.adapter.dynamodb_resource.create_table(
TableName=table_name,
KeySchema=[
{"AttributeName": "Id", "KeyType": "HASH"},
],
AttributeDefinitions=[
{"AttributeName": "Id", "AttributeType": "N"},
],
ProvisionedThroughput={
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1,
},
TableClass="STANDARD",
)
table.wait_until_exists()

for record in records:
self.adapter.dynamodb_client.put_item(TableName=table_name, Item=record)
table.load()
return table
54 changes: 54 additions & 0 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from cratedb_toolkit.io.dynamodb.copy import DynamoDBFullLoad

RECORD_UTM = {
"Id": {"N": "101"},
"utmTags": {
"L": [
{
"M": {
"date": {"S": "2024-08-28T20:05:42.603Z"},
"utm_adgroup": {"L": [{"S": ""}, {"S": ""}]},
"utm_campaign": {"S": "34374686341"},
"utm_medium": {"S": "foobar"},
"utm_source": {"S": "google"},
}
}
]
},
}


def test_dynamodb_copy_list_of_objects(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
CLI test: Invoke `ctk load table` for DynamoDB.
"""

# Define source and target URLs.
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD_UTM])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
table_loader.start()

# Verify data in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {
"Id": 101.0,
"utmTags": [
{
"date": "2024-08-28T20:05:42.603Z",
"utm_adgroup": ["", ""],
"utm_campaign": "34374686341",
"utm_medium": "foobar",
"utm_source": "google",
}
],
}

0 comments on commit 2295b91

Please sign in to comment.