In this chapter we’ll start to make events more fundamental to the internal structure of our application, by transforming it into a message-processor; everything will go via the message bus.
- We’ll integrate a new requirement that introduces new events and re-uses some of our existing logic.

- We’ll show the increasing similarity between functions at the service layer and functions for event handling.

- We’ll merge the two, and use events to represent the external inputs to our system, as well as internal events.
TODO: DIAGRAM GOES HERE
We learn about the need to change batch quantities when they’re already in the system. Perhaps someone made a mistake on the number in the manifest, or perhaps some sofas fell off a truck. Following a conversation with the business,[1] we model the situation as shown in the diagram below: batch quantity changed means deallocate and reallocate:
[ditaa, batch_changed_events_flow_diagram]
+----------+     /----\     +------------+        +--------------------+
| Batch    |--> |RULE| -->  | Deallocate | ---->  | AllocationRequired |
| Quantity |     \----/     +------------+-+      +--------------------+-+
| Changed  |                  | Deallocate | ---->  | AllocationRequired |
+----------+                  +------------+-+      +--------------------+-+
                                | Deallocate | ---->  | AllocationRequired |
                                +------------+        +--------------------+
An event we’ll call BatchQuantityChanged should lead us to change the quantity on the batch, yes, but also to apply a business rule: if the new quantity drops to less than the total already allocated, we need to deallocate those orders from that batch. Then each one will require a new allocation, which we can capture as an event called AllocationRequired. For example, if a batch of 50 has two 20-unit orders allocated and its quantity is corrected down to 25, we are over-allocated by 15, so one of the 20-unit orders must be deallocated and then reallocated to another batch.
Perhaps you’re already anticipating that our internal messagebus and events can help implement this requirement. We could define a service called change_batch_quantity that knows how to adjust batch quantities and also how to deallocate any excess order lines, and then each deallocation can emit an AllocationRequired event which can be forwarded on to the existing allocate service, in separate transactions. Once again, our message bus helps us to enforce the single responsibility principle, and it allows us to make choices about transactions and data integrity.
But before we jump in, think about where we’re headed. There are two kinds of flows through our system:
- API calls that are handled by a service-layer function

- Internal events (which might be raised as a side effect of a service-layer function) and their handlers (which in turn call service-layer functions)
Wouldn’t it be easier if everything was an event handler? If we rethink our API calls as capturing events, then the service-layer functions can be event handlers too, and we no longer need to make a distinction between internal and external event handlers:
- services.allocate() we could imagine as being the handler for an AllocationRequired event, and it can emit Allocated events as its output.

- services.add_batch() could be the handler for a BatchCreated event.[2]
Our new requirement will fit the same pattern:
- An event called BatchQuantityChanged (sketched just below) can invoke a handler called change_batch_quantity().

- And the new AllocationRequired events that it may raise can be passed on to services.allocate() too, so there is no conceptual difference between a brand-new allocation coming from the API and a reallocation that’s internally triggered by a deallocation.
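For concreteness, here’s what the BatchQuantityChanged event might look like, as a sketch: it follows the same dataclass pattern as the events we define later in this chapter, and its fields are inferred from how the handler and tests use it below:

@dataclass
class BatchQuantityChanged(Event):
    ref: str
    qty: int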
Does all that sound like a bit much? Let’s work towards it gradually. We’ll follow the Preparatory Refactoring workflow, AKA "make the change easy, then make the easy change":
- We’ll start by refactoring our service layer into event handlers. We can get used to the idea of events being the way we describe inputs to our system. In particular, the existing services.allocate() function will become the handler for an event called AllocationRequired.

- Then we’ll build an end-to-end test that uses Redis to put BatchQuantityChanged events into the system, and look for Allocated events coming out.

- And then our actual implementation will be conceptually very simple: a new handler for BatchQuantityChanged events, whose implementation will emit AllocationRequired events, which in turn will be handled by the exact same handler for allocations that the API uses.
We start by defining the two events that capture our current API inputs: AllocationRequired and BatchCreated:
from dataclasses import dataclass
from datetime import date
from typing import Optional


class Event:  # base class for all events, carried over from the previous chapter
    pass


@dataclass
class BatchCreated(Event):
    ref: str
    sku: str
    qty: int
    eta: Optional[date] = None

...

@dataclass
class AllocationRequired(Event):
    orderid: str
    sku: str
    qty: int
Then we rename services.py to handlers.py, we add in the existing message handler for send_out_of_stock_notification, and most importantly, we change all the handlers so that they have the same inputs: an event and a UoW:
def add_batch(
event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
with uow:
product = uow.products.get(sku=event.sku)
...
def allocate(
event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
line = OrderLine(event.orderid, event.sku, event.qty)
...
def send_out_of_stock_notification(
event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
email.send(
'[email protected]',
f'Out of stock for {event.sku}',
)
TODO: discuss moving from primitives (primitive obsession) to events as our service-layer API; contrast with the move in chapter 3 from domain model objects to primitives.
The change might be clearer as a diff:
def add_batch(
- ref: str, sku: str, qty: int, eta: Optional[date],
- uow: unit_of_work.AbstractUnitOfWork
+ event: events.BatchCreated, uow: unit_of_work.AbstractUnitOfWork
):
with uow:
- product = uow.products.get(sku=sku)
+ product = uow.products.get(sku=event.sku)
...
def allocate(
- orderid: str, sku: str, qty: int,
- uow: unit_of_work.AbstractUnitOfWork
+ event: events.AllocationRequired, uow: unit_of_work.AbstractUnitOfWork
) -> str:
- line = OrderLine(orderid, sku, qty)
+ line = OrderLine(event.orderid, event.sku, event.qty)
...
+
+def send_out_of_stock_notification(
+ event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
+):
+ email.send(
...
Our event handlers now need a UoW. We make a small modification
to the main messagebus.handle()
function:
def handle(events_: List[events.Event], uow: unit_of_work.AbstractUnitOfWork): #(1)
while events_:
event = events_.pop(0)
for handler in HANDLERS[type(event)]:
handler(event, uow=uow) #(1)
(1) The messagebus passes a UoW down to each handler.
And to unit_of_work.py:
class AbstractUnitOfWork(abc.ABC):
...
def commit(self):
self._commit()
for obj in self.products.seen:
messagebus.handle(obj.events, uow=self) #(1)
(1) The UoW passes itself to the messagebus.
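Our service-layer tests can now be written in terms of events too: we put events on the messagebus and make our assertions against the fakes: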
class TestAddBatch:
@staticmethod
def test_for_new_product():
uow = FakeUnitOfWork()
messagebus.handle([events.BatchCreated("b1", "CRUNCHY-ARMCHAIR", 100, None)], uow)
assert uow.products.get("CRUNCHY-ARMCHAIR") is not None
assert uow.committed
...
class TestAllocate:
    @staticmethod
    def test_returns_allocation():
        uow = FakeUnitOfWork()
        results = messagebus.handle([
            events.BatchCreated("b1", "COMPLICATED-LAMP", 100, None),
            events.AllocationRequired("o1", "COMPLICATED-LAMP", 10)
        ], uow)
        assert results.pop() == "b1"  # handle() returns one result per handler call
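These tests rely on the FakeUnitOfWork and FakeRepository from earlier chapters. As a reminder, the fake UoW looks roughly like this (a sketch, not new code for this chapter):

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):

    def __init__(self):
        self.products = FakeRepository([])  # in-memory repository, shown later
        self.committed = False

    def _commit(self):
        self.committed = True

    def rollback(self):
        pass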
Our API and our service layer currently want to know the allocated batch ref when they invoke our allocate() handler. This means we need to put in a temporary hack on our messagebus to let it return results:
def handle(events_: List[events.Event], uow: unit_of_work.AbstractUnitOfWork):
+ results = []
while events_:
event = events_.pop(0)
for handler in HANDLERS[type(event)]:
- handler(event, uow=uow)
+ r = handler(event, uow=uow)
+ results.append(r)
+ return results
It’s because we’re mixing the read and write responsibilities in our system. We’ll come back to fix this wart in [chapter_09_cqrs].
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
try:
- batchref = services.allocate(
- request.json['orderid'], #(1)
- request.json['sku'],
- request.json['qty'],
- unit_of_work.SqlAlchemyUnitOfWork(),
+ event = events.AllocationRequired( #(2)
+ request.json['orderid'], request.json['sku'], request.json['qty'],
)
+ results = messagebus.handle([event], unit_of_work.SqlAlchemyUnitOfWork()) #(3)
+ batchref = results.pop()
except exceptions.InvalidSku as e:
(1) Instead of calling the service layer with a bunch of primitives extracted from the request JSON…

(2) We instantiate an event…

(3) And pass it to the messagebus.
And we should be back to a fully functional application.
TODO: recap?
We’re done with our refactoring phase. Our application is a message processor: everything is driven by events and the message bus.
Let’s see if we really have "made the change easy". Let’s implement our new requirement: we’ll listen to a Redis channel for BatchQuantityChanged events and pass them to a handler, which in turn might emit some AllocationRequired events, and those might emit some Allocated events, which we want to publish back out to Redis.
[plantuml, reallocation_sequence_diagram]
@startuml
API -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit AllocationRequired event(s)
end

group AllocationRequired Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
    Domain_Model -> MessageBus : emit Allocated event(s)
end
@enduml
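Step 2 of our plan called for an end-to-end test that puts BatchQuantityChanged events in via Redis and watches for Allocated events coming out. It might look something like the following sketch; the channel names ('change_batch_quantity', 'line_allocated'), the JSON payload shapes, and the elided setup step are assumptions for illustration:

import json

import redis
from tenacity import Retrying, stop_after_delay

r = redis.Redis()


def test_change_batch_quantity_leading_to_reallocation():
    # setup elided: create batch1 and batch2 for a sku, and allocate
    # a 20-unit order to batch1 (e.g. via the API from earlier chapters)

    subscription = r.pubsub(ignore_subscribe_messages=True)
    subscription.subscribe("line_allocated")

    # publish a BatchQuantityChanged event into the system
    r.publish("change_batch_quantity", json.dumps({"batchref": "batch1", "qty": 25}))

    # wait until we see a message saying the order has been reallocated
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
            data = json.loads(messages[-1]["data"])
            assert data["batchref"] == "batch2"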
Following the lessons learned in [chapter_03_service_layer], we can operate in "high gear," and write our unit tests at the highest possible level of abstraction, in terms of events. Here’s what they might look like:
class TestChangeBatchQuantity:
@staticmethod
def test_changes_available_quantity():
uow = FakeUnitOfWork()
messagebus.handle([events.BatchCreated("batch1", "ADORABLE-SETTEE", 100, None)], uow)
[batch] = uow.products.get(sku="ADORABLE-SETTEE").batches
assert batch.available_quantity == 100 #(1)
messagebus.handle([events.BatchQuantityChanged("batch1", 50)], uow)
assert batch.available_quantity == 50 #(1)
@staticmethod
def test_reallocates_if_necessary():
uow = FakeUnitOfWork()
messagebus.handle([
events.BatchCreated("batch1", "INDIFFERENT-TABLE", 50, None),
events.BatchCreated("batch2", "INDIFFERENT-TABLE", 50, date.today()),
events.AllocationRequired("order1", "INDIFFERENT-TABLE", 20),
events.AllocationRequired("order2", "INDIFFERENT-TABLE", 20),
], uow)
[batch1, batch2] = uow.products.get(sku="INDIFFERENT-TABLE").batches
assert batch1.available_quantity == 10
messagebus.handle([events.BatchQuantityChanged("batch1", 25)], uow)
        # order1 or order2 will be deallocated, so we'll have 25 - 20
assert batch1.available_quantity == 5 #(2)
# and 20 will be reallocated to the next batch
assert batch2.available_quantity == 30 #(2)
(1) The simple case would be trivially easy to implement: we just modify a quantity.

(2) But if we try to change the quantity so that there’s less than has been allocated, we’ll need to deallocate at least one order, and we expect to reallocate it to a new batch.

Our new handler is very simple:
def change_batch_quantity(
event: events.BatchQuantityChanged, uow: unit_of_work.AbstractUnitOfWork
):
with uow:
product = uow.products.get_by_batchref(batchref=event.ref)
product.change_batch_quantity(ref=event.ref, qty=event.qty)
uow.commit()
We realise we’ll need a new query type on our repository:
class AbstractRepository(abc.ABC):
...
def get(self, sku):
...
def get_by_batchref(self, batchref):
p = self._get_by_batchref(batchref)
if p:
self.seen.add(p)
return p
@abc.abstractmethod
def _add(self, product):
raise NotImplementedError
@abc.abstractmethod
def _get(self, sku):
raise NotImplementedError
@abc.abstractmethod
def _get_by_batchref(self, batchref):
raise NotImplementedError
class SqlAlchemyRepository(AbstractRepository):
...
def _get(self, sku):
return self.session.query(model.Product).filter_by(sku=sku).first()
def _get_by_batchref(self, batchref):
return self.session.query(model.Product).join(model.Batch).filter(
orm.batches.c.reference == batchref,
).first()
And on our FakeRepository too:
class FakeRepository(repository.AbstractRepository):
...
def _get(self, sku):
return next((p for p in self._products if p.sku == sku), None)
def _get_by_batchref(self, batchref):
return next((
p for p in self._products for b in p.batches
if b.reference == batchref
), None)
You may be starting to worry that maintaining these fakes is going to be a maintenance burden. There’s no doubt that it’s work, but in our experience it’s not a lot of work. Once your project is up and running, the interfaces for your repository and UoW abstractions really don’t change much. And if you’re using ABCs, they’ll help remind you when things get out of sync.
TODO: discuss finder methods on repository.
We add the new method to the model, which does the quantity change and deallocation(s) inline, and publishes a new event. We also modify the existing allocate function to publish an event.
class Product:
    ...

    def change_batch_quantity(self, ref: str, qty: int):
        batch = next(b for b in self.batches if b.reference == ref)
        batch._purchased_quantity = qty
        while batch.available_quantity < 0:
            # while the batch is over-allocated, deallocate one line at a time,
            # raising an AllocationRequired event so each line gets reallocated
            line = batch.deallocate_one()
            self.events.append(
                events.AllocationRequired(line.orderid, line.sku, line.qty)
            )
...

class Batch:
    ...

    def deallocate_one(self) -> OrderLine:
        return self._allocations.pop()
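To see the new model behavior in isolation, here’s a hypothetical interactive session; the Product, Batch, and OrderLine constructors are from earlier chapters, and the numbers mirror the reallocation test above:

product = Product(sku="INDIFFERENT-TABLE", batches=[
    Batch("batch1", "INDIFFERENT-TABLE", qty=50, eta=None),
])
product.allocate(OrderLine("order1", "INDIFFERENT-TABLE", 20))
product.allocate(OrderLine("order2", "INDIFFERENT-TABLE", 20))

product.change_batch_quantity(ref="batch1", qty=25)  # now over-allocated by 15

# one 20-unit line was deallocated, leaving 25 - 20 = 5 available...
assert product.batches[0].available_quantity == 5
# ...and a new AllocationRequired event asks for that line to be reallocated
assert isinstance(product.events[-1], events.AllocationRequired)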
We wire up our new handler:
HANDLERS = {
events.BatchCreated: [handlers.add_batch],
events.BatchQuantityChanged: [handlers.change_batch_quantity],
events.AllocationRequired: [handlers.allocate],
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]]
And our system is now entirely event-driven!
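For completeness, here’s a sketch of the Redis entrypoint from the sequence diagram: the piece that listens for external BatchQuantityChanged messages and feeds them to the messagebus. The module layout, channel name, and payload shape are assumptions rather than final code:

import json

import redis

from allocation import events, messagebus, unit_of_work

r = redis.Redis()


def main():
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity")

    for m in pubsub.listen():
        # translate the external message into one of our internal events...
        data = json.loads(m["data"])
        event = events.BatchQuantityChanged(ref=data["batchref"], qty=data["qty"])
        # ...and hand it to the messagebus like any other input
        messagebus.handle([event], unit_of_work.SqlAlchemyUnitOfWork())


if __name__ == "__main__":
    main()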
It’s a good idea to keep the distinction between internal and external events clear. Some events may come from the outside, and some events may get upgraded and published externally, but not all of them. This is particularly important if you get into [event sourcing](https://io.made.com/eventsourcing-101/) (very much a topic for another book though).
- Events are simple dataclasses that define the data structures for inputs and internal messages within our system. This is quite powerful from a DDD standpoint, since events often translate really well into business language; cf. "event storming" (TODO: link).

- Handlers are the way we react to events. They can call down to our model, or they can call out to external services. We can define multiple handlers for a single event if we want to. Handlers can also raise other events. This allows us to be very granular about what a handler does, and really stick to the SRP.
TODO: talk about the fact that we’ve implemented quite a complicated use case (change quantity, deallocate, start new transaction, reallocate, publish external notification), but thanks to our architecture the complexity stays constant. We just have events, handlers, and a unit of work. It’s easy to reason about, and easy to explain. Possibly show a hacky version for comparison?
Pros | Cons
---|---
 | 