Carabas/DMS: Make it work
- Use a real DMS replication instance
- Streamline configuration of DMS replication task
- Improve processor Lambda
amotl committed Aug 6, 2024
1 parent cb17339 commit 6bfd268
Showing 11 changed files with 495 additions and 154 deletions.
2 changes: 2 additions & 0 deletions doc/backlog.rst
@@ -40,6 +40,8 @@ Iteration 2
- [o] Examples: Add ``appsink`` example
- [o] Improve inline docs
- [o] Release 0.1.0
- [o] CSV: https://github.com/alan-turing-institute/CleverCSV
- [o] Excel & ODF: https://github.com/dimastbk/python-calamine


***********
2 changes: 2 additions & 0 deletions doc/carabas/backlog.md
@@ -17,3 +17,5 @@
- [ ] Improve efficiency by using bulk operations when applicable
- [ ] is in UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS state and can not be updated
- [ ] is in ROLLBACK_COMPLETE state and can not be updated.
- [ ] Cannot create a publicly accessible DBInstance. The specified VPC has no
  internet gateway attached. Update the VPC and then try again
184 changes: 184 additions & 0 deletions doc/carabas/dms/index.md
@@ -0,0 +1,184 @@
# Pipelines with AWS DMS

_AWS DMS to Kinesis to CrateDB._

## What's Inside
- [Using a PostgreSQL database as an AWS DMS source]
- [Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]
- Full load and CDC
- Source: RDS PostgreSQL
- Target: CrateDB Cloud


## Infrastructure Setup

### CrateDB Table
Create the destination table in CrateDB, where the CDC record
processor will re-materialize CDC events.
```shell
pip install crash
crash -c "CREATE TABLE public.foo (data OBJECT(DYNAMIC));"
```

### Deploy
The following walkthrough describes a full deployment of AWS DMS, including relevant
outbound data processors, for demonstration purposes. To run it in production,
you are welcome to derive from it and tweak it for your own purposes.

Configure CrateDB database sink address.
```shell
export SINK_SQLALCHEMY_URL='crate://admin:[email protected]:4200/?ssl=true'
```

Invoke the IaC driver program to deploy the relevant resources on AWS
using CloudFormation.
```shell
python examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
```

After the deployment has succeeded, you will be presented with a response
including relevant information about the entrypoints to the software
stack you just created.
```text
Result of CloudFormation deployment:
psql command: psql "postgresql://dynapipe:secret11@testdrive-dms-postgresql-dev-db.czylftvqn1ed.eu-central-1.rds.amazonaws.com:5432/postgres"
RDS Instance ARN: arn:aws:rds:eu-central-1:831394476016:db:testdrive-dms-postgresql-dev-db
Stream ARN: arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive-dms-postgresql-dev-stream
Replication ARN: arn:aws:dms:eu-central-1:831394476016:replication-config:EAM3JEHXGBGZBPN5PLON7NPDEE
```

### Status Checks

Display ARN of replication instances.
```shell
aws dms describe-replication-instances | jq -r '.ReplicationInstances[].ReplicationInstanceArn'
```

Display replication endpoints and relevant connection settings.
```shell
aws dms describe-endpoints
```
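
A compact projection of the most interesting endpoint attributes can be
obtained with `jq`; the field names below follow the DMS `Endpoint` object.
```shell
aws dms describe-endpoints | \
  jq '.Endpoints[] | {EndpointIdentifier, EndpointType, EngineName, Status}'
```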

Test connectivity between the replication instance and both endpoints.
```shell
aws dms test-connection \
--replication-instance-arn arn:aws:dms:eu-central-1:831394476016:rep:JD2LL6OM35BJZNKZIRSOE2FXIY \
--endpoint-arn arn:aws:dms:eu-central-1:831394476016:endpoint:3IVDGL6E4RDNBF2LFBYF6DYV3Y

aws dms describe-connections
```
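
To check the outcome of those connection tests, a similar `jq` projection
over the `Connection` objects returned by `describe-connections` can help.
```shell
aws dms describe-connections | \
  jq '.Connections[] | {EndpointIdentifier, ReplicationInstanceIdentifier, Status, LastFailureMessage}'
```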


## Usage

### Prerequisites
First of all, activate the `pglogical` extension on your RDS PostgreSQL instance.
```sql
CREATE EXTENSION pglogical;
SELECT * FROM pg_catalog.pg_extension WHERE extname='pglogical';
```
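
If `CREATE EXTENSION` fails, logical replication may not be enabled on the
instance yet. On RDS, this is governed by the `rds.logical_replication`
parameter of the instance's DB parameter group. A sketch, where the parameter
group name is a placeholder, and applying the change requires a reboot:
```shell
aws rds modify-db-parameter-group \
  --db-parameter-group-name <your-db-parameter-group> \
  --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"
```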

### Data in Source
After that, connect to RDS PostgreSQL, and provision a small amount of data.
```sql
DROP TABLE IF EXISTS foo CASCADE;
CREATE TABLE foo (id INT PRIMARY KEY, name TEXT, age INT, attributes JSONB);
INSERT INTO foo (id, name, age, attributes) VALUES (42, 'John', 30, '{"foo": "bar"}');
INSERT INTO foo (id, name, age, attributes) VALUES (43, 'Jane', 31, '{"baz": "qux"}');
```
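
Once the replication task is running (see the operations section below), you
can verify that logical decoding is active by inspecting the replication
slots on the source; DMS generates slot names automatically, so the output
will vary. Use the `psql` command printed by the deployment, with `<db-host>`
as a placeholder here:
```shell
psql "postgresql://dynapipe:secret11@<db-host>:5432/postgres" \
  -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"
```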

### Data in Target
```sql
cr> SELECT * FROM public.foo;
```
```text
+---------------------------------------------------------------------+
| data |
+---------------------------------------------------------------------+
| {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} |
| {"age": 31, "attributes": {"baz": "qux"}, "id": 43, "name": "Jane"} |
+---------------------------------------------------------------------+
```
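
Because all record fields are stored within the single `data` OBJECT column,
individual attributes can be addressed using CrateDB's bracket notation, for
example with the `crash` client installed earlier.
```shell
crash -c "SELECT data['id'] AS id, data['name'] AS name, data['age'] AS age FROM public.foo ORDER BY data['id'];"
```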

### Operations
Enumerate all configured replication tasks with compact output.
```shell
aws dms describe-replication-tasks | \
jq '.ReplicationTasks[] | {ReplicationTaskIdentifier, ReplicationTaskArn, MigrationType, StartReplicationType, Status, StopReason, FailureMessages, ProvisionData}'
```
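For example, to single out tasks which are not currently running, filter on
the `Status` attribute; DMS reports status values such as `running`,
`stopped`, or `failed`.
```shell
aws dms describe-replication-tasks | \
  jq -r '.ReplicationTasks[] | select(.Status != "running") | .ReplicationTaskArn'
```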
Start replication task with given ARN.
```shell
aws dms start-replication-task \
--start-replication-task-type start-replication --replication-task-arn \
arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
```
Stop replication task with given ARN.
```shell
aws dms stop-replication-task --replication-task-arn \
arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
```
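Stopping is asynchronous. A small polling loop, sketched here using the
`replication-task-arn` filter of `describe-replication-tasks`, waits until
the task reports `stopped`.
```shell
while true; do
  status=$(aws dms describe-replication-tasks \
    --filters Name=replication-task-arn,Values=arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA | \
    jq -r '.ReplicationTasks[0].Status')
  echo "Task status: ${status}"
  [ "${status}" = "stopped" ] && break
  sleep 5
done
```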


### Logging

To see detailed progress of the replication process, inspect the
corresponding log output in CloudWatch.

Enumerate all log groups.
```shell
aws logs describe-log-groups
```

Get log output history.
```shell
aws logs get-log-events \
--log-group-name dms-tasks-testdrive-dms-instance \
--log-stream-name dms-task-7QBLNBTPCNDEBG7CHI3WA73YFA | jq .events[].message
```
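
As a simpler alternative for ad-hoc monitoring, the `logs tail` subcommand
of the AWS CLI v2 follows a log group continuously.
```shell
aws logs tail dms-tasks-testdrive-dms-instance --follow --format short
```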

Start watching the log output using the `start-live-tail` CloudWatch operation.
```shell
aws logs start-live-tail --log-group-identifiers \
arn:aws:logs:eu-central-1:831394476016:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
arn:aws:logs:eu-central-1:831394476016:log-group:dms-tasks-testdrive-dms-instance
```


## Appendix

### CloudFormation

Recover a stack from a failed update, or tear it down entirely.
```shell
aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
```

Inspect logical replication settings on the source database.
```sql
SHOW shared_preload_libraries;
SELECT name, setting FROM pg_settings WHERE name in ('rds.logical_replication','shared_preload_libraries');
```

- https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType
- https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html

Possible values for `--start-replication-type` (and, for the task-based CLI, `--start-replication-task-type`):

- start-replication
- resume-processing
- reload-target
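
For example, to resume a previously stopped task from where it left off,
reusing the task ARN from the operations section above:
```shell
aws dms start-replication-task \
  --start-replication-task-type resume-processing --replication-task-arn \
  arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
```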

Exercise CDC by applying UPDATE and DELETE operations to the source table.
```sql
update foo set age=32 where name='Jane';
update foo set age=33 where id=43;
update foo set age=33 where attributes->>'foo'='bar';
update foo set attributes = jsonb_set(attributes, '{last_name}', '"Doe"', true) where name='John';
```
```sql
delete from foo where name='Jane';
delete from foo where name='John';
```


[AWS::DMS::ReplicationConfig]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dms-replicationconfig.html
[Using a PostgreSQL database as an AWS DMS source]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html
[Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
[Using object mapping to migrate data to a Kinesis data stream]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.ObjectMapping
4 changes: 4 additions & 0 deletions doc/carabas/research.md
@@ -42,3 +42,7 @@
- https://aws.amazon.com/blogs/database/stream-changes-from-amazon-rds-for-postgresql-using-amazon-kinesis-data-streams-and-aws-lambda/
- https://github.com/eulerto/wal2json
- https://docs.aws.amazon.com/AmazonRDS/latest/PostgreSQLReleaseNotes/postgresql-extensions.html#postgresql-extensions-15x

## CDC
- https://debezium.io/documentation/reference/stable/postgres-plugins.html
- https://github.com/debezium/postgres-decoderbufs
91 changes: 71 additions & 20 deletions examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
@@ -1,6 +1,10 @@
import logging
import os
from pathlib import Path

from lorrystream.carabas.aws import RDSPostgreSQLDMSKinesisPipe
from commons_codec.model import ColumnType, ColumnTypeMapStore, TableAddress

from lorrystream.carabas.aws import LambdaFactory, LambdaPythonImage, RDSPostgreSQLDMSKinesisPipe
from lorrystream.util.common import setup_logging

logger = logging.getLogger(__name__)
@@ -25,14 +29,12 @@ def main():
"""

# Build and publish OCI image that includes the AWS Lambda function.
"""
python_image = LambdaPythonImage(
name="cratedb-kinesis-lambda",
entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"),
entrypoint_handler="kinesis_cratedb_lambda.handler",
)
python_image.publish()
"""

# Define an AWS CloudFormation software stack.
stack = RDSPostgreSQLDMSKinesisPipe(
@@ -42,23 +44,72 @@
description="RDS PostgreSQL > DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
db_username="dynapipe",
db_password="secret11", # noqa: S106
environment={
"SINK_SQLALCHEMY_URL": "crate://admin:[email protected]:4200/?ssl=true",
"SINK_TABLE": "transactions",
},
)

# Add components to the stack.
"""
stack.table().processor(
LambdaFactory(
name="DynamoDBCrateDBProcessor",
# Exclusively deploy the VPC elements of the stack.
# Do that on the first invocation, but nothing else.
# Warning: When doing it subsequently, it will currently delete the whole RDS substack.
# Warning: When doing it and directly proceed to RDS creation, it will fail:
# The specified VPC has no internet gateway attached. Update the VPC and then try again.
# TODO: Introduce a little CLI controller for invoking different deployment steps conveniently.
# TODO: Refactor by splitting into different stacks.
# stack.vpc().deploy(); return # noqa: ERA001

# Deploy the full RDS+DMS demo stack.
stack.vpc().database().stream().dms()

# Define mapping rules for replication.
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.html
# TODO: Currently hard-coded to table `public.foo`.
map_to_kinesis = {
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "DefaultInclude",
"rule-action": "include",
"object-locator": {"schema-name": "public", "table-name": "foo"},
"filters": [],
},
# Using the percent wildcard ("%") in "table-settings" rules is
# not supported for source databases as shown following.
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.html#CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.Wildcards
# Here: Exact schema and table required when using object mapping rule with '3.5' engine.
{
"rule-type": "object-mapping",
"rule-id": "2",
"rule-name": "DefaultMapToKinesis",
"rule-action": "map-record-to-record",
"object-locator": {"schema-name": "public", "table-name": "foo"},
"filters": [],
},
]
}

# Define column type mapping for CrateDB processor.
column_types = ColumnTypeMapStore().add(
table=TableAddress(schema="public", table="foo"),
column="attributes",
type_=ColumnType.MAP,
)

# Add a DMS replication pipeline element to the stack.
stack.replication(dms_table_mapping=map_to_kinesis)

# Add custom processing components to the stack.
stack.processor(
factory=LambdaFactory(
name="DMSCrateDBProcessor",
oci_uri=python_image.uri,
handler=python_image.entrypoint_handler,
)
),
environment={
"MESSAGE_FORMAT": "dms",
"COLUMN_TYPES": column_types.to_json(),
"SINK_SQLALCHEMY_URL": os.environ.get("SINK_SQLALCHEMY_URL", "crate://"),
},
).connect()
"""
stack.vpc().database().stream().dms() # .table()

# Deploy stack.
stack.deploy()
@@ -68,18 +119,18 @@ def main():
# TODO: Detect when changed.
stack.deploy_processor_image()

PublicDbEndpoint = stack.get_output_value(stack._bsm, "PublicDbEndpoint")
PublicDbPort = stack.get_output_value(stack._bsm, "PublicDbPort")
database_host = stack.get_output_value(stack._bsm, "DatabaseHost")
database_port = stack.get_output_value(stack._bsm, "DatabasePort")
psql_command = (
f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"'
f'psql "postgresql://{stack.db_username}:{stack.db_password}@{database_host}:{database_port}/postgres"'
)

print("Result of CloudFormation deployment:")
print(psql_command)
print("psql command:", psql_command)

print("RDS Instance ARN:", stack.get_output_value(stack._bsm, "RDSInstanceArn"))
print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn"))
print("Replication ARN:", stack.get_output_value(stack._bsm, "ReplicationArn"))
print("Replication ARN:", stack.get_output_value(stack._bsm, "ReplicationTaskArn"))


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion lorrystream/carabas/aws/cf/dms_next.py
@@ -3,7 +3,7 @@
import attr
from cottonformation.core.constant import AttrMeta
from cottonformation.core.model import TypeCheck, TypeHint, Resource, Tag, GetAtt, Property
from cottonformation.res.dms import ReplicationSubnetGroup, PropEndpointKinesisSettings
from cottonformation.res.dms import ReplicationSubnetGroup, PropEndpointKinesisSettings, ReplicationTask, ReplicationInstance
from cottonformation.res.dms import Endpoint as EndpointVanilla


10 changes: 6 additions & 4 deletions lorrystream/carabas/aws/model.py
@@ -80,7 +80,7 @@ def deploy(self, respawn: bool = False):
logger.info("Deploying CloudFormation stack")
parameters = self.parameters or []

self.template.batch_tagging(dict(ProjectName=self.project, Stage=self.stage)) # noqa: C408
self.template.batch_tagging(dict(ProjectName=self.project, Stage=self.stage), mode_overwrite=True) # noqa: C408

env = cf.Env(bsm=self._bsm)
if respawn:
@@ -93,9 +93,11 @@
include_iam=True,
include_named_iam=True,
verbose=True,
skip_prompt=True,
skip_prompt=False,
# 300 seconds are not enough to wait for RDS PostgreSQL, for example.
timeout=500,
# 500 seconds are not enough for a complete stack including a DMS instance, for example.
# on 110 th attempt, elapsed 555 seconds, remain 445 seconds ...
timeout=750,
)
return self

@@ -158,4 +160,4 @@ def deploy_processor_image(self):
@attr.s
class KinesisProcessorStack(GenericProcessorStack):

_event_source: t.Optional[t.Union[kinesis.Stream]] = None
_stream_source: t.Union[kinesis.Stream, None] = None
