diff --git a/CHANGES.md b/CHANGES.md index 17fd9160..c58595f8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ - MongoDB: Optionally filter server collection using MongoDB query expression - MongoDB: Improve error handling wrt. bulk operations vs. usability - DynamoDB CDC: Add `ctk load table` interface for processing CDC events +- DynamoDB CDC: Accept a few more options for the Kinesis Stream: + batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py index f6f709a4..93f0d557 100644 --- a/cratedb_toolkit/io/kinesis/adapter.py +++ b/cratedb_toolkit/io/kinesis/adapter.py @@ -6,8 +6,20 @@ from kinesis import Consumer, JsonProcessor, Producer from yarl import URL +from cratedb_toolkit.util.data import asbool + class KinesisAdapter: + # Configuration for Kinesis shard iterators. + # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html + # Map `start` option to `ShardIteratorType`. + start_iterator_type_map = { + "earliest": "TRIM_HORIZON", + "latest": "LATEST", + "seqno-at": "AT_SEQUENCE_NUMBER", + "seqno-after": "AFTER_SEQUENCE_NUMBER", + } + def __init__(self, kinesis_url: URL): self.async_session = AioSession() self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) @@ -21,10 +33,35 @@ def __init__(self, kinesis_url: URL): self.endpoint_url = None if kinesis_url.host and kinesis_url.host.lower() != "aws": self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}" + self.kinesis_url = kinesis_url - self.region_name = kinesis_url.query.get("region") self.stream_name = self.kinesis_url.path.lstrip("/") + + self.region_name: str = self.kinesis_url.query.get("region", "us-east-1") + self.batch_size: int = int(self.kinesis_url.query.get("batch-size", 100)) + self.create: bool = asbool(self.kinesis_url.query.get("create", "false")) + self.create_shards: int = int(self.kinesis_url.query.get("create-shards", 1)) + self.start: str = self.kinesis_url.query.get("start", "earliest") + self.seqno: int = int(self.kinesis_url.query.get("seqno", 0)) + self.idle_sleep: float = float(self.kinesis_url.query.get("idle-sleep", 0.5)) + self.buffer_time: float = float(self.kinesis_url.query.get("buffer-time", 0.5)) + self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url) + self.stopping: bool = False + + @property + def iterator_type(self): + """ + Map `start` option to Kinesis' `ShardIteratorType`. + """ + if self.start.startswith("seqno"): + raise NotImplementedError( + "Consuming Kinesis Stream from sequence number " "not implemented yet, please file an issue." + ) + try: + return self.start_iterator_type_map[self.start] + except KeyError as ex: + raise KeyError(f"Value for 'start' option unknown: {self.start}") from ex def consumer_factory(self, **kwargs): return Consumer( @@ -32,7 +69,12 @@ def consumer_factory(self, **kwargs): session=self.async_session, endpoint_url=self.endpoint_url, region_name=self.region_name, + max_queue_size=self.batch_size, + sleep_time_no_records=self.idle_sleep, + iterator_type=self.iterator_type, processor=JsonProcessor(), + create_stream=self.create, + create_stream_shards=self.create_shards, **kwargs, ) @@ -42,42 +84,46 @@ def consume_forever(self, handler: t.Callable): def consume_once(self, handler: t.Callable): asyncio.run(self._consume_once(handler)) + def stop(self): + self.stopping = True + async def _consume_forever(self, handler: t.Callable): """ - Consume items from a Kinesis stream. + Consume items from a Kinesis stream, forever. """ - async with self.consumer_factory( - # TODO: Make configurable. - create_stream=True, - iterator_type="TRIM_HORIZON", - sleep_time_no_records=0.2, - ) as consumer: + async with self.consumer_factory() as consumer: while True: async for item in consumer: handler(item) + if self.stopping: + self.stopping = False + break async def _consume_once(self, handler: t.Callable): - async with self.consumer_factory( - # TODO: Make configurable. - create_stream=True, - iterator_type="TRIM_HORIZON", - sleep_time_no_records=0.2, - ) as consumer: + """ + Consume items from a Kinesis stream, one-shot. + """ + async with self.consumer_factory() as consumer: async for item in consumer: handler(item) def produce(self, data: t.Dict[str, t.Any]): + """ + Produce an item to a Kinesis stream. + """ asyncio.run(self._produce(data)) async def _produce(self, data: t.Dict[str, t.Any]): - # Put item onto queue to be flushed via `put_records()`. + """ + Put item onto queue to be flushed via `put_records()`. + """ async with Producer( stream_name=self.stream_name, session=self.async_session, endpoint_url=self.endpoint_url, region_name=self.region_name, - # TODO: Make configurable. - create_stream=True, - buffer_time=0.01, + buffer_time=self.buffer_time, + create_stream=self.create, + create_stream_shards=self.create_shards, ) as producer: await producer.put(data) diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py index 9a9ec8de..6cb4fa45 100644 --- a/cratedb_toolkit/io/kinesis/relay.py +++ b/cratedb_toolkit/io/kinesis/relay.py @@ -62,6 +62,10 @@ def start(self, once: bool = False): else: self.kinesis_adapter.consume_forever(self.process_event) + def stop(self): + self.progress_bar.close() + self.kinesis_adapter.stop() + def process_event(self, event): try: record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8")) @@ -80,3 +84,6 @@ def process_event(self, event): except sa.exc.ProgrammingError as ex: logger.warning(f"Running query failed: {ex}") self.progress_bar.update() + + def __del__(self): + self.stop() diff --git a/doc/io/dynamodb/cdc.md b/doc/io/dynamodb/cdc.md index fd5aac7b..bc766c26 100644 --- a/doc/io/dynamodb/cdc.md +++ b/doc/io/dynamodb/cdc.md @@ -31,6 +31,88 @@ ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` +## Options + +### Batch Size +The source URL option `batch-size` configures how many items to consume from +the Kinesis Stream at once. The default value is `100`. +For many datasets, a much larger batch size is applicable for most efficient +data transfers. +```shell +ctk load table .../cdc-stream?batch-size=5000 +``` + +### Create +The source URL option `create` configures whether the designated Kinesis Stream +should be created upfront. The default value is `false`. +```shell +ctk load table .../cdc-stream?create=true +``` + +### Create Shards +The source URL option `create-shards` configures whether the designated number +of shards when a Kinesis Stream is created before consuming. +The default value is `1`. +```shell +ctk load table .../cdc-stream?create=true&create-shards=4 +``` + +### Region +The source URL accepts the `region` option to configure the AWS region +label. The default value is `us-east-1`. +```shell +ctk load table .../cdc-stream?region=eu-central-1 +``` + +### Start +The source URL accepts the `start` option to configure the DynamoDB [ShardIteratorType]. +It accepts the following values, mapping to corresponding original options. The default +value is `earliest`. + +```shell +ctk load table .../cdc-stream?start=latest +``` + +- `start=earliest` + + Start reading at the last (untrimmed) stream record, which is the oldest record in the + shard. In DynamoDB Streams, there is a 24 hour limit on data retention. Stream records + whose age exceeds this limit are subject to removal (trimming) from the stream. + This option equals `ShardIteratorType=TRIM_HORIZON`. + +- `start=latest` + + Start reading just after the most recent stream record in the shard, so that you always + read the most recent data in the shard. This option equals `ShardIteratorType=LATEST`. + +- `start=seqno-at&seqno=...` + + Start reading exactly from the position denoted by a specific sequence number. + This option equals `ShardIteratorType=AT_SEQUENCE_NUMBER` and `SequenceNumber=...`. + +- `start=seqno-after&seqno=...` + + Start reading right after the position denoted by a specific sequence number. + This option equals `ShardIteratorType=AFTER_SEQUENCE_NUMBER` and `SequenceNumber=...`. + + +### SeqNo +The source URL accepts the `seqno` option to configure the DynamoDB [SequenceNumber] +parameter. It accepts the sequence number of a stream record in the shard from which +to start reading. +```shell +ctk load table .../cdc-stream?start=seqno-after&seqno=49590338271490256608559692538361571095921575989136588898 +``` + +### Idle Sleep +The `idle-sleep` option configures the waiting time to hibernate the event loop after +running out of items to consume. The default value is `0.5`. + +### Buffer Time +The `buffer-time` option configures the time to wait before flushing produced items +to the wire. The default value is `0.5`. + + ## Variants ### CrateDB Cloud @@ -74,3 +156,5 @@ docker run \ [Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ [Get started with Kinesis on LocalStack]: https://docs.localstack.cloud/user-guide/aws/kinesis/ [Kinesis Data Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html +[SequenceNumber]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-SequenceNumber +[ShardIteratorType]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-ShardIteratorType diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py index d65f8097..7576cf28 100644 --- a/tests/io/dynamodb/test_relay.py +++ b/tests/io/dynamodb/test_relay.py @@ -1,3 +1,4 @@ +import threading import time import botocore @@ -14,13 +15,19 @@ from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 -def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): +def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): """ - Roughly verify that the AWS DynamoDB CDC processing works as expected. + Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected. + + This test case consumes the Kinesis Stream from the "earliest" point, i.e. from the beginning. + No option is configured, because `start=earliest` is the default mode. """ # Define source and target URLs. - kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1" + kinesis_url = ( + f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo" + f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01" + ) cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Define target table name. @@ -40,7 +47,7 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Delete stream for blank canvas. try: - table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True) except botocore.exceptions.ClientError as error: if error.response["Error"]["Code"] != "ResourceNotFoundException": raise @@ -56,7 +63,67 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Run transfer command, consuming once not forever. table_loader.start(once=True) - # Verify data in target database. + # Verify data in target database, more specifically that both events have been processed well. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] + + +def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): + """ + Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected. + + This test case consumes the Kinesis Stream from the "latest" point, i.e. from "now". + """ + + # Define source and target URLs. + kinesis_url = ( + f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo" + f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01&start=latest" + ) + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define target table name. + table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Define two CDC events: INSERT and UPDATE. + events = [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + + # Initialize table loader. + table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) + + # Delete stream for blank canvas. + try: + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + + # LocalStack needs a while when deleting the Stream. + # FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence? + time.sleep(0.5) + + # Start event processor / stream consumer in separate thread, consuming forever. + thread = threading.Thread(target=table_loader.start) + thread.start() + time.sleep(1) + + # Populate source database with data. + for event in events: + table_loader.kinesis_adapter.produce(event) + + # Stop stream consumer. + table_loader.stop() + thread.join() + + # Verify data in target database, more specifically that both events have been processed well. + assert cratedb.database.refresh_table(table_name) is True assert cratedb.database.count_records(table_name) == 1 results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]