diff --git a/CHANGES.md b/CHANGES.md
index a18e5c5b..653990d1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,7 @@
 
 
 ## Unreleased
+- DynamoDB: Add `ctk load table` interface for processing CDC events
 
 ## 2024/08/27 v0.0.20
 - DMS/DynamoDB: Fix table name quoting within CDC processor handler
diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py
index 01a3d029..215b59a7 100644
--- a/cratedb_toolkit/api/main.py
+++ b/cratedb_toolkit/api/main.py
@@ -132,6 +132,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
                 msg = "Data loading failed"
                 logger.error(msg)
                 raise OperationFailed(msg)
+        elif source_url.startswith("kinesis"):
+            if "+cdc" in source_url:
+                from cratedb_toolkit.io.kinesis.api import kinesis_relay
+
+                kinesis_relay(source_url, target_url)
+            else:
+                raise NotImplementedError("Loading full data via Kinesis not implemented yet")
         elif source_url.startswith("mongodb"):
             if "+cdc" in source_url:
                 source_url = source_url.replace("+cdc", "")
diff --git a/cratedb_toolkit/io/kinesis/__init__.py b/cratedb_toolkit/io/kinesis/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py
new file mode 100644
index 00000000..9b29f071
--- /dev/null
+++ b/cratedb_toolkit/io/kinesis/adapter.py
@@ -0,0 +1,76 @@
+import asyncio
+import typing as t
+
+from aiobotocore.session import AioSession
+from kinesis import Consumer, JsonProcessor, Producer
+from yarl import URL
+
+
+class KinesisAdapter:
+    """
+    Consume from and produce to a Kinesis stream that is addressed by URL.
+
+    The URL provides credentials (user, password), the endpoint (host, port),
+    the stream name (path), and the region (`region` query parameter).
+    """
+
+    def __init__(self, kinesis_url: URL):
+        self.session = AioSession()
+        # Only install credentials when the URL provides any, instead of
+        # handing `None` values to the session.
+        if kinesis_url.user is not None or kinesis_url.password is not None:
+            self.session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)
+        # The host name `aws` leaves the endpoint URL unset.
+        self.endpoint_url = None
+        if kinesis_url.host and kinesis_url.host.lower() != "aws":
+            # Omit the port when the URL has none, instead of rendering `:None`.
+            port = "" if kinesis_url.port is None else f":{kinesis_url.port}"
+            self.endpoint_url = f"http://{kinesis_url.host}{port}"
+        self.kinesis_url = kinesis_url
+        self.region_name = kinesis_url.query.get("region")
+        self.stream_name = self.kinesis_url.path.lstrip("/")
+
+    def consume_forever(self, handler: t.Callable):
+        """
+        Run the consumer on a fresh event loop, and invoke `handler` per item. Blocks.
+        """
+        asyncio.run(self._consume_forever(handler))
+
+    async def _consume_forever(self, handler: t.Callable):
+        """
+        Consume items from a Kinesis stream.
+        """
+        async with Consumer(
+            stream_name=self.stream_name,
+            session=self.session,
+            endpoint_url=self.endpoint_url,
+            region_name=self.region_name,
+            # TODO: Make configurable.
+            create_stream=True,
+            iterator_type="TRIM_HORIZON",
+            sleep_time_no_records=0.2,
+            processor=JsonProcessor(),
+        ) as consumer:
+            # Re-enter the iterator each time it finishes.
+            while True:
+                async for item in consumer:
+                    handler(item)
+
+    def produce(self, data: t.Dict[str, t.Any]):
+        """
+        Submit a single item to the stream. Blocks until the producer context is left.
+        """
+        asyncio.run(self._produce(data))
+
+    async def _produce(self, data: t.Dict[str, t.Any]):
+        # Put item onto queue to be flushed via `put_records()`.
+        async with Producer(
+            stream_name=self.stream_name,
+            session=self.session,
+            endpoint_url=self.endpoint_url,
+            region_name=self.region_name,
+            # TODO: Make configurable.
+            create_stream=True,
+            buffer_time=0.01,
+        ) as producer:
+            await producer.put(data)
diff --git a/cratedb_toolkit/io/kinesis/api.py b/cratedb_toolkit/io/kinesis/api.py
new file mode 100644
index 00000000..9d302736
--- /dev/null
+++ b/cratedb_toolkit/io/kinesis/api.py
@@ -0,0 +1,9 @@
+from cratedb_toolkit.io.kinesis.relay import KinesisRelay
+
+
+def kinesis_relay(source_url, target_url):
+    """
+    Relay events from the Kinesis stream at `source_url` into CrateDB at `target_url`.
+    """
+    ka = KinesisRelay(source_url, target_url)
+    ka.start()
diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py
new file mode 100644
index 00000000..4060cf66
--- /dev/null
+++ b/cratedb_toolkit/io/kinesis/relay.py
@@ -0,0 +1,98 @@
+import base64
+import json
+import logging
+
+import sqlalchemy as sa
+from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
+from tqdm import tqdm
+from tqdm.contrib.logging import logging_redirect_tqdm
+from yarl import URL
+
+from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
+from cratedb_toolkit.model import DatabaseAddress
+from cratedb_toolkit.util import DatabaseAdapter
+
+logger = logging.getLogger(__name__)
+
+
+class KinesisRelay:
+    """
+    Relay events from Kinesis into CrateDB table.
+    """
+
+    def __init__(
+        self,
+        kinesis_url: str,
+        cratedb_url: str,
+    ):
+        """
+        Set up the Kinesis adapter, the CrateDB adapter, and the event translator.
+
+        :param kinesis_url: URL of the Kinesis stream. Its scheme selects the
+            translator; only schemes containing `dynamodb+cdc` are supported.
+        :param cratedb_url: CrateDB address; decoded into an SQLAlchemy URL and a table address.
+        :raises NotImplementedError: When the URL scheme selects no translator.
+        """
+        cratedb_address = DatabaseAddress.from_string(cratedb_url)
+        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
+        cratedb_table = cratedb_table_address.fullname
+
+        self.kinesis_url = URL(kinesis_url)
+        self.kinesis_adapter = KinesisAdapter(self.kinesis_url)
+        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
+        self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
+
+        if "dynamodb+cdc" in self.kinesis_url.scheme:
+            self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table)
+        else:
+            raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}")
+
+        # Both are assigned by `start()`.
+        self.connection: sa.Connection
+        self.progress_bar: tqdm
+
+    def start(self):
+        """
+        Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB.
+
+        Blocks while consuming. The database connection and the progress bar
+        are closed when consumption stops, also on errors and interruption.
+        """
+        logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown")
+        with self.cratedb_adapter.engine.connect() as connection:
+            self.connection = connection
+            if not self.cratedb_adapter.table_exists(self.cratedb_table):
+                self.connection.execute(sa.text(self.translator.sql_ddl))
+                self.connection.commit()
+            records_target = self.cratedb_adapter.count_records(self.cratedb_table)
+            logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
+            # Harmonize logging and progress bar.
+            # https://github.com/tqdm/tqdm#redirecting-logging
+            with tqdm() as progress_bar, logging_redirect_tqdm():
+                self.progress_bar = progress_bar
+                self.kinesis_adapter.consume_forever(self.process)
+
+    def process(self, event):
+        """
+        Translate a single Kinesis event to SQL, and run it on CrateDB.
+
+        Events that fail decoding or translation are logged and skipped.
+        Statements rejected with `ProgrammingError` are logged as warnings.
+        """
+        try:
+            record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
+            operation = self.translator.to_sql(record)
+        except Exception:
+            logger.exception("Decoding Kinesis event failed")
+            return
+        try:
+            # Process record.
+            self.connection.execute(sa.text(operation.statement), operation.parameters)
+
+            # Processing alternating CDC events requires write synchronization.
+            self.connection.execute(sa.text(f"REFRESH TABLE {self.translator.quote_table_name(self.cratedb_table)}"))
+
+            self.connection.commit()
+        except sa.exc.ProgrammingError as ex:
+            logger.warning(f"Running query failed: {ex}")
+        self.progress_bar.update()
diff --git a/examples/aws/kinesis_put.py b/examples/aws/kinesis_put.py
new file mode 100644
index 00000000..8e554f6d
--- /dev/null
+++ b/examples/aws/kinesis_put.py
@@ -0,0 +1,14 @@
+from yarl import URL
+
+from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
+from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis
+
+
+def main():
+    ka = KinesisAdapter(URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1"))
+    ka.produce(wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED))
+    ka.produce(wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index 2c9ae342..cae73840 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -155,6 +155,7 @@ io = [
   "sqlalchemy>=2",
 ]
 kinesis = [
+  "async-kinesis<1.2",
   "commons-codec>=0.0.12",
   "lorrystream[carabas]",
 ]