Skip to content

Commit

Permalink
Domain events vs infrastructure events
Browse files Browse the repository at this point in the history
Reimplemented the Payments::Order aggregate to use domain events which
are simple Data structures instead of infrastructure events carrying all
the information like metadata and data containing primitve types.

It required additional work on repository to map between domain and
infra events when loading and storing the aggregate.
  • Loading branch information
fidel committed Oct 25, 2023
1 parent 07b5a38 commit c0b859f
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 59 deletions.
1 change: 1 addition & 0 deletions .mutant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ environment_variables:
matcher:
subjects:
- Payments::Order
- Payments::OrderRepository
- Payments::Amount
14 changes: 12 additions & 2 deletions app/domains/payments.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
module Payments
PaymentRequested = Class.new(RailsEventStore::Event)
PaymentRegistered = Class.new(RailsEventStore::Event)
PaymentRequested =
Data.define(:order_id, :amount) do
def event_type
self.class.name
end
end
PaymentRegistered =
Data.define(:order_id, :amount) do
def event_type
self.class.name
end
end

Amount =
Data.define(:value, :currency) do
Expand Down
12 changes: 6 additions & 6 deletions app/domains/payments/order.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ def initialize(order_id)
@total_amount = NOT_SET
end

private attr_reader :order_id
attr_reader :order_id
private attr_accessor :total_amount, :paid_amount

def request_payment(amount)
apply(PaymentRequested.new(data: { order_id: order_id, amount: amount.value, currency: amount.currency }))
apply(PaymentRequested.new(order_id: order_id, amount: amount))
end

def register_payment(amount)
raise PaymentNotRequestedYet if total_amount == NOT_SET

apply(PaymentRegistered.new(data: { order_id: order_id, amount: amount.value, currency: amount.currency }))
apply(PaymentRegistered.new(order_id: order_id, amount: amount))
end

def paid?
Expand All @@ -30,12 +30,12 @@ def paid?
end

on PaymentRequested do |event|
self.total_amount = Amount.new(value: event.data.fetch(:amount), currency: event.data.fetch(:currency))
self.paid_amount = Amount.new(value: 0, currency: event.data.fetch(:currency))
self.total_amount = event.amount
self.paid_amount = Amount.new(value: 0, currency: event.amount.currency)
end

on PaymentRegistered do |event|
self.paid_amount += Amount.new(value: event.data.fetch(:amount), currency: event.data.fetch(:currency))
self.paid_amount += event.amount
end
end
end
60 changes: 55 additions & 5 deletions app/domains/payments/order_repository.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,69 @@
module Payments
class OrderRepository
def initialize(event_store = Rails.configuration.event_store)
@repository = AggregateRoot::Repository.new(event_store)
@event_store = event_store
end

private attr_reader :repository
private attr_reader :event_store

def with_order(order_id, &block)
repository.with_aggregate(Order.new(order_id), stream_name(order_id), &block)
def load(order)
event_store
.read
.stream(stream_name(order.order_id))
.reduce { |_, event| order.apply(infra_to_domain_mapper(event)) }
order.version = order.unpublished_events.count - 1
order
end

def store(order)
event_store.publish(
order.unpublished_events.map(&method(:domain_to_infra_mapper)),
stream_name: stream_name(order.order_id),
expected_version: order.version,
)
order.version = order.version + order.unpublished_events.count
end

private

def stream_name(order_id)
"Payments::Order$#{order_id}"
end

def infra_to_domain_mapper(event)
{
"Infra::Payments::PaymentRequested" =>
PaymentRequested.new(
order_id: event.data.fetch(:order_id),
amount: Amount.new(value: event.data.fetch(:amount_value), currency: event.data.fetch(:amount_currency)),
),
"Infra::Payments::PaymentRegistered" =>
PaymentRegistered.new(
order_id: event.data.fetch(:order_id),
amount: Amount.new(value: event.data.fetch(:amount_value), currency: event.data.fetch(:amount_currency)),
),
}.fetch(event.event_type)
end

def domain_to_infra_mapper(event)
{
"Payments::PaymentRequested" =>
Infra::Payments::PaymentRequested.new(
data: {
order_id: event.order_id,
amount_value: event.amount.value,
amount_currency: event.amount.currency,
},
),
"Payments::PaymentRegistered" =>
Infra::Payments::PaymentRegistered.new(
data: {
order_id: event.order_id,
amount_value: event.amount.value,
amount_currency: event.amount.currency,
},
),
}.fetch(event.event_type)
end
end
end
end
6 changes: 6 additions & 0 deletions lib/infra/payments.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module Infra
module Payments
PaymentRequested = Class.new(RailsEventStore::Event)
PaymentRegistered = Class.new(RailsEventStore::Event)
end
end
104 changes: 104 additions & 0 deletions test/domains/payments/order_repository_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
require "test_helper"
require "minitest/mock"

module Payments
class OrderRepositoryTest < ActiveSupport::TestCase
cover OrderRepository

test "infra events are translated to domain events and applied on load" do
event_store.publish(
[
Infra::Payments::PaymentRequested.new(
data: {
order_id: "order-1",
amount_value: 100,
amount_currency: "EUR",
},
),
Infra::Payments::PaymentRegistered.new(
data: {
order_id: "order-1",
amount_value: 100,
amount_currency: "EUR",
},
),
],
stream_name: "Payments::Order$order-1",
)

order = Minitest::Mock.new(Order.new("order-1"))
order.expect(:apply, nil, [PaymentRequested.new(order_id: "order-1", amount: Amount.new(100, "EUR"))])
order.expect(:apply, nil, [PaymentRegistered.new(order_id: "order-1", amount: Amount.new(100, "EUR"))])

OrderRepository.new.load(order)

order.verify
end

test "no unpublished events on fresh aggregate instance" do
refute OrderRepository.new.load(Order.new("order-1")).unpublished_events.any?
end

test "aggregate version is set accordingly" do
event_store.publish(
Infra::Payments::PaymentRequested.new(data: { order_id: "order-1", amount_value: 100, amount_currency: "EUR" }),
stream_name: "Payments::Order$order-1",
)
order = OrderRepository.new.load(Order.new("order-1"))

assert_equal(0, order.version)
end

test "aggregate version is set for given instance" do
event_store.publish(
Infra::Payments::PaymentRequested.new(data: { order_id: "order-1", amount_value: 100, amount_currency: "EUR" }),
stream_name: "Payments::Order$order-1",
)
event_store.publish(
Infra::Payments::PaymentRequested.new(data: { order_id: "order-1", amount_value: 100, amount_currency: "EUR" }),
stream_name: "dummy",
)
order = OrderRepository.new.load(Order.new("order-1"))

assert_equal(0, order.version)
end

test "store translates domain events to infra ones" do
event_store = Minitest::Mock.new
event_store.expect(:publish, nil) do |events, stream_name:, expected_version:|
assert_equal_event(
Infra::Payments::PaymentRequested.new(
data: {
order_id: "order-1",
amount_value: 100,
amount_currency: "EUR",
},
),
events[0],
)
assert_equal_event(
Infra::Payments::PaymentRegistered.new(
data: {
order_id: "order-1",
amount_value: 100,
amount_currency: "EUR",
},
),
events[1],
)
assert_equal("Payments::Order$order-1", stream_name)
assert_equal(-1, expected_version)
end

order = Order.new("order-1")
order.apply(PaymentRequested.new(order_id: "order-1", amount: Amount.new(100, "EUR")))
order.apply(PaymentRegistered.new(order_id: "order-1", amount: Amount.new(100, "EUR")))

OrderRepository.new(event_store).store(order)

event_store.verify

assert_equal(1, order.version)
end
end
end
75 changes: 29 additions & 46 deletions test/domains/payments/order_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,82 +5,65 @@ class OrderTest < ActiveSupport::TestCase
cover Order

test "request payment" do
repository.with_order("order-123") { |order| order.request_payment(Amount.new(300, "EUR")) }

assert_expected_events_in_stream(
event_store,
[PaymentRequested.new(data: { order_id: "order-123", amount: 300, currency: "EUR" })],
"Payments::Order$order-123",
assert_equal(
[PaymentRequested.new(order_id: "order-123", amount: Amount.new(300, "EUR"))],
Order.new("order-123").request_payment(Amount.new(300, "EUR")),
)
end

test "register payment" do
repository.with_order("order-123") do |order|
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(300, "EUR"))
end
order = Order.new("order-123")
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(300, "EUR"))

assert_expected_events_in_stream(
event_store,
assert_equal(
[
PaymentRequested.new(data: { order_id: "order-123", amount: 300, currency: "EUR" }),
PaymentRegistered.new(data: { order_id: "order-123", amount: 300, currency: "EUR" }),
PaymentRequested.new(order_id: "order-123", amount: Amount.new(300, "EUR")),
PaymentRegistered.new(order_id: "order-123", amount: Amount.new(300, "EUR")),
],
"Payments::Order$order-123",
order.unpublished_events.to_a,
)
end

test "full amount paid" do
repository.with_order("order-123") do |order|
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(300, "EUR"))
order = Order.new("order-123")
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(300, "EUR"))

assert order.paid?
end
assert order.paid?
end

test "full amount paid with multiple payments" do
repository.with_order("order-123") do |order|
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(100, "EUR"))
order.register_payment(Amount.new(200, "EUR"))
order = Order.new("order-123")
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(100, "EUR"))
order.register_payment(Amount.new(200, "EUR"))

assert order.paid?
end
assert order.paid?
end

test "too much paid" do
repository.with_order("order-123") do |order|
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(301, "EUR"))
order = Order.new("order-123")
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(301, "EUR"))

assert order.paid?
end
assert order.paid?
end

test "too little paid" do
repository.with_order("order-123") do |order|
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(299, "EUR"))
order = Order.new("order-123")
order.request_payment(Amount.new(300, "EUR"))
order.register_payment(Amount.new(299, "EUR"))

refute order.paid?
end
refute order.paid?
end

test "payment not yet requested" do
repository.with_order("order-123") { |order| refute order.paid? }
refute Order.new("order-123").paid?
end

test "payment not yet requested but paid" do
repository.with_order("order-123") do |order|
assert_raises(Order::PaymentNotRequestedYet) { order.register_payment(Amount.new(300, "EUR")) }
end
end

private

def repository
OrderRepository.new(event_store)
assert_raises(Order::PaymentNotRequestedYet) { Order.new("order-123").register_payment(Amount.new(300, "EUR")) }
end
end
end

0 comments on commit c0b859f

Please sign in to comment.