diff --git a/cratedb_toolkit/io/processor/kinesis_lambda.py b/cratedb_toolkit/io/processor/kinesis_lambda.py index 005b49cf..e2f938e4 100644 --- a/cratedb_toolkit/io/processor/kinesis_lambda.py +++ b/cratedb_toolkit/io/processor/kinesis_lambda.py @@ -51,7 +51,7 @@ MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown") COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "") CRATEDB_SQLALCHEMY_URL: str = os.environ.get("CRATEDB_SQLALCHEMY_URL", "crate://") -CRATEDB_TABLE: str = os.environ.get("CRATEDB_TABLE", "default") +CRATEDB_TABLE: t.Optional[str] = os.environ.get("CRATEDB_TABLE") logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) @@ -127,7 +127,9 @@ def handler(event, context): connection.execute(sa.text(operation.statement), parameters=operation.parameters) # Processing alternating CDC events requires write synchronization. - connection.execute(sa.text(f"REFRESH TABLE {cdc.quote_table_name(CRATEDB_TABLE)}")) + # TODO: Improve interface. + if hasattr(cdc, "table_name"): + connection.execute(sa.text(f"REFRESH TABLE {cdc.table_name}")) connection.commit() diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py index 80d87d63..71d33e11 100644 --- a/tests/io/dynamodb/test_cli.py +++ b/tests/io/dynamodb/test_cli.py @@ -10,8 +10,8 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): """ CLI test: Invoke `ctk load table` for DynamoDB. """ - cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with sample dataset. dynamodb_test_manager.load_product_catalog()