diff --git a/.github/workflows/dynamodb.yml b/.github/workflows/dynamodb.yml index d44d7ae2..cad71307 100644 --- a/.github/workflows/dynamodb.yml +++ b/.github/workflows/dynamodb.yml @@ -42,8 +42,8 @@ jobs: os: ["ubuntu-latest"] # TODO: yarl, dependency of influxio, is currently not available on Python 3.12. # https://github.com/aio-libs/yarl/pull/942 - python-version: ["3.8", "3.11"] - localstack-version: ["3.7"] + python-version: ["3.9", "3.11"] + localstack-version: ["3.6"] env: OS: ${{ matrix.os }} @@ -78,7 +78,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop] + pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop] - name: Run linter and software tests run: | diff --git a/CHANGES.md b/CHANGES.md index f16e011d..22f75f98 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://` - MongoDB: Optionally filter server collection using MongoDB query expression - MongoDB: Improve error handling wrt. bulk operations vs. 
usability +- DynamoDB: Add `ctk load table` interface for processing CDC events ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index db516915..b4060744 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -138,9 +138,18 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False + elif source_url_obj.scheme.startswith("kinesis"): + if "+cdc" in source_url_obj.scheme: + from cratedb_toolkit.io.kinesis.api import kinesis_relay + + return kinesis_relay(str(source_url_obj), target_url) + else: + raise NotImplementedError("Loading full data via Kinesis not implemented yet") + elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]: if "+cdc" in source_url_obj.scheme: source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") + from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) diff --git a/cratedb_toolkit/io/kinesis/__init__.py b/cratedb_toolkit/io/kinesis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py new file mode 100644 index 00000000..f6f709a4 --- /dev/null +++ b/cratedb_toolkit/io/kinesis/adapter.py @@ -0,0 +1,83 @@ +import asyncio +import typing as t + +import boto3 +from aiobotocore.session import AioSession +from kinesis import Consumer, JsonProcessor, Producer +from yarl import URL + + +class KinesisAdapter: + def __init__(self, kinesis_url: URL): + self.async_session = AioSession() + self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) + + self.session = boto3.Session( + aws_access_key_id=kinesis_url.user, + 
aws_secret_access_key=kinesis_url.password, + region_name=kinesis_url.query.get("region"), + ) + + self.endpoint_url = None + if kinesis_url.host and kinesis_url.host.lower() != "aws": + self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}" + self.kinesis_url = kinesis_url + self.region_name = kinesis_url.query.get("region") + self.stream_name = self.kinesis_url.path.lstrip("/") + self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url) + + def consumer_factory(self, **kwargs): + return Consumer( + stream_name=self.stream_name, + session=self.async_session, + endpoint_url=self.endpoint_url, + region_name=self.region_name, + processor=JsonProcessor(), + **kwargs, + ) + + def consume_forever(self, handler: t.Callable): + asyncio.run(self._consume_forever(handler)) + + def consume_once(self, handler: t.Callable): + asyncio.run(self._consume_once(handler)) + + async def _consume_forever(self, handler: t.Callable): + """ + Consume items from a Kinesis stream. + """ + async with self.consumer_factory( + # TODO: Make configurable. + create_stream=True, + iterator_type="TRIM_HORIZON", + sleep_time_no_records=0.2, + ) as consumer: + while True: + async for item in consumer: + handler(item) + + async def _consume_once(self, handler: t.Callable): + async with self.consumer_factory( + # TODO: Make configurable. + create_stream=True, + iterator_type="TRIM_HORIZON", + sleep_time_no_records=0.2, + ) as consumer: + async for item in consumer: + handler(item) + + def produce(self, data: t.Dict[str, t.Any]): + asyncio.run(self._produce(data)) + + async def _produce(self, data: t.Dict[str, t.Any]): + # Put item onto queue to be flushed via `put_records()`. + async with Producer( + stream_name=self.stream_name, + session=self.async_session, + endpoint_url=self.endpoint_url, + region_name=self.region_name, + # TODO: Make configurable. 
+ create_stream=True, + buffer_time=0.01, + ) as producer: + await producer.put(data) diff --git a/cratedb_toolkit/io/kinesis/api.py b/cratedb_toolkit/io/kinesis/api.py new file mode 100644 index 00000000..9d302736 --- /dev/null +++ b/cratedb_toolkit/io/kinesis/api.py @@ -0,0 +1,6 @@ +from cratedb_toolkit.io.kinesis.relay import KinesisRelay + + +def kinesis_relay(source_url, target_url): + ka = KinesisRelay(source_url, target_url) + ka.start() diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py new file mode 100644 index 00000000..9a9ec8de --- /dev/null +++ b/cratedb_toolkit/io/kinesis/relay.py @@ -0,0 +1,82 @@ +import base64 +import json +import logging + +import sqlalchemy as sa +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from yarl import URL + +from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class KinesisRelay: + """ + Relay events from Kinesis into CrateDB table. 
+ """ + + def __init__( + self, + kinesis_url: str, + cratedb_url: str, + ): + cratedb_address = DatabaseAddress.from_string(cratedb_url) + cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() + cratedb_table = cratedb_table_address.fullname + + self.kinesis_url = URL(kinesis_url) + self.kinesis_adapter = KinesisAdapter(self.kinesis_url) + self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) + self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) + + if "dynamodb+cdc" in self.kinesis_url.scheme: + self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table) + else: + raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}") + + self.connection: sa.Connection + self.progress_bar: tqdm + + def start(self, once: bool = False): + """ + Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB. + """ + logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown") + self.connection = self.cratedb_adapter.engine.connect() + if not self.cratedb_adapter.table_exists(self.cratedb_table): + self.connection.execute(sa.text(self.translator.sql_ddl)) + self.connection.commit() + records_target = self.cratedb_adapter.count_records(self.cratedb_table) + logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") + # Harmonize logging and progress bar. + # https://github.com/tqdm/tqdm#redirecting-logging + self.progress_bar = tqdm() + with logging_redirect_tqdm(): + if once: + self.kinesis_adapter.consume_once(self.process_event) + else: + self.kinesis_adapter.consume_forever(self.process_event) + + def process_event(self, event): + try: + record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8")) + operation = self.translator.to_sql(record) + except Exception: + logger.exception("Decoding Kinesis event failed") + return + try: + # Process record. 
+ self.connection.execute(sa.text(operation.statement), operation.parameters) + + # Processing alternating CDC events requires write synchronization. + self.connection.execute(sa.text(f"REFRESH TABLE {self.cratedb_table}")) + + self.connection.commit() + except sa.exc.ProgrammingError as ex: + logger.warning(f"Running query failed: {ex}") + self.progress_bar.update() diff --git a/doc/io/dynamodb/cdc-lambda.md b/doc/io/dynamodb/cdc-lambda.md new file mode 100644 index 00000000..30ac67dd --- /dev/null +++ b/doc/io/dynamodb/cdc-lambda.md @@ -0,0 +1,250 @@ +# DynamoDB CDC Relay with AWS Lambda + + +## What's Inside +- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS]. +- Written in Python, using [AWS CloudFormation] stack deployments. To learn + what's behind, see also [How CloudFormation works]. +- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient + delta transfers, built-in versioning, and testing purposes. + + +## Details +- This specific document includes a few general guidelines, and + a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`. +- That program defines a pipeline which looks like this: + + DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB Cloud + +For exercising an AWS pipeline, you need two components: The IaC description, +and a record processor implementation for the AWS Lambda. + +The IaC description will deploy a complete software stack for demonstration +purposes, including a DynamoDB Table, connected to a Kinesis Stream. + + +## Prerequisites + +### CrateDB +This walkthrough assumes a running CrateDB cluster, and focuses on CrateDB Cloud. +It does not provide relevant guidelines to set up a cluster, yet. + +### OCI image +In order to package code for AWS Lambda functions into OCI images, +and use them, you will need to publish them to the AWS ECR container image +registry.
+ +You will need to authenticate your local Docker environment, and create a +container image repository once for each project using a different runtime +image. + +Define your AWS ID, region label, and repository name, to be able to use +the templated commands 1:1. +```shell +aws_id=831394476016 +aws_region=eu-central-1 +repository_name=kinesis-cratedb-processor-lambda +``` +```shell +aws ecr get-login-password --region=${aws_region} | \ + docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com +``` + +(ecr-repository)= +### ECR Repository +Just once, before proceeding, create an image repository hosting the runtime +code for your Lambda function. +```shell +aws ecr create-repository --region=${aws_region} \ + --repository-name=${repository_name} --image-tag-mutability=MUTABLE +``` +In order to allow others to pull that image, you will need to define a +[repository policy] using the [set-repository-policy] subcommand of the AWS CLI. +In order to invoke that command, put the [](project:#ecr-repository-policy) +JSON definition into a file called `policy.json`. +```shell +aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json +``` + + +## Install +In order to exercise the example outlined below, you need to install +CrateDB Toolkit with the "kinesis" extension, because CDC data will be +relayed using AWS Kinesis. +```shell +pip install 'cratedb-toolkit[kinesis]' +``` + + +## Usage + +:::{rubric} Configure +::: +```shell +export CRATEDB_HTTP_URL='https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/' +export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' +``` + +:::{rubric} CrateDB Table +::: +The destination table in CrateDB, into which the CDC record +processor will re-materialize CDC events.
+```shell +pip install crash +crash --hosts "${CRATEDB_HTTP_URL}" -c 'CREATE TABLE "demo-sink" (data OBJECT(DYNAMIC));' +``` + +:::{rubric} Invoke pipeline +::: +Package the Lambda function, upload it, and deploy demo software stack. +```shell +python dynamodb_kinesis_lambda_oci_cratedb.py +``` +For example, choose those two variants: + +- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py] +- Record processor: [kinesis_lambda.py] + +Putting them next to each other into a directory, and adjusting +`LambdaPythonImage(entrypoint_file=...)` to point to the second, +should be enough to get you started. + + +:::{rubric} Trigger CDC events +::: +Inserting a document into the DynamoDB table, and updating it, will trigger two CDC events. +```shell +READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}" +READING_WHERE="\"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42'" + +aws dynamodb execute-statement --statement \ + "INSERT INTO \"demo-source\" VALUE ${READING_SQL};" + +aws dynamodb execute-statement --statement \ + "UPDATE \"demo-source\" SET temperature=43.59 WHERE ${READING_WHERE};" +``` + +:::{rubric} Query data in CrateDB +::: +When the stream delivered the CDC data to the processor, and everything worked well, +data should have materialized in the target table in CrateDB. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +:::{rubric} Shut down AWS stack +::: +In order to complete the experiment, you may want to shut down the AWS stack again. +```shell +aws cloudformation delete-stack --stack-name testdrive-dynamodb-dev +``` + + +## Appendix + +### Processor +Check status of Lambda function. +```shell +aws lambda get-function \ + --function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor +``` +Check status of stream mapping(s). +```shell +aws lambda list-event-source-mappings +``` +Check logs. 
+```shell +aws logs describe-log-groups +aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor +``` + +### Database + +There are a few utility commands that help you operate the stack, that have not +been absorbed yet. See also [Monitoring and troubleshooting Lambda functions]. + +Query records in CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +Truncate CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'DELETE FROM "demo-sink";' +``` + +Query documents in DynamoDB table. +```shell +aws dynamodb execute-statement --statement \ + "SELECT * FROM \"demo-source\";" +``` + + +(ecr-repository-policy)= +### ECR Repository Policy +```json +{ + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "allow public pull", + "Effect": "Allow", + "Principal": "*", + "Action": [ + "ecr:BatchCheckLayerAvailability", + "ecr:BatchGetImage", + "ecr:GetDownloadUrlForLayer" + ] + } + ] +} +``` + +## Troubleshooting + +### ECR Repository +If you receive such an error message, your session has expired, and you need +to re-run the authentication step. +```text +denied: Your authorization token has expired. Reauthenticate and try again. +``` + +This error message indicates your ECR repository does not exist. The solution +is to create it, using the command shared above. +```text +name unknown: The repository with name 'kinesis-cratedb-processor-lambda' does +not exist in the registry with id '831394476016' +``` + +### AWS CloudFormation +If you receive such an error, ... +```text +botocore.exceptions.ClientError: An error occurred (ValidationError) when calling +the CreateChangeSet operation: Stack:arn:aws:cloudformation:eu-central-1:931394475905:stack/testdrive-dynamodb-dev/ea8c32e0-492c-11ef-b9b3-06b708ecd03f +is in UPDATE_ROLLBACK_FAILED state and can not be updated. 
+``` +because some detail when deploying or updating the CloudFormation recipe fails, +the CloudFormation stack is stuck, and you will need to [continue rolling back +an update] manually. +```shell +aws cloudformation continue-update-rollback --stack-name testdrive-dynamodb-dev +``` + + + +[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services +[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html +[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda +[continue rolling back an update]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-continueupdaterollback.html +[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/crate/cratedb-toolkit/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py +[example program]: https://github.com/crate/cratedb-toolkit/tree/main/examples/aws +[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html +[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code +[kinesis_lambda.py]: https://github.com/crate/cratedb-toolkit/blob/main/cratedb_toolkit/io/processor/kinesis_lambda.py +[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html +[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative +[repository policy]: https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions +[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html diff --git a/doc/io/dynamodb/cdc.md b/doc/io/dynamodb/cdc.md index eb1917cf..5e67f021 100644 --- a/doc/io/dynamodb/cdc.md +++ b/doc/io/dynamodb/cdc.md @@ -1,250 +1,76 @@ +(dynamodb-cdc)= # DynamoDB CDC Relay +## About +Relay data changes from DynamoDB into CrateDB using a one-stop command +`ctk load table kinesis+dynamodb+cdc://...`, in order to facilitate +convenient data 
transfers to be used within data pipelines or ad hoc +operations. -## What's Inside -- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS]. -- Written in Python, using [AWS CloudFormation] stack deployments. To learn - what's behind, see also [How CloudFormation works]. -- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient - delta transfers, built-in versioning, and testing purposes. - - -## Details -- This specific document includes a few general guidelines, and a - a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`. -- That program defines a pipeline which looks like this: - - DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB Cloud - -For exercising an AWS pipeline, you need two components: The IaC description, -and a record processor implementation for the AWS Lambda. - -The IaC description will deploy a complete software stack for demonstration -purposes, including a DynamoDB Table, connected to a Kinesis Stream. - - -## Prerequisites - -### CrateDB -This walkthrough assumes a running CrateDB cluster, and focuses on CrateDB Cloud. -It does not provide relevant guidelines to set up a cluster, yet. - -### OCI image -In order to package code for AWS Lambda functions packages into OCI images, -and use them, you will need to publish them to the AWS ECR container image -registry. - -You will need to authenticate your local Docker environment, and create a -container image repository once for each project using a different runtime -image. - -Define your AWS ID, region label, and repository name, to be able to use -the templated commands 1:1. 
-```shell -aws_id=831394476016 -aws_region=eu-central-1 -repository_name=kinesis-cratedb-processor-lambda -``` -```shell -aws ecr get-login-password --region=${aws_region} | \ - docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com -``` - -(ecr-repository)= -### ECR Repository -Just once, before proceeding, create an image repository hosting the runtime -code for your Lambda function. -```shell -aws ecr create-repository --region=${aws_region} \ - --repository-name=${repository_name} --image-tag-mutability=MUTABLE -``` -In order to allow others to pull that image, you will need to define a -[repository policy] using the [set-repository-policy] subcommend of the AWS CLI. -In order to invoke that command, put the [](project:#ecr-repository-policy) -JSON definition into a file called `policy.json`. -```shell -aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json -``` - +It taps into [Change data capture for DynamoDB Streams], in this case +using [Kinesis Data Streams]. It is the sister to the corresponding +full-load implementation, [](#dynamodb-loader). ## Install -In order to exercise the example outlined below, you need to install -CrateDB Toolkit with the "kinesis" extension, because CDC data will be -relayed using AWS Kinesis. ```shell -pip install 'cratedb-toolkit[kinesis]' +pip install --upgrade 'cratedb-toolkit[kinesis] @ git+https://github.com/crate/cratedb-toolkit.git@dynamodb-cdc-standalone' ``` - ## Usage - -:::{rubric} Configure -::: +Consume data from Kinesis Data Stream of DynamoDB CDC events into +CrateDB schema/table. 
```shell -export CRATEDB_HTTP_URL='https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/' -export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream?region=eu-central-1 ``` -:::{rubric} CrateDB Table -::: -The destination table name in CrateDB, where the CDC record -processor will re-materialize CDC events into. +Query data in CrateDB. ```shell -pip install crash -crash --hosts "${CRATEDB_HTTP_URL}" -c 'CREATE TABLE "demo-sink" (data OBJECT(DYNAMIC));' +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk shell --command "SELECT * FROM testdrive.demo;" +ctk show table "testdrive.demo" ``` -:::{rubric} Invoke pipeline -::: -Package the Lambda function, upload it, and deploy demo software stack. -```shell -python dynamodb_kinesis_lambda_oci_cratedb.py -``` -For example, choose those two variants: - -- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py] -- Record processor: [kinesis_lambda.py] - -Putting them next to each other into a directory, and adjusting -`LambdaPythonImage(entrypoint_file=...)` to point to the second, -should be enough to get you started. - +## Variants -:::{rubric} Trigger CDC events -::: -Inserting a document into the DynamoDB table, and updating it, will trigger two CDC events. +### CrateDB Cloud +When aiming to transfer data to CrateDB Cloud, the shape of the target URL +looks like that. 
```shell -READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}" -READING_WHERE="\"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42'" - -aws dynamodb execute-statement --statement \ - "INSERT INTO \"demo-source\" VALUE ${READING_SQL};" - -aws dynamodb execute-statement --statement \ - "UPDATE \"demo-source\" SET temperature=43.59 WHERE ${READING_WHERE};" -``` - -:::{rubric} Query data in CrateDB -::: -When the stream delivered the CDC data to the processor, and everything worked well, -data should have materialized in the target table in CrateDB. -```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'SELECT * FROM "demo-sink";' -``` - -:::{rubric} Shut down AWS stack -::: -In order to complete the experiment, you may want to shut down the AWS stack again. -```shell -aws cloudformation delete-stack --stack-name testdrive-dynamodb-dev -``` - - -## Appendix - -### Processor -Check status of Lambda function. -```shell -aws lambda get-function \ - --function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor -``` -Check status of stream mapping(s). -```shell -aws lambda list-event-source-mappings -``` -Check logs. -```shell -aws logs describe-log-groups -aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor -``` - -### Database - -There are a few utility commands that help you operate the stack, that have not -been absorbed yet. See also [Monitoring and troubleshooting Lambda functions]. - -Query records in CrateDB table. -```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'SELECT * FROM "demo-sink";' +export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' ``` -Truncate CrateDB table. 
-```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'DELETE FROM "demo-sink";' -``` +### LocalStack +In order to exercise data transfers exclusively on your workstation, you can +use LocalStack to run DynamoDB and Kinesis service surrogates locally. See +also the [Get started with Kinesis on LocalStack] tutorial. -Query documents in DynamoDB table. +For addressing a Kinesis Data Stream on LocalStack, use a command of that shape. +See [Credentials for accessing LocalStack AWS API] for further information. ```shell -aws dynamodb execute-statement --statement \ - "SELECT * FROM \"demo-source\";" +ctk load table kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1 ``` +:::{tip} +LocalStack is a cloud service emulator that runs in a single container on your +laptop or in your CI environment. With LocalStack, you can run your AWS +applications or Lambdas entirely on your local machine without connecting to +a remote cloud provider. -(ecr-repository-policy)= -### ECR Repository Policy -```json -{ - "Version": "2008-10-17", - "Statement": [ - { - "Sid": "allow public pull", - "Effect": "Allow", - "Principal": "*", - "Action": [ - "ecr:BatchCheckLayerAvailability", - "ecr:BatchGetImage", - "ecr:GetDownloadUrlForLayer" - ] - } - ] -} -``` - -## Troubleshooting - -### ECR Repository -If you receive such an error message, your session has expired, and you need -to re-run the authentication step. -```text -denied: Your authorization token has expired. Reauthenticate and try again. -``` - -This error message indicates your ECR repository does not exist. The solution -is to create it, using the command shared above. -```text -name unknown: The repository with name 'kinesis-cratedb-processor-lambda' does -not exist in the registry with id '831394476016' -``` - -### AWS CloudFormation -If you receive such an error, ... 
-```text -botocore.exceptions.ClientError: An error occurred (ValidationError) when calling -the CreateChangeSet operation: Stack:arn:aws:cloudformation:eu-central-1:931394475905:stack/testdrive-dynamodb-dev/ea8c32e0-492c-11ef-b9b3-06b708ecd03f -is in UPDATE_ROLLBACK_FAILED state and can not be updated. -``` -because some detail when deploying or updating the CloudFormation recipe fails, -the CloudFormation stack is stuck, and you will need to [continue rolling back -an update] manually. +In order to invoke LocalStack on your workstation, you can use this Docker +command. ```shell -aws cloudformation continue-update-rollback --stack-name testdrive-dynamodb-dev +docker run \ + --rm -it \ + -p 127.0.0.1:4566:4566 \ + -p 127.0.0.1:4510-4559:4510-4559 \ + -v /var/run/docker.sock:/var/run/docker.sock \ + localstack/localstack:latest ``` +::: - -[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services -[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html -[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda -[continue rolling back an update]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-continueupdaterollback.html -[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/crate/cratedb-toolkit/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py -[example program]: https://github.com/crate/cratedb-toolkit/tree/main/examples/aws -[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html -[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code -[kinesis_lambda.py]: https://github.com/crate/cratedb-toolkit/blob/main/cratedb_toolkit/io/processor/kinesis_lambda.py -[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html -[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative -[repository policy]: 
https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions -[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html +[Change data capture for DynamoDB Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html +[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ +[Get started with Kinesis on LocalStack]: https://docs.localstack.cloud/user-guide/aws/kinesis/ +[Kinesis Data Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html diff --git a/doc/io/dynamodb/index.md b/doc/io/dynamodb/index.md index 9b37c671..f61dfbcf 100644 --- a/doc/io/dynamodb/index.md +++ b/doc/io/dynamodb/index.md @@ -10,4 +10,5 @@ Using the DynamoDB subsystem, you can transfer data from and to DynamoDB. loader cdc +cdc-lambda ``` diff --git a/doc/io/dynamodb/loader.md b/doc/io/dynamodb/loader.md index 9d3a2e78..5fed0f1b 100644 --- a/doc/io/dynamodb/loader.md +++ b/doc/io/dynamodb/loader.md @@ -6,6 +6,9 @@ Load data from DynamoDB into CrateDB using a one-stop command `ctk load table dynamodb://...`, in order to facilitate convenient data transfers to be used within data pipelines or ad hoc operations. +It is the brother to the corresponding cdc-relay implementation, +[](#dynamodb-cdc). 
+ ## Install ```shell pip install --upgrade 'cratedb-toolkit[dynamodb]' diff --git a/examples/aws/kinesis_put.py b/examples/aws/kinesis_put.py new file mode 100644 index 00000000..8e554f6d --- /dev/null +++ b/examples/aws/kinesis_put.py @@ -0,0 +1,14 @@ +from yarl import URL + +from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter +from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis + + +def main(): + ka = KinesisAdapter(URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1")) + ka.produce(wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED)) + ka.produce(wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED)) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 135396e3..2db06f9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,6 +158,8 @@ io = [ "sqlalchemy>=2", ] kinesis = [ + "aiobotocore<2.15", + "async-kinesis<1.2", "commons-codec>=0.0.14", "lorrystream[carabas]>=0.0.6", ] diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py index 8806c508..c21d0e86 100644 --- a/tests/io/dynamodb/conftest.py +++ b/tests/io/dynamodb/conftest.py @@ -29,7 +29,7 @@ def setup(self): from cratedb_toolkit.testing.testcontainers.localstack import LocalStackContainerWithKeepalive self.container = LocalStackContainerWithKeepalive() - self.container.with_services("dynamodb") + self.container.with_services("dynamodb", "kinesis") self.container.start() def finalize(self): @@ -44,10 +44,14 @@ def reset(self): for database_name in RESET_TABLES: self.client.drop_database(database_name) - def get_connection_url(self): + def get_connection_url_dynamodb(self): url = URL(self.container.get_url()) return f"dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + def get_connection_url_kinesis_dynamodb_cdc(self): + url = URL(self.container.get_url()) + return f"kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + 
@pytest.fixture(scope="session") def dynamodb_service(): @@ -71,4 +75,4 @@ def dynamodb(dynamodb_service): @pytest.fixture(scope="session") def dynamodb_test_manager(dynamodb_service): - return DynamoDBTestManager(dynamodb_service.get_connection_url()) + return DynamoDBTestManager(dynamodb_service.get_connection_url_dynamodb()) diff --git a/tests/io/dynamodb/test_adapter.py b/tests/io/dynamodb/test_adapter.py index f8c8e7ca..05ae643d 100644 --- a/tests/io/dynamodb/test_adapter.py +++ b/tests/io/dynamodb/test_adapter.py @@ -13,7 +13,7 @@ def test_adapter_scan_success(dynamodb): - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) adapter.scan("foo") @@ -22,7 +22,7 @@ def test_adapter_scan_failure_consistent_read(dynamodb): """ Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. """ - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) with pytest.raises(ParamValidationError) as ex: @@ -34,7 +34,7 @@ def test_adapter_scan_failure_page_size(dynamodb): """ Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. """ - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) with pytest.raises(ParamValidationError) as ex: diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py index 71d33e11..45f7e2fe 100644 --- a/tests/io/dynamodb/test_cli.py +++ b/tests/io/dynamodb/test_cli.py @@ -10,7 +10,7 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): """ CLI test: Invoke `ctk load table` for DynamoDB. 
""" - dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/ProductCatalog?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with sample dataset. diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 2ced15b1..609f29b0 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -16,7 +16,7 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager) """ # Define source and target URLs. - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with data. diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py new file mode 100644 index 00000000..d65f8097 --- /dev/null +++ b/tests/io/dynamodb/test_relay.py @@ -0,0 +1,62 @@ +import time + +import botocore +import pytest + +from cratedb_toolkit.io.kinesis.relay import KinesisRelay +from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis + +pytestmark = pytest.mark.kinesis + +pytest.importorskip("commons_codec", reason="Only works with commons-codec installed") +pytest.importorskip("kinesis", reason="Only works with async-kinesis installed") + +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 + + +def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): + """ + Roughly verify that the AWS DynamoDB CDC processing works as expected. + """ + + # Define source and target URLs. + kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define target table name. 
+ table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Define two CDC events: INSERT and UPDATE. + events = [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + + # Initialize table loader. + table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) + + # Delete stream for blank canvas. + try: + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + + # LocalStack needs a while when deleting the Stream. + # FIXME: Can this be made more efficient? + time.sleep(0.5) + + # Populate source database with data. + for event in events: + table_loader.kinesis_adapter.produce(event) + + # Run transfer command, consuming once not forever. + table_loader.start(once=True) + + # Verify data in target database. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index 6de1bad0..3439380b 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -133,8 +133,6 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker from cratedb_toolkit.io.processor.kinesis_lambda import handler # Define two CDC events: INSERT and UPDATE. - # They have to be conveyed separately because CrateDB needs a - # `REFRESH TABLE` operation between them. 
event = { "Records": [ wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), @@ -162,6 +160,6 @@ def wrap_kinesis(data): "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "kinesis": { "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "data": base64.b64encode(json.dumps(data).encode("utf-8")).decode(), }, }