Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Add ctk load table interface for processing CDC events #247

Merged
merged 3 commits into from
Sep 13, 2024

Conversation

amotl
Copy link
Member

@amotl amotl commented Aug 28, 2024

About

Running DynamoDB CDC events through Kinesis and processing them using an AWS Lambda is cumbersome more often than not, and not too suitable for collaboration and development purposes. This patch provides a standalone implementation, as a sister to the corresponding full-load implementation, DynamoDB Table Loader.

Documentation

Preview: https://cratedb-toolkit--247.org.readthedocs.build/io/dynamodb/cdc.html

Synopsis

Use AWS for real, or exercise using LocalStack.

# Define target address, a CrateDB schema+table.
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo

# Address Kinesis Stream on AWS.
ctk load table kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream?region=us-east-1

# Address Kinesis Stream on LocalStack.
ctk load table kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1

Install

pip install 'cratedb-toolkit[kinesis] @ git+https://github.com/crate/cratedb-toolkit.git@dynamodb-cdc-standalone'

Details

This data nozzle is tapping into Change data capture for DynamoDB Streams, in this case using Kinesis Data Streams, for maximum universality, because using Kinesis isn't a bad idea: We will also use it to ingest other event/record types in the future, thus the protocol identifier kinesis+dynamodb+cdc://. On the egress side, towards CrateDB, it will use the data/aux column strategy.

It doesn't mean it's not cloud-ready, it is just more universal, because it can be used both in an ad hoc / standalone operations mode, in development sandboxes, and can also be invoked on any other managed Python environment, at your disposal.

Backlog I

  • Software integration tests.
  • Documentation: Synopsis, Basics, LocalStack, Cloud, etc.
  • Configurability for special options, like create_stream, iterator_type, sleep_time_no_records, etc.
  • Batch processing? No, it is not easily applicable for CDC events.

Backlog II

/cc @juanpardo, @hlcianfagna, @hammerhead, @wierdvanderhaar, @karynzv

Comment on lines 32 to 53
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. What the comment says.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iterator_type="TRIM_HORIZON" (currently hard-coded) means it will always read the Stream from its starting point. Sure enough, this is probably the most important detail to be made configurable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

07b1974 adds a few options to configure details when connecting to the Kinesis Stream.

Comment on lines 52 to 81
# TODO: Make configurable.
create_stream=True,
buffer_time=0.01,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dito.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

07b1974 adds a few options to configure details when connecting to the Kinesis Stream.

@amotl amotl force-pushed the amo/dynamodb-cdc-standalone branch 3 times, most recently from 092da37 to c299c4e Compare August 29, 2024 03:21
Copy link

@wierdvanderhaar wierdvanderhaar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets go.

wierdvanderhaar

This comment was marked as duplicate.

@amotl amotl force-pushed the amo/dynamodb-cdc-standalone branch from c299c4e to 6841aa8 Compare August 29, 2024 12:04
doc/io/dynamodb/cdc.md Outdated Show resolved Hide resolved
@amotl amotl force-pushed the amo/dynamodb-cdc-standalone branch from 0ff229f to 9c9cd6b Compare September 13, 2024 14:31
@cla-bot cla-bot bot added the cla-signed label Sep 13, 2024
@amotl amotl force-pushed the amo/dynamodb-cdc-standalone branch from 9c9cd6b to f305462 Compare September 13, 2024 15:35
In contrast to the Lambda-based processor implementation, this one is
a standalone one that can be used optimally in any Python environment,
managed or not.
New options: batch-size, create, create-shards, start, seqno,
idle-sleep, buffer-time.
@amotl amotl force-pushed the amo/dynamodb-cdc-standalone branch from f305462 to 07b1974 Compare September 13, 2024 20:12
@amotl amotl mentioned this pull request Sep 13, 2024
11 tasks
@amotl amotl marked this pull request as ready for review September 13, 2024 20:16
@amotl amotl merged commit 7362050 into main Sep 13, 2024
32 checks passed
@amotl amotl deleted the amo/dynamodb-cdc-standalone branch September 13, 2024 20:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants