This repository is packed with concepts, drawings, slides, and code related to a working "playground" building DDD Aggregates via a CDC-CQRS data stream.
The Presentation Slides outline a solution journey from story (requirements) to solution (architecture & design) to code. The goal of this demo is to give context to the code and provide a pragmatic approach for designing software using DDD as a key driver.
This demo highlights a CDC-CQRS pipeline between a normalized relational database (MySQL) as the command database and a denormalized NoSQL database (MongoDB) as the query database. DDD Aggregates are built along the way with Debezium and Kafka Streams.
The Snacks Unlimited "Store" source code is centered around three microservices:
- snack-order-commands
- snack-order-processor
- snack-customer-orders
These services are implemented as Spring-Boot applications in Java.
The snack-order-commands service exposes REST API endpoints that persist item details, shipping info, and payment in their respective tables in the MySQL database.
Debezium tails the MySQL binlog to capture change events from these tables and publishes them as messages to Kafka topics.
These topics are consumed by snack-order-processor, a Kafka Streams application that joins the data from these topics into an Order Aggregate (CustomerOrder) object and publishes it to the customer-order-aggregate topic.
The MongoDB Sink Connector consumes this topic and persists the data in MongoDB, where it is served by the snack-customer-orders service.
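The join step described above can be illustrated without Kafka at all. The following is a minimal in-memory sketch, not the actual Kafka Streams topology: the record fields, the class name `OrderAggregateSketch`, and the choice of an inner join (an aggregate is only emitted once all three parts are present) are assumptions made for illustration, and it simplifies to one item per order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class OrderAggregateSketch {

    // Hypothetical, trimmed-down shapes of the three CDC records.
    record ItemDetail(String orderId, String itemName, int quantity) {}
    record ShippingLocation(String orderId, String address) {}
    record Payment(String orderId, String paymentType) {}

    // Hypothetical shape of the aggregate published downstream.
    record CustomerOrder(String orderId, ItemDetail item,
                         ShippingLocation shipping, Payment payment) {}

    // Latest record seen per order id, one map per source topic.
    private final Map<String, ItemDetail> items = new HashMap<>();
    private final Map<String, ShippingLocation> shipments = new HashMap<>();
    private final Map<String, Payment> payments = new HashMap<>();

    Optional<CustomerOrder> onItem(ItemDetail item) {
        items.put(item.orderId(), item);
        return tryJoin(item.orderId());
    }

    Optional<CustomerOrder> onShipping(ShippingLocation shipping) {
        shipments.put(shipping.orderId(), shipping);
        return tryJoin(shipping.orderId());
    }

    Optional<CustomerOrder> onPayment(Payment payment) {
        payments.put(payment.orderId(), payment);
        return tryJoin(payment.orderId());
    }

    // Emits an aggregate only when all three parts exist for the order id.
    private Optional<CustomerOrder> tryJoin(String orderId) {
        ItemDetail item = items.get(orderId);
        ShippingLocation shipping = shipments.get(orderId);
        Payment payment = payments.get(orderId);
        if (item == null || shipping == null || payment == null) {
            return Optional.empty();
        }
        return Optional.of(new CustomerOrder(orderId, item, shipping, payment));
    }

    public static void main(String[] args) {
        OrderAggregateSketch sketch = new OrderAggregateSketch();
        sketch.onItem(new ItemDetail("o-1", "chips", 2));
        sketch.onShipping(new ShippingLocation("o-1", "1 Main St"));
        // The third event completes the aggregate.
        System.out.println(sketch.onPayment(new Payment("o-1", "CARD")).isPresent());
    }
}
```

Keeping the latest record per key and joining on the order id is similar in spirit to joining tables keyed by order id in Kafka Streams, which is why the events can arrive in any order.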
- Source Material
- Telling the Story - Methods
- Solutioning
- Development
Snacks Unlimited Order Management System.
Below is a breakdown of a fictitious but relatable problem and the solution journey to building a scalable, flexible system using CQRS with CDC and Kafka.
The "platform" consists of:
- Confluent Platform (Docker) - Kafka, Kafka Connect
- MySQL and "Adminer" (manage MySQL via browser)
- MongoDB and Mongo Express (manage Mongo via browser)
Start and Stop the Platform
$ ./platform kafka|redpanda start
$ ./platform kafka|redpanda stop
This will start the following Spring Boot applications:
- snack-order-commands (Write / MySQL)
- snack-customer-orders (Read / MongoDB)
- snack-order-processor (Anti-corruption / Kafka Streams)
NOTE: Ensure the Docker images for the apps have been built by running the following in each service project.
$ ./gradlew clean docker
Start & Stop the Apps
$ ./apps start
$ ./apps stop
- In Postman run the POST Create MySQL CDC Connector Request
- In Postman run the POST Create MongoDB Sink Connector Request
See Snacks Unlimited - Postman Collection.
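For readers who prefer not to use Postman, the same connector requests can be sent from plain Java. This is a sketch under assumptions: the class name `ConnectorRegistrationSketch` and the idea of reading the connector JSON from a file passed as the first argument are hypothetical, while the URL is the Kafka Connect endpoint used by the connector requests in this README.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

class ConnectorRegistrationSketch {

    // Kafka Connect REST endpoint, as used by the Postman requests.
    static final String CONNECT_URL = "http://localhost:8083/connectors";

    // Builds the POST request without sending it.
    static HttpRequest buildCreateRequest(String connectorJson) {
        return HttpRequest.newBuilder(URI.create(CONNECT_URL))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    // Sends the request and returns the HTTP status code.
    static int register(String connectorJson) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildCreateRequest(connectorJson), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to a file holding the connector JSON (hypothetical usage).
        String json = Files.readString(Path.of(args[0]));
        System.out.println("Kafka Connect responded with HTTP " + register(json));
    }
}
```

The platform must be running for `register` to succeed. Separating request construction from sending keeps the request easy to inspect before anything goes over the wire.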
--- DEMO START ---
1. Create Kafka Connectors
2. Create Customer Order (Snack Customer)
   - POST Item Details
   - POST Shipping Location
   - POST Payment
3. Fulfill Orders (Sam the Wizard)
   - GET PAID Orders
   - PUT Order Fulfillment
4. Ship Orders (Minions Delivery Team)
   - GET FULFILLED Orders
   - PUT Order Shipment
--- DEMO COMPLETE ---
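The demo steps imply a simple order lifecycle: an order is paid, then fulfilled, then shipped. The sketch below models that progression as a small state machine. It is illustrative only: `PAID` and `FULFILLED` come from the demo steps, while `SHIPPED`, the class name `OrderLifecycleSketch`, and the `advance` method are assumptions rather than names taken from the services.

```java
class OrderLifecycleSketch {

    // PAID and FULFILLED appear in the demo steps; SHIPPED is an assumed name
    // for the state reached after "PUT Order Shipment".
    enum Status { PAID, FULFILLED, SHIPPED }

    // Moves an order to the next state, rejecting moves past the final state.
    static Status advance(Status current) {
        switch (current) {
            case PAID:
                return Status.FULFILLED;   // step 3: PUT Order Fulfillment
            case FULFILLED:
                return Status.SHIPPED;     // step 4: PUT Order Shipment
            default:
                throw new IllegalStateException("No further step after " + current);
        }
    }

    public static void main(String[] args) {
        Status status = Status.PAID;
        status = advance(status);
        status = advance(status);
        System.out.println(status); // prints SHIPPED
    }
}
```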
- List Kafka Topics
- Consume Kafka Topic Records
- MySQL Database
- MongoDB Database
This is a transactional service that triggers the overall CQRS workflow by initiating order commands:
- POST order items (api/item)
- POST shipping details (api/shipping)
- POST payment details (api/payment)
$ spring init -d=web,jpa,mysql --build=gradle --type=gradle-project-kotlin snack-order-commands
Create the base packages:
- api.rest
- data
- exception
- service
- see src/main/java/api/rest/OrderWriteController.java
This service consumes from the CDC topics (triggered from order-write-service) and joins them into an Order Aggregate for reading.
$ spring init -d=web --build=gradle --type=gradle-project-kotlin snack-order-processor
Add in the Kafka dependencies in build.gradle.kts:
implementation("org.springframework.kafka:spring-kafka")
implementation("org.apache.kafka:kafka-streams")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
Create the base packages:
- config (Kafka Configuration)
- model.domain
- model.inbound
- stream (Kafka Stream topology and Serdes configuration)
- Add the following line to OrderAggregateStream:
See TopologyController class
- (OPTIONAL) Add this to application.yml to enable actuator endpoints.
management:
  endpoints:
    web:
      exposure:
        include: "*"
-
From Browser: http://localhost:8801/api/topology
-
Paste Input into https://zz85.github.io/kafka-streams-viz/
-
View Topology
This is a query service that reads from a MongoDB datastore and represents the read side of CQRS.
Query commands:
- GET api/orders/{orderId}
$ spring init -d=web,data-mongodb --build=gradle --type=gradle-project-kotlin snack-customer-orders
Create the base packages:
- api.rest
- api.model
- data.repository
- service
- see src/main/java/api/rest/OrderWriteController.java
With the Platform Running (See Above).
POST http://localhost:8083/connectors
--
{
  "name": "order-command-db-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "order-command-server",
    "database.hostname": "mysql_db_server",
    "database.port": "3306",
    "database.user": "order-command-user",
    "database.password": "password",
    "database.server.id": "142401",
    "database.server.name": "order-command-server",
    "database.whitelist": "order-command-db",
    "table.whitelist": "order-command-db.payment, order-command-db.shipping_location,order-command-db.item_detail",
    "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
    "schema.history.internal.kafka.topic": "dbhistory.order-command-db",
    "include.schema.changes": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
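The `unwrap` transform in the config above flattens each Debezium change event so that downstream consumers see only the new row state instead of the full change envelope. The following is a conceptual sketch of that idea using plain maps, not the transform's actual implementation. The class name `UnwrapSketch` and the column names in the usage example are hypothetical, and delete events (whose `after` is null) are represented here as an empty `Optional`, whereas the real transform's delete handling depends on its configuration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class UnwrapSketch {

    // Returns the "after" row state of a change envelope, if there is one.
    @SuppressWarnings("unchecked")
    static Optional<Map<String, Object>> unwrap(Map<String, Object> envelope) {
        Object after = envelope.get("after");
        if (after instanceof Map) {
            return Optional.of((Map<String, Object>) after);
        }
        // No new row state, e.g. a delete event.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical insert event for the payment table.
        Map<String, Object> envelope = new HashMap<>();
        envelope.put("before", null);
        envelope.put("after", Map.of("order_id", "o-1", "payment_type", "CARD"));
        envelope.put("op", "c");

        // Only the flattened row is handed to the consumer.
        System.out.println(unwrap(envelope).isPresent());
    }
}
```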
With the Platform Running (See Above).
POST http://localhost:8083/connectors
--
{
  "name": "order-app-mongo-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "customer-order-aggregate",
    "connection.uri": "mongodb://mongo-user:password@mongodb_server:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "database": "customer_order_db",
    "collection": "customerOrder",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
    "transforms": "hk,hv",
    "transforms.hk.type": "org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.hk.field": "_id",
    "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField$Value",
    "transforms.hv.field": "customerOrder"
  }
}
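The two `HoistField` transforms in the config above wrap the record key in an `_id` field and the record value in a `customerOrder` field, and the `ProvidedInKeyStrategy` then takes the document id from the key. The sketch below shows the document shape this is expected to produce, using plain maps. It illustrates the idea and is not connector code: the class name `HoistSketch`, its methods, and the `orderStatus` field in the usage example are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HoistSketch {

    // Wraps a payload in a single-field map, which is what hoisting a field does.
    static Map<String, Object> hoist(String field, Object payload) {
        return Map.of(field, payload);
    }

    // Combines the hoisted key and value into the shape of the stored document.
    static Map<String, Object> toDocument(String recordKey, Map<String, Object> recordValue) {
        Map<String, Object> document = new LinkedHashMap<>();
        document.putAll(hoist("_id", recordKey));             // hk: key becomes {"_id": key}
        document.putAll(hoist("customerOrder", recordValue)); // hv: value becomes {"customerOrder": value}
        return document;
    }

    public static void main(String[] args) {
        Map<String, Object> document = toDocument("o-1", Map.of("orderStatus", "PAID"));
        System.out.println(document.keySet()); // prints [_id, customerOrder]
    }
}
```

Because the id comes from the record key and existing documents are overwritten, a later aggregate for the same order replaces the earlier document instead of creating a second one.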
org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'dataSourceScriptDatabaseInitializer'
This is due to the JPA Data Source not being configured correctly. This can be fixed in the following way:
- Add the data source configuration to application.yml.
spring:
  datasource:
    url: jdbc:mysql://localhost:13306/order-command-db
    username: order-command-user
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    generate-ddl: true
    show-sql: true
    hibernate:
      ddl-auto: update
    properties:
      hibernate:
        format_sql: true
        dialect: org.hibernate.dialect.MySQLDialect
Add this to your build.gradle.kts:
testImplementation("com.h2database:h2")
The @DataJpaTest annotation starts up an in-memory H2 database to run the JPA repository tests.
In application.yml you do not need this:
# dialect: org.hibernate.dialect.MySQLDialect
Debezium MySQLConnector Error: Access denied; you need the SUPER, RELOAD, or FLUSH_TABLES privilege for this operation
The MySQL user that the Debezium MySqlConnector connects as needs additional privileges. Grant them with:
GRANT ALL PRIVILEGES ON *.* TO 'order-command-user';
Item | Discovery Notes |
---|---|
Using @DataJpaTest for JPA Repository Tests | This annotation bootstraps the test with an in-memory H2 SQL database. |
Leverage Spring profiles for ./gradlew bootRun, ./apps start (local-docker), local-embedded for testing | See src/main/resources/application.yml |
Access privileges for CDC Connector | Check out ./docker/mysql/init/init-db.sql to grant privileges to the order-command-user. |