From 6bfd268d3cd7fe0e7f1944ba2c7bebfc14e28898 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Tue, 6 Aug 2024 11:56:52 +0200
Subject: [PATCH] Carabas/DMS: Make it work

- Use a real DMS replication instance
- Streamline configuration of DMS replication task
- Improve processor Lambda
---
 doc/backlog.rst                               |   2 +
 doc/carabas/backlog.md                        |   2 +
 doc/carabas/dms/index.md                      | 184 ++++++++++++
 doc/carabas/research.md                       |   4 +
 ...s_postgresql_kinesis_lambda_oci_cratedb.py |  91 ++++--
 lorrystream/carabas/aws/cf/dms_next.py        |   2 +-
 lorrystream/carabas/aws/model.py              |  10 +-
 lorrystream/carabas/aws/stack/dms.py          | 262 ++++++++++--------
 lorrystream/process/kinesis_cratedb_lambda.py |  88 ++++--
 pyproject.toml                                |   2 +-
 tests/test_process.py                         |   2 +
 11 files changed, 495 insertions(+), 154 deletions(-)
 create mode 100644 doc/carabas/dms/index.md

diff --git a/doc/backlog.rst b/doc/backlog.rst
index bfa0b4a..f655abf 100644
--- a/doc/backlog.rst
+++ b/doc/backlog.rst
@@ -40,6 +40,8 @@ Iteration 2
 - [o] Examples: Add ``appsink`` example
 - [o] Improve inline docs
 - [o] Release 0.1.0
+- [o] CSV: https://github.com/alan-turing-institute/CleverCSV
+- [o] Excel & ODF: https://github.com/dimastbk/python-calamine
 
 
 ***********
diff --git a/doc/carabas/backlog.md b/doc/carabas/backlog.md
index 05bcd85..e7c455d 100644
--- a/doc/carabas/backlog.md
+++ b/doc/carabas/backlog.md
@@ -17,3 +17,5 @@
 - [ ] Improve efficiency by using bulk operations when applicable
 - [ ] is in UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS state and can not be updated
 - [ ] is in ROLLBACK_COMPLETE state and can not be updated.
+- [ ] Cannot create a publicly accessible DBInstance. The specified VPC has no
+  internet gateway attached. Update the VPC and then try again
diff --git a/doc/carabas/dms/index.md b/doc/carabas/dms/index.md
new file mode 100644
index 0000000..f48e877
--- /dev/null
+++ b/doc/carabas/dms/index.md
@@ -0,0 +1,184 @@
+# Pipelines with AWS DMS
+
+_AWS DMS to Kinesis to CrateDB._
+
+## What's Inside
+- [Using a PostgreSQL database as an AWS DMS source]
+- [Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]
+- Full load and CDC
+- Source: RDS PostgreSQL
+- Target: CrateDB Cloud
+
+
+## Infrastructure Setup
+
+### CrateDB Table
+Create the destination table in CrateDB, into which the CDC record
+processor will re-materialize CDC events.
+```shell
+pip install crash
+crash -c "CREATE TABLE public.foo (data OBJECT(DYNAMIC));"
+```
+
+### Deploy
+The following walkthrough describes a full deployment of AWS DMS including relevant
+outbound data processors for demonstration purposes. In order to run it in production,
+you are welcome to derive from it and tweak it for your own purposes.
+
+Configure the CrateDB database sink address.
+```shell
+export SINK_SQLALCHEMY_URL='crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true'
+```
+
+Invoke the IaC driver program to deploy the relevant resources
+on AWS using CloudFormation.
+```shell
+python examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
+```
+
+After the deployment succeeded, you will be presented with a response
+that includes relevant information about the entrypoints to the software
+stack you've just created.
+```text
+Result of CloudFormation deployment:
+psql command: psql "postgresql://dynapipe:secret11@testdrive-dms-postgresql-dev-db.czylftvqn1ed.eu-central-1.rds.amazonaws.com:5432/postgres"
+RDS Instance ARN: arn:aws:rds:eu-central-1:831394476016:db:testdrive-dms-postgresql-dev-db
+Stream ARN: arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive-dms-postgresql-dev-stream
+Replication ARN: arn:aws:dms:eu-central-1:831394476016:replication-config:EAM3JEHXGBGZBPN5PLON7NPDEE
+```
+
+### Status Checks
+
+Display the ARNs of all replication instances.
+```shell
+aws dms describe-replication-instances | jq -r '.ReplicationInstances[].ReplicationInstanceArn'
+```
+
+Display replication endpoints and relevant connection settings.
+```shell
+aws dms describe-endpoints
+```
+
+Test the connection between a replication instance and an endpoint.
+```shell
+aws dms test-connection \
+    --replication-instance-arn arn:aws:dms:eu-central-1:831394476016:rep:JD2LL6OM35BJZNKZIRSOE2FXIY \
+    --endpoint-arn arn:aws:dms:eu-central-1:831394476016:endpoint:3IVDGL6E4RDNBF2LFBYF6DYV3Y
+
+aws dms describe-connections
+```
+
+
+## Usage
+
+### Prerequisites
+First of all, activate the `pglogical` extension on your RDS PostgreSQL instance.
+```sql
+CREATE EXTENSION pglogical;
+SELECT * FROM pg_catalog.pg_extension WHERE extname='pglogical';
+```
+
+### Data in Source
+After that, connect to RDS PostgreSQL, and provision a small amount of data.
+```sql
+DROP TABLE IF EXISTS foo CASCADE;
+CREATE TABLE foo (id INT PRIMARY KEY, name TEXT, age INT, attributes JSONB);
+INSERT INTO foo (id, name, age, attributes) VALUES (42, 'John', 30, '{"foo": "bar"}');
+INSERT INTO foo (id, name, age, attributes) VALUES (43, 'Jane', 31, '{"baz": "qux"}');
+```
+
+### Data in Target
+```sql
+cr> SELECT * FROM public.foo;
+```
+```postgresql
++---------------------------------------------------------------------+
+| data                                                                |
++---------------------------------------------------------------------+
+| {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} |
+| {"age": 31, "attributes": {"baz": "qux"}, "id": 43, "name": "Jane"} |
++---------------------------------------------------------------------+
+```
+
+### Operations
+Enumerate all configured replication tasks with compact output.
+```shell
+aws dms describe-replication-tasks | \
+    jq '.ReplicationTasks[] | {ReplicationTaskIdentifier, ReplicationTaskArn, MigrationType, StartReplicationType, Status, StopReason, FailureMessages, ProvisionData}'
+```
+Start the replication task with the given ARN.
+```shell
+aws dms start-replication-task \
+    --start-replication-task-type start-replication --replication-task-arn \
+    arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
+```
+Stop the replication task with the given ARN.
+```shell
+aws dms stop-replication-task --replication-task-arn \
+    arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
+```
+
+
+### Logging
+
+To see detailed progress about the replication process, use CloudWatch to
+inspect the corresponding log output.
+
+Enumerate all log groups.
+```shell
+aws logs describe-log-groups
+```
+
+Get the log output history.
+```shell
+aws logs get-log-events \
+    --log-group-name dms-tasks-testdrive-dms-instance \
+    --log-stream-name dms-task-7QBLNBTPCNDEBG7CHI3WA73YFA | jq .events[].message
+```
+
+Start watching the log output using the `start-live-tail` CloudWatch operation.
+```shell
+aws logs start-live-tail --log-group-identifiers \
+    arn:aws:logs:eu-central-1:831394476016:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
+    arn:aws:logs:eu-central-1:831394476016:log-group:dms-tasks-testdrive-dms-instance
+```
+
+
+## Appendix
+
+### CloudFormation
+
+```shell
+aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
+aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
+```
+
+```sql
+SHOW shared_preload_libraries;
+SELECT name, setting FROM pg_settings WHERE name IN ('rds.logical_replication','shared_preload_libraries');
+```
+
+- https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType
+- https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html
+
+Possible values for `--start-replication-type`:
+
+- start-replication
+- resume-processing
+- reload-target
+
+```sql
+UPDATE foo SET age=32 WHERE name='Jane';
+UPDATE foo SET age=33 WHERE id=43;
+UPDATE foo SET age=33 WHERE attributes->>'foo'='bar';
+UPDATE foo SET attributes = jsonb_set(attributes, '{last_name}', '"Doe"', true) WHERE name='John';
+```
+```sql
+DELETE FROM foo WHERE name='Jane';
+DELETE FROM foo WHERE name='John';
+```
+
+
+[AWS::DMS::ReplicationConfig]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dms-replicationconfig.html
+[Using a PostgreSQL database as an AWS DMS source]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html
+[Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
+[Using object mapping to migrate data to a Kinesis data stream]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.ObjectMapping
diff --git a/doc/carabas/research.md b/doc/carabas/research.md
index 22db25b..3625a38 100644
--- a/doc/carabas/research.md
+++ b/doc/carabas/research.md
@@ -42,3 +42,7 @@
 - https://aws.amazon.com/blogs/database/stream-changes-from-amazon-rds-for-postgresql-using-amazon-kinesis-data-streams-and-aws-lambda/
 - https://github.com/eulerto/wal2json
 - https://docs.aws.amazon.com/AmazonRDS/latest/PostgreSQLReleaseNotes/postgresql-extensions.html#postgresql-extensions-15x
+
+## CDC
+- https://debezium.io/documentation/reference/stable/postgres-plugins.html
+- https://github.com/debezium/postgres-decoderbufs
diff --git a/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py b/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
index d7a2992..8bfc026 100644
--- a/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
+++ b/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
@@ -1,6 +1,10 @@
 import logging
+import os
+from pathlib import Path
 
-from lorrystream.carabas.aws import RDSPostgreSQLDMSKinesisPipe
+from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress
+
+from lorrystream.carabas.aws import LambdaFactory, LambdaPythonImage, RDSPostgreSQLDMSKinesisPipe
 from lorrystream.util.common import setup_logging
 
 logger = logging.getLogger(__name__)
@@ -25,14 +29,12 @@ def main():
     """
 
     # Build and publish OCI image that includes the AWS Lambda function.
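+    # Note: It is assumed that `publish()` builds the container image and pushes
+    # it to an OCI registry such as Amazon ECR, so that the processor Lambda
+    # defined further below can reference it through `python_image.uri` and
+    # `python_image.entrypoint_handler`.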
- """ python_image = LambdaPythonImage( name="cratedb-kinesis-lambda", entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"), entrypoint_handler="kinesis_cratedb_lambda.handler", ) python_image.publish() - """ # Define an AWS CloudFormation software stack. stack = RDSPostgreSQLDMSKinesisPipe( @@ -42,23 +44,72 @@ def main(): description="RDS PostgreSQL > DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB", db_username="dynapipe", db_password="secret11", # noqa: S106 - environment={ - "SINK_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true", - "SINK_TABLE": "transactions", - }, ) - # Add components to the stack. - """ - stack.table().processor( - LambdaFactory( - name="DynamoDBCrateDBProcessor", + # Exclusively deploy the VPC elements of the stack. + # Do that on the first invocation, but nothing else. + # Warning: When doing it subsequently, it will currently delete the whole RDS substack. + # Warning: When doing it and directly proceed to RDS creation, it will fail: + # The specified VPC has no internet gateway attached. Update the VPC and then try again. + # TODO: Introduce a little CLI controller for invoking different deployment steps conveniently. + # TODO: Refactor by splitting into different stacks. + # stack.vpc().deploy(); return # noqa: ERA001 + + # Deploy the full RDS+DMS demo stack. + stack.vpc().database().stream().dms() + + # Define mapping rules for replication. + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.html + # TODO: Currently hard-coded to table `public.foo`. + map_to_kinesis = { + "rules": [ + { + "rule-type": "selection", + "rule-id": "1", + "rule-name": "DefaultInclude", + "rule-action": "include", + "object-locator": {"schema-name": "public", "table-name": "foo"}, + "filters": [], + }, + # Using the percent wildcard ("%") in "table-settings" rules is + # not supported for source databases as shown following. + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.html#CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.Wildcards + # Here: Exact schema and table required when using object mapping rule with '3.5' engine. + { + "rule-type": "object-mapping", + "rule-id": "2", + "rule-name": "DefaultMapToKinesis", + "rule-action": "map-record-to-record", + "object-locator": {"schema-name": "public", "table-name": "foo"}, + "filters": [], + }, + ] + } + + # Define column type mapping for CrateDB processor. + column_types = ColumnTypeMapStore().add( + table=TableAddress(schema="public", table="foo"), + column="attributes", + type_=ColumnType.MAP, + ) + + # Add a DMS replication pipeline element to the stack. + stack.replication(dms_table_mapping=map_to_kinesis) + + # Add custom processing components to the stack. + stack.processor( + factory=LambdaFactory( + name="DMSCrateDBProcessor", oci_uri=python_image.uri, handler=python_image.entrypoint_handler, - ) + ), + environment={ + "MESSAGE_FORMAT": "dms", + "COLUMN_TYPES": column_types.to_json(), + "SINK_SQLALCHEMY_URL": os.environ.get("SINK_SQLALCHEMY_URL", "crate://"), + }, ).connect() - """ - stack.vpc().database().stream().dms() # .table() # Deploy stack. stack.deploy() @@ -68,18 +119,18 @@ def main(): # TODO: Detect when changed. 
stack.deploy_processor_image() - PublicDbEndpoint = stack.get_output_value(stack._bsm, "PublicDbEndpoint") - PublicDbPort = stack.get_output_value(stack._bsm, "PublicDbPort") + database_host = stack.get_output_value(stack._bsm, "DatabaseHost") + database_port = stack.get_output_value(stack._bsm, "DatabasePort") psql_command = ( - f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"' + f'psql "postgresql://{stack.db_username}:{stack.db_password}@{database_host}:{database_port}/postgres"' ) print("Result of CloudFormation deployment:") - print(psql_command) + print("psql command:", psql_command) print("RDS Instance ARN:", stack.get_output_value(stack._bsm, "RDSInstanceArn")) print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn")) - print("Replication ARN:", stack.get_output_value(stack._bsm, "ReplicationArn")) + print("Replication ARN:", stack.get_output_value(stack._bsm, "ReplicationTaskArn")) if __name__ == "__main__": diff --git a/lorrystream/carabas/aws/cf/dms_next.py b/lorrystream/carabas/aws/cf/dms_next.py index ed2829c..76b9e9e 100644 --- a/lorrystream/carabas/aws/cf/dms_next.py +++ b/lorrystream/carabas/aws/cf/dms_next.py @@ -3,7 +3,7 @@ import attr from cottonformation.core.constant import AttrMeta from cottonformation.core.model import TypeCheck, TypeHint, Resource, Tag, GetAtt, Property -from cottonformation.res.dms import ReplicationSubnetGroup, PropEndpointKinesisSettings +from cottonformation.res.dms import ReplicationSubnetGroup, PropEndpointKinesisSettings, ReplicationTask, ReplicationInstance from cottonformation.res.dms import Endpoint as EndpointVanilla diff --git a/lorrystream/carabas/aws/model.py b/lorrystream/carabas/aws/model.py index 179c43c..34b0904 100644 --- a/lorrystream/carabas/aws/model.py +++ b/lorrystream/carabas/aws/model.py @@ -80,7 +80,7 @@ def deploy(self, respawn: bool = False): logger.info("Deploying CloudFormation stack") parameters = self.parameters or [] - self.template.batch_tagging(dict(ProjectName=self.project, Stage=self.stage)) # noqa: C408 + self.template.batch_tagging(dict(ProjectName=self.project, Stage=self.stage), mode_overwrite=True) # noqa: C408 env = cf.Env(bsm=self._bsm) if respawn: @@ -93,9 +93,11 @@ def deploy(self, respawn: bool = False): include_iam=True, include_named_iam=True, verbose=True, - skip_prompt=True, + skip_prompt=False, # 300 seconds are not enough to wait for RDS PostgreSQL, for example. - timeout=500, + # 500 seconds are not enough for a complete stack including a DMS instance, for example. + # on 110 th attempt, elapsed 555 seconds, remain 445 seconds ... 
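+            # Hence, allow a more generous overall deployment timeout.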
+ timeout=750, ) return self @@ -158,4 +160,4 @@ def deploy_processor_image(self): @attr.s class KinesisProcessorStack(GenericProcessorStack): - _event_source: t.Optional[t.Union[kinesis.Stream]] = None + _stream_source: t.Union[kinesis.Stream, None] = None diff --git a/lorrystream/carabas/aws/stack/dms.py b/lorrystream/carabas/aws/stack/dms.py index a57957e..a566dea 100644 --- a/lorrystream/carabas/aws/stack/dms.py +++ b/lorrystream/carabas/aws/stack/dms.py @@ -3,7 +3,7 @@ import attr import cottonformation as cf -from cottonformation import ResourceGroup +from cottonformation import GetAtt from cottonformation.res import awslambda, ec2, iam, kinesis, rds from lorrystream.carabas.aws import LambdaFactory @@ -38,8 +38,6 @@ class RDSPostgreSQLDMSKinesisPipe(KinesisProcessorStack): db_username: str = attr.ib() db_password: str = attr.ib() - environment: t.Dict[str, str] = attr.ib(factory=dict) - _vpc: ec2.VPC = None _public_subnet1: ec2.Subnet = None _public_subnet2: ec2.Subnet = None @@ -47,10 +45,12 @@ class RDSPostgreSQLDMSKinesisPipe(KinesisProcessorStack): _db_security_group: ec2.SecurityGroup = None _db: rds.DBInstance = None - _stream: kinesis.Stream = None + + _dms_instance: dms.ReplicationInstance = None + _dms_kinesis_access_role: iam.Role = None def vpc(self): - group = ResourceGroup() + group = cf.ResourceGroup() self._vpc = ec2.VPC( "VPCInstance", @@ -95,8 +95,8 @@ def vpc(self): group.add(self._public_subnet1) group.add(self._public_subnet2) - # Cannot create a publicly accessible DBInstance. - # The specified VPC has no internet gateway attached. + # FIXME: Problem: Cannot create a publicly accessible DBInstance. + # The specified VPC has no internet gateway attached. gateway = ec2.InternetGateway( "VPCGateway", p_Tags=cf.Tag.make_many( @@ -151,7 +151,7 @@ def vpc(self): return self.add(group) def database(self): - group = ResourceGroup() + group = cf.ResourceGroup() # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_VPC.WorkingWithRDSInstanceinaVPC.html self._db_subnet_group = rds.DBSubnetGroup( @@ -164,10 +164,11 @@ def database(self): ) group.add(self._db_subnet_group) + db_security_group_name = f"{self.env_name}-db-security-group" self._db_security_group = ec2.SecurityGroup( "RDSPostgreSQLSecurityGroup", rp_GroupDescription=f"DB security group for {self.env_name}", - p_GroupName=f"{self.env_name}-db-security-group", + p_GroupName=db_security_group_name, p_VpcId=self._vpc.ref(), p_SecurityGroupIngress=[ ec2.PropSecurityGroupIngress( @@ -195,7 +196,7 @@ def database(self): p_CidrIp="0.0.0.0/0", ) ], - p_Tags=cf.Tag.make_many(Name=cf.Sub.from_params(f"{self.env_name}-db-security-group")), + p_Tags=cf.Tag.make_many(Name=cf.Sub.from_params(db_security_group_name)), ra_DependsOn=[self._vpc], ) group.add(self._db_security_group) @@ -210,13 +211,14 @@ def database(self): # aws rds describe-db-parameters --db-parameter-group-name default.postgres15 p_Parameters={ "log_connections": True, + # List of allowable settings for the pgaudit.log parameter: + # none, all, ddl, function, misc, read, role, write # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.pgaudit.html - "pgaudit.log": "all", + "pgaudit.log": "none", "pgaudit.log_statement_once": True, # `rds.logical_replication is a cluster level setting, not db instance setting? # https://stackoverflow.com/a/66252465 "rds.logical_replication": True, - # TODO: wal2json? 
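+                # `pglogical` has to be listed in `shared_preload_libraries`,
+                # otherwise `CREATE EXTENSION pglogical` will fail later on.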
"shared_preload_libraries": "pgaudit,pglogical,pg_stat_statements", }, ) @@ -251,7 +253,7 @@ def database(self): ], # If there's no DB subnet group, then the DB instance isn't a VPC DB instance. p_DBSubnetGroupName=self._db_subnet_group.ref(), - p_EnableCloudwatchLogsExports=["postgresql", "upgrade"], + p_EnableCloudwatchLogsExports=["postgresql"], # p_DBName="testdrive", # noqa: ERA001 p_Tags=cf.Tag.make_many( Name=cf.Sub.from_params(f"{self.env_name}-db"), @@ -269,32 +271,32 @@ def database(self): group.add(rds_arn) public_endpoint = cf.Output( - "PublicDbEndpoint", + "DatabaseHost", Value=db.rv_EndpointAddress, ) group.add(public_endpoint) public_db_port = cf.Output( - "PublicDbPort", + "DatabasePort", Value=db.rv_EndpointPort, ) group.add(public_db_port) return self.add(group) def stream(self): - group = ResourceGroup() + group = cf.ResourceGroup() # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.Prerequisites - self._stream = kinesis.Stream( + self._stream_source = kinesis.Stream( id="KinesisStream", p_Name=f"{self.env_name}-stream", p_StreamModeDetails={"rp_StreamMode": "ON_DEMAND"}, ) stream_arn = cf.Output( "StreamArn", - Value=self._stream.rv_Arn, + Value=self._stream_source.rv_Arn, ) - group.add(self._stream) + group.add(self._stream_source) group.add(stream_arn) return self.add(group) @@ -322,7 +324,7 @@ def dms(self): -- https://github.com/hashicorp/terraform-provider-aws/issues/19580 -- https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole """ - group = ResourceGroup() + group = cf.ResourceGroup() # Trust policy that is associated with upcoming roles. # Trust policies define which entities can assume the role. @@ -345,6 +347,7 @@ def dms(self): cf.helpers.iam.AwsManagedPolicy.AmazonDMSVPCManagementRole, ], ) + group.add(dms_vpc_role) dms_cloudwatch_role = iam.Role( id="DMSCloudWatchLogsRole", rp_AssumeRolePolicyDocument=trust_policy_dms, @@ -357,12 +360,11 @@ def dms(self): cf.helpers.iam.AwsManagedPolicy.AmazonDMSCloudWatchLogsRole, ], ) - group.add(dms_vpc_role) group.add(dms_cloudwatch_role) # Allow DMS accessing the data sink. In this case, Kinesis. # For Redshift, this role needs to be called `dms-access-for-endpoint`. - dms_target_access_role = iam.Role( + self._dms_kinesis_access_role = iam.Role( id="DMSTargetAccessRole", rp_AssumeRolePolicyDocument=trust_policy_dms, p_RoleName=cf.Sub("${EnvName}-dms-target-access-role", {"EnvName": self.param_env_name.ref()}), @@ -370,13 +372,12 @@ def dms(self): p_ManagedPolicyArns=[ cf.helpers.iam.AwsManagedPolicy.AmazonKinesisFullAccess, ], - ra_DependsOn=self._stream, + ra_DependsOn=self._stream_source, ) - group.add(dms_target_access_role) + group.add(self._dms_kinesis_access_role) # Create a replication subnet group given a list of the subnet IDs in a VPC. 
# https://docs.aws.amazon.com/dms/latest/APIReference/API_CreateReplicationSubnetGroup.html - # """ dms_replication_subnet_group = dms.ReplicationSubnetGroup( # type: ignore[call-arg,misc] "DMSReplicationSubnetGroup", rp_SubnetIds=[self._public_subnet1.ref(), self._public_subnet2.ref()], @@ -385,12 +386,12 @@ def dms(self): ra_DependsOn=[dms_vpc_role], ) group.add(dms_replication_subnet_group) - # """ + dms_security_group_name = f"{self.env_name}-dms-security-group" dms_security_group = ec2.SecurityGroup( "DMSSecurityGroup", rp_GroupDescription=f"DMS security group for {self.env_name}", - p_GroupName=f"{self.env_name}-dms-security-group", + p_GroupName=dms_security_group_name, p_VpcId=self._vpc.ref(), p_SecurityGroupIngress=[ ec2.PropSecurityGroupIngress( @@ -418,10 +419,34 @@ def dms(self): p_CidrIp="0.0.0.0/0", ) ], + p_Tags=cf.Tag.make_many(Name=cf.Sub.from_params(dms_security_group_name)), ra_DependsOn=[self._vpc, dms_replication_subnet_group], ) group.add(dms_security_group) + # The replication instance is the main workhorse. + self._dms_instance = dms.ReplicationInstance( + "DMSReplicationInstance", + rp_ReplicationInstanceClass="dms.t3.medium", + p_ReplicationInstanceIdentifier=f"{self.env_name}-dms-instance", + p_MultiAZ=False, + p_ReplicationSubnetGroupIdentifier=dms_replication_subnet_group.ref(), + p_VpcSecurityGroupIds=[dms_security_group.ref()], + p_EngineVersion="3.5.2", + p_AllocatedStorage=5, + p_PubliclyAccessible=True, + p_AutoMinorVersionUpgrade=False, + p_AllowMajorVersionUpgrade=False, + ra_DependsOn=[ + dms_vpc_role, + dms_cloudwatch_role, + dms_security_group, + dms_replication_subnet_group, + self._dms_kinesis_access_role, + ], + ) + group.add(self._dms_instance) + # Configuring VPC endpoints as AWS DMS source and target endpoints. # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_VPC_Endpoints.html vpc_endpoint_stream = ec2.VPCEndpoint( @@ -429,10 +454,19 @@ def dms(self): rp_VpcId=self._vpc.ref(), rp_ServiceName=f"com.amazonaws.{self.region}.kinesis-streams", p_SubnetIds=[self._public_subnet1.ref(), self._public_subnet2.ref()], - p_SecurityGroupIds=[self._db_security_group.ref(), dms_security_group.ref()], + # TODO: Does it really need _both_ security groups? + p_SecurityGroupIds=[ + self._db_security_group.ref(), + dms_security_group.ref(), + ], p_VpcEndpointType="Interface", ) group.add(vpc_endpoint_stream) + return self.add(group) + + def replication(self, dms_table_mapping: t.Dict[str, t.Any]): + + group = cf.ResourceGroup() # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.Advanced # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.RDSPostgreSQL @@ -442,7 +476,7 @@ def dms(self): rp_EndpointType="source", rp_EngineName="postgres", p_ServerName=self._db.rv_EndpointAddress, - # NOTE: Needs to be integer! + # NOTE: Needs to be integer, so it requires a patched version of cottonformation's `dms` resource wrappers. 
p_Port=self._db.rv_EndpointPort, p_SslMode="require", p_Username=self.db_username, @@ -462,7 +496,7 @@ def dms(self): rp_EndpointType="target", rp_EngineName="kinesis", p_KinesisSettings=dms.PropEndpointKinesisSettings( - p_StreamArn=self._stream.rv_Arn, + p_StreamArn=self.stream_arn, p_MessageFormat="json-unformatted", p_IncludeControlDetails=True, p_IncludePartitionValue=True, @@ -471,42 +505,55 @@ def dms(self): p_IncludeTableAlterOperations=True, p_PartitionIncludeSchemaTable=True, # The parameter ServiceAccessRoleArn must be provided and must not be blank. - p_ServiceAccessRoleArn=dms_target_access_role.rv_Arn, + p_ServiceAccessRoleArn=self._dms_kinesis_access_role.rv_Arn, ), p_EndpointIdentifier=f"{self.env_name}-endpoint-target", - ra_DependsOn=[self._stream, dms_target_access_role, vpc_endpoint_stream], + ra_DependsOn=[self._stream_source, self._dms_kinesis_access_role], ) group.add(source_endpoint) group.add(target_endpoint) - # FIXME: Currently hard-coded to table `public.foo`. - # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html - map_to_kinesis = { - "rules": [ - { - "rule-type": "selection", - "rule-id": "1", - "rule-name": "DefaultInclude", - "rule-action": "include", - "object-locator": {"schema-name": "public", "table-name": "foo"}, - "filters": [], - }, - # Using the percent wildcard ("%") in "table-settings" rules is - # not supported for source databases as shown following. - # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.html#CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.Wildcards - # Here: Exact schema and table required when using object mapping rule with '3.5' engine. - { - "rule-type": "object-mapping", - "rule-id": "2", - "rule-name": "DefaultMapToKinesis", - "rule-action": "map-record-to-record", - "object-locator": {"schema-name": "public", "table-name": "foo"}, - "filters": [], - }, - ] + replication_settings = { + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.BeforeImage.html + "BeforeImageSettings": { + "EnableBeforeImage": True, + "FieldName": "before-image", + "ColumnFilter": "pk-only", + }, + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html + "Logging": { + "EnableLogging": True, + "EnableLogContext": True, + # ERROR: Feature is not accessible. 
+ # TODO: "LogConfiguration": {"EnableTraceOnError": True}, + "LogComponents": [ + {"Id": "COMMON", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "ADDONS", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "DATA_STRUCTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "COMMUNICATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "FILE_TRANSFER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "FILE_FACTORY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "METADATA_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "IO", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "PERFORMANCE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SORTER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SOURCE_CAPTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SOURCE_UNLOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TABLES_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TARGET_APPLY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TARGET_LOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TASK_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TRANSFORMATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "REST_SERVER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + # Replication Settings document error: Unsupported keys were found: VALIDATOR + # {"Id": "VALIDATOR", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, # noqa: ERA001 + {"Id": "VALIDATOR_EXT", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + ], + }, } - serverless_replication = dms.ReplicationConfig( # type: ignore[call-arg,misc] + """ + replication = dms.ReplicationConfig( # type: ignore[call-arg,misc] "DMSReplicationConfig", rp_ReplicationConfigIdentifier=f"{self.env_name}-dms-serverless", # p_ResourceIdentifier=f"{self.env_name}-dms-serverless-resource", # noqa: ERA001 @@ -521,44 +568,7 @@ def dms(self): p_VpcSecurityGroupIds=[self._db_security_group.ref(), dms_security_group.ref()], ), rp_TableMappings=map_to_kinesis, - p_ReplicationSettings={ - # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.BeforeImage.html - "BeforeImageSettings": { - "EnableBeforeImage": True, - "FieldName": "before-image", - "ColumnFilter": "pk-only", - }, - # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html - "Logging": { - "EnableLogging": True, - "EnableLogContext": True, - # ERROR: Feature is not accessible. 
- # TODO: "LogConfiguration": {"EnableTraceOnError": True}, - "LogComponents": [ - {"Id": "COMMON", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "ADDONS", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "DATA_STRUCTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "COMMUNICATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "FILE_TRANSFER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "FILE_FACTORY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "METADATA_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "IO", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "PERFORMANCE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "SORTER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "SOURCE_CAPTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "SOURCE_UNLOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "TABLES_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "TARGET_APPLY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "TARGET_LOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "TASK_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "TRANSFORMATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - {"Id": "REST_SERVER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - # Replication Settings document error: Unsupported keys were found: VALIDATOR - # {"Id": "VALIDATOR", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, # noqa: ERA001 - {"Id": "VALIDATOR_EXT", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, - ], - }, - }, + p_ReplicationSettings=replication_settings, ra_DependsOn=[ dms_replication_subnet_group, dms_security_group, @@ -569,25 +579,59 @@ def dms(self): target_endpoint, ], ) - group.add(serverless_replication) + group.add(replication) + + replication_config_arn = cf.Output( + "ReplicationConfigArn", + Value=replication.rv_ReplicationConfigArn, + ) + group.add(replication_config_arn) + return self.add(group) + """ + + replication = dms.ReplicationTask( # type: ignore[call-arg,misc] + "DMSReplicationTask", + # TODO: Use existing replication instance on demand. + # FIXME: Make configurable. 
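+            # For the time being, the task always runs on the replication
+            # instance provisioned by `dms()` above.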
+ rp_ReplicationInstanceArn=self._dms_instance.ref(), + p_ReplicationTaskIdentifier=f"{self.env_name}-dms-task", + # p_ResourceIdentifier=f"{self.env_name}-dms-serverless-resource", # noqa: ERA001 + rp_MigrationType="full-load-and-cdc", + rp_SourceEndpointArn=source_endpoint.ref(), + rp_TargetEndpointArn=target_endpoint.ref(), + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html + rp_TableMappings=json.dumps(dms_table_mapping), + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.html + p_ReplicationTaskSettings=json.dumps(replication_settings), + ra_DependsOn=[ + self._dms_instance, + source_endpoint, + target_endpoint, + ], + ra_CreationPolicy="Retain", + ra_DeletionPolicy="Retain", + ) + group.add(replication) - replication_arn = cf.Output( - "ReplicationArn", - Value=serverless_replication.rv_ReplicationConfigArn, + replication_task_arn = cf.Output( + "ReplicationTaskArn", + Value=replication.ref(), ) - group.add(replication_arn) + group.add(replication_task_arn) return self.add(group) @property - def stream_arn(self): - return self._stream.rv_Arn + def stream_arn(self) -> GetAtt: + if self._stream_source is None: + raise ValueError("Kinesis Stream source not defined") + return self._stream_source.rv_Arn - def processor(self, proc: LambdaFactory): + def processor(self, factory: LambdaFactory, environment: t.Dict[str, str]): """ Manifest the main processor component of this pipeline. """ - self._processor = proc.make(self, environment=self.environment) + self._processor = factory.make(self, environment=environment) return self.add(self._processor.group) def connect(self): @@ -609,17 +653,17 @@ def connect(self): """ if not self._processor: raise RuntimeError("No processor defined") - if not self._event_source: - raise RuntimeError("No event source defined") + if not self._stream_source: + raise RuntimeError("No Kinesis stream defined") # Get a handle to the AWS Lambda for dependency management purposes. awsfunc = self._processor.function # Create a mapping and add it to the stack. mapping = awslambda.EventSourceMapping( - id="EventSourceToLambdaMapping", + id="KinesisToLambdaMapping", rp_FunctionName=awsfunc.p_FunctionName, - p_EventSourceArn=self._event_source.rv_Arn, + p_EventSourceArn=self._stream_source.rv_Arn, p_BatchSize=2500, # LATEST - Read only new records. # TRIM_HORIZON - Process all available records. diff --git a/lorrystream/process/kinesis_cratedb_lambda.py b/lorrystream/process/kinesis_cratedb_lambda.py index bd6fc53..f7658e5 100644 --- a/lorrystream/process/kinesis_cratedb_lambda.py +++ b/lorrystream/process/kinesis_cratedb_lambda.py @@ -1,17 +1,30 @@ -# Copyright (c) 2024 The Kotori developers and contributors. +# Copyright (c) 2024 The Panodata Developers and contributors. # Distributed under the terms of the Apache 2 license. """ -Consume an AWS Kinesis Stream and relay into CrateDB. +Using an AWS Lambda, consume an AWS Kinesis Stream of CDC data, and relay +into CrateDB, re-materializing the original information into an OBJECT +column `data`. + +Currently supported CDC message formats: + +- AWS DMS +- AWS DynamoDB + +Details: +When using `ON_ERROR = exit`, the processor uses Linux exit codes for +signalling error conditions, see https://stackoverflow.com/a/76187305. 
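+
+Environment variables, as read by this module:
+
+- `MESSAGE_FORMAT`: `dms` or `dynamodb`, selects the CDC translator (required)
+- `COLUMN_TYPES`: JSON-encoded column type map, for example produced by
+  `ColumnTypeMapStore.to_json()`
+- `SINK_SQLALCHEMY_URL`: SQLAlchemy URL of the CrateDB sink database
+- `SINK_TABLE`: destination table, used by the DynamoDB translator
+- `ON_ERROR`: one of `exit`, `ignore`, `raise`
+- `LOG_LEVEL`, `SQL_ECHO`, `USE_BATCH_PROCESSING`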
+ +Resources: - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis-example.html - https://docs.aws.amazon.com/lambda/latest/dg/python-logging.html - -In order to run, this module/program needs the following -3rd party libraries, defined using inline script metadata. """ +# In order to run, this module/program needs the following +# 3rd party libraries, defined using inline script metadata. +# # /// script # requires-python = ">=3.9" # dependencies = [ -# "commons-codec==0.0.2", +# "commons-codec==0.0.3", # "sqlalchemy-cratedb==0.38.0", # ] # /// @@ -20,36 +33,70 @@ import logging import os import sys -import typing as t import sqlalchemy as sa +from commons_codec.exception import UnknownOperationError +from commons_codec.model import ColumnTypeMapStore +from commons_codec.transform.aws_dms import DMSTranslatorCrateDB from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB from sqlalchemy.util import asbool -ON_ERROR_TYPE = t.Literal["exit", "ignore", "raise"] - LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO") USE_BATCH_PROCESSING: bool = asbool(os.environ.get("USE_BATCH_PROCESSING", "false")) -ON_ERROR: ON_ERROR_TYPE = t.cast(ON_ERROR_TYPE, os.environ.get("ON_ERROR", "exit")) +ON_ERROR: str = os.environ.get("ON_ERROR", "exit") SQL_ECHO: bool = asbool(os.environ.get("SQL_ECHO", "false")) + +MESSAGE_FORMAT: str = os.environ.get("MESSAGE_FORMAT", "unknown") +COLUMN_TYPES: str = os.environ.get("COLUMN_TYPES", "") SINK_SQLALCHEMY_URL: str = os.environ.get("SINK_SQLALCHEMY_URL", "crate://") SINK_TABLE: str = os.environ.get("SINK_TABLE", "default") logger = logging.getLogger(__name__) logger.setLevel(LOG_LEVEL) -engine = sa.create_engine(SINK_SQLALCHEMY_URL, echo=SQL_ECHO) + + +# Sanity checks. +# If any value is invalid, terminate by signalling "22 - Invalid argument". +error_strategies = ["exit", "ignore", "raise"] +message_formats = ["dms", "dynamodb"] +if ON_ERROR not in error_strategies: + message = f"Invalid value for ON_ERROR: {ON_ERROR}. Use one of: {error_strategies}" + logger.fatal(message) + sys.exit(22) +if MESSAGE_FORMAT not in message_formats: + message = f"Invalid value for MESSAGE_FORMAT: {MESSAGE_FORMAT}. Use one of: {message_formats}" + logger.fatal(message) + sys.exit(22) +try: + column_types = ColumnTypeMapStore.from_json(COLUMN_TYPES) +except Exception as ex: + message = f"Invalid value for COLUMN_TYPES: {COLUMN_TYPES}. Reason: {ex}. Use JSON." + logger.fatal(message) + sys.exit(22) # TODO: Automatically create destination table. -cdc = DynamoCDCTranslatorCrateDB(table_name=SINK_TABLE) +# TODO: Propagate mapping definitions and other settings. +if MESSAGE_FORMAT == "dms": + cdc = DMSTranslatorCrateDB(column_types=column_types) +elif MESSAGE_FORMAT == "dynamodb": + cdc = DynamoCDCTranslatorCrateDB(table_name=SINK_TABLE) # Create the database connection outside the handler to allow # connections to be re-used by subsequent function invocations. +# TODO: Examine long-running jobs about successful reconnection behavior. try: + engine = sa.create_engine(SINK_SQLALCHEMY_URL, echo=SQL_ECHO) connection = engine.connect() -except Exception: - logger.exception("Connection to sink database failed") - -logger.info("Connected to sink database") + logger.info(f"Connection to sink database succeeded: {SINK_SQLALCHEMY_URL}") +except Exception as ex: + logger.exception(f"Connection to sink database failed: {SINK_SQLALCHEMY_URL}") + if ON_ERROR == "exit": + # Signal "Resource temporarily unavailable" when connection to database fails. 
+ sys.exit(11) + elif ON_ERROR == "ignore": + pass + elif ON_ERROR == "raise": + raise ex def handler(event, context): @@ -63,6 +110,7 @@ def handler(event, context): logger.debug("context: %s", context) for record in event["Records"]: + logger.debug(f"Record: {record}") event_id = record["eventID"] try: @@ -80,6 +128,9 @@ def handler(event, context): # Bookkeeping. cur_record_sequence_number = record["kinesis"]["sequenceNumber"] + except UnknownOperationError as ex: + logger.warning(f"Ignoring message. Reason: {ex}. Record: {ex.record}") + except Exception as ex: error_message = f"An error occurred processing event: {event_id}" logger.exception(error_message) @@ -87,13 +138,12 @@ def handler(event, context): # Return failed record's sequence number. return {"batchItemFailures": [{"itemIdentifier": cur_record_sequence_number}]} if ON_ERROR == "exit": - sys.exit(6) + # Signal "Input/output error" when error happens while processing data. + sys.exit(5) elif ON_ERROR == "ignore": pass elif ON_ERROR == "raise": raise ex - else: - raise ValueError(f"Invalid value for ON_ERROR: {ON_ERROR}") from ex logger.info(f"Successfully processed {len(event['Records'])} records") if USE_BATCH_PROCESSING: diff --git a/pyproject.toml b/pyproject.toml index 95c829f..c6007f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,7 +92,7 @@ dependencies = [ "click<9", "colorama<1", "colorlog", - "commons-codec==0.0.2", + "commons-codec==0.0.3", "cottonformation<1.2", "dask", "funcy", diff --git a/tests/test_process.py b/tests/test_process.py index 4489384..5bda2e9 100644 --- a/tests/test_process.py +++ b/tests/test_process.py @@ -24,6 +24,7 @@ def test_kinesis_dynamodb_cratedb_lambda_basic(mocker, cratedb, reset_handler): # Configure. handler_environment = { + "MESSAGE_FORMAT": "dynamodb", "SINK_SQLALCHEMY_URL": cratedb.get_connection_url(), "SINK_TABLE": "testdrive-dynamodb-cdc", } @@ -59,6 +60,7 @@ def test_kinesis_dynamodb_cratedb_lambda_batch(mocker, cratedb, reset_handler): # Configure. handler_environment = { + "MESSAGE_FORMAT": "dynamodb", "SINK_SQLALCHEMY_URL": cratedb.get_connection_url(), "SINK_TABLE": "testdrive-dynamodb-cdc", "USE_BATCH_PROCESSING": "true",