In this chapter we’d like to revisit our domain model to talk about invariants and constraints, and see how our domain objects can maintain their own internal consistency, both conceptually and in persistent storage. We’ll discuss the concept of a consistency boundary, and show how making it explicit can help us to build high-performance software without compromising maintainability.
What’s the point of a domain model anyway? What’s the fundamental problem we’re trying to address?
Couldn’t we just run everything in a spreadsheet? Many of our users would be delighted by that. Business users like spreadsheets because they’re simple, familiar, and yet enormously powerful.
In fact, an enormous number of business processes do operate by manually sending spreadsheets back and forth over email. This "csv over smtp" architecture has low initial complexity, but it tends not to scale very well because it’s difficult to apply logic and maintain consistency.
Who is allowed to view this particular field? Who’s allowed to update it? What happens when we try to order -350 chairs, or 10,000,000 tables? Can an employee have a negative salary?
These are the constraints of a system. Much of the domain logic we write exists to enforce these constraints in order to maintain the invariants of the system. The invariants are the things that have to be true whenever we finish an operation.
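To make the idea concrete, a constraint like "quantities must be positive" can be enforced right in the domain object’s constructor, so the invariant can never be violated in the first place. Here’s a minimal sketch (the `OrderLine` field names follow the model we use later in this chapter; the validation itself is illustrative, not the book’s actual code):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderLine:
    orderid: str
    sku: str
    qty: int

    def __post_init__(self):
        # Enforce the constraint at construction time, so the invariant
        # "no order line has a non-positive quantity" always holds.
        if self.qty <= 0:
            raise ValueError(f"Invalid quantity {self.qty} for sku {self.sku}")
```

Trying to order -350 chairs now fails immediately with a `ValueError`, rather than corrupting state somewhere downstream.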
If we were writing a hotel booking system, we might have the constraint that no two bookings can exist for the same hotel room on the same night. This supports the invariant that no room is double booked.
Of course, sometimes we might need to temporarily bend the rules. Perhaps we need to shuffle the rooms around due to a VIP booking. While we’re moving people around, we might be double booked, but our domain model should ensure that, when we’re finished, we end up in a final consistent state, where the invariants are met. Either that, or we raise an error and refuse to complete the operation.
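A toy sketch of that "bend the rules, then check" pattern (the function and its in-memory `bookings` dict are hypothetical, purely for illustration):

```python
def move_guest(bookings: dict, guest: str, new_room: str) -> None:
    """Reassign a guest to a new room, verifying the invariant at the end.

    `bookings` maps guest name -> room number. Mid-operation, a room may
    briefly be double-booked; we only commit if the final state is consistent.
    """
    old_room = bookings[guest]
    bookings[guest] = new_room
    rooms = list(bookings.values())
    if len(rooms) != len(set(rooms)):
        # Invariant "no room is double-booked" would be violated:
        # roll back and refuse to complete the operation.
        bookings[guest] = old_room
        raise ValueError(f"Room {new_room} is already booked")
```

The operation either ends in a consistent state or raises and leaves the world as it was.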
Let’s look at a couple of concrete examples from our business requirements:
An order line can only be allocated to one batch at a time.
This is a business rule that implements a constraint. The constraint is that an order line is allocated to either zero or one batches, but never more than one. We need to make sure that our code never accidentally calls Batch.allocate() on two different batches for the same line, and currently, there’s nothing there to explicitly stop us doing that. By nominating an aggregate to be "in charge of" all batches, we’ll have a single place where we can enforce this constraint.
Let’s look at another one of our business rules:
I can’t allocate to a batch if the available quantity is less than the quantity of the order line.
Here the constraint is that we can’t allocate more stock against a batch than it has available. The invariant is that we never oversell stock by allocating two customers to the same physical cushion. Every time we update the state of the system, our code needs to ensure that we don’t break the invariants.
In a single-threaded, single-user application, it’s relatively easy for us to maintain this invariant. We can just allocate stock one line at a time, and raise an error if there’s no stock available.
This gets much harder when we introduce the idea of concurrency. Suddenly we might be allocating stock for multiple order lines simultaneously. We might even be allocating order lines at the same time as processing changes to the batches themselves.
We usually solve this problem by applying locks to our database tables. This prevents two operations happening simultaneously on the same row or same table.
As we start to think about scaling up our app, we realize that our model of allocating lines against all available batches may not scale. If we’ve got tens of thousands of orders per hour, and hundreds of thousands of order lines, we can’t hold a lock over the whole batches table for every single one.
In the rest of this chapter, we’ll first discuss choosing an aggregate and demonstrate its usefulness for managing invariants at the conceptual level, as the single entrypoint in our code for modifying batches and allocations. In that role, it’s defending us from programmer error.
Then we’ll return to the topic of concurrency and discuss how the aggregate can enforce invariants at a lower level. In that role, it’ll be defending us against concurrency / data integrity bugs.
An AGGREGATE is a cluster of associated objects that we treat as a unit for the purpose of data changes.
— Eric Evans, Domain-Driven Design (the "blue book")
Even if it weren’t for the data integrity concerns, as a model gets more complex and accumulates more Entity and Value Objects, all of which start pointing to each other, it can be hard to keep track of who can modify what. Especially when we have collections in the model, as we do (our batches are a collection), it’s a good idea to nominate some entities to be the single entrypoint for modifying their related objects. The system becomes conceptually simpler and easier to reason about if you nominate some objects to be in charge of consistency for the others.
Tip: Just like we sometimes use _leading_underscores to mark methods or functions as "private", you can think of aggregates as being the "public" classes of our model, while the rest of the Entities and Value Objects are "private".
Some sort of Order object might suggest itself, but that’s more about order lines, and we’re more concerned with something that provides some sort of conceptual unity for collections of batches.

When we allocate an order line, we’re actually only interested in batches that have the same SKU as the order line. Some sort of concept like GlobalSkuStock might work, or perhaps just simply Product — after all, that was the first concept we came across in our exploration of the domain language back in [chapter_01_domain_model].

Let’s go with that for now, and see how it looks:
class Product:

    def __init__(self, sku: str, batches: List[Batch]):
        self.sku = sku  #(1)
        self.batches = batches  #(2)

    def allocate(self, line: OrderLine) -> str:  #(3)
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            batch.allocate(line)
            return batch.reference
        except StopIteration:
            raise OutOfStock(f'Out of stock for sku {line.sku}')
1. Product’s main identifier is the sku.
2. It holds a reference to a collection of batches for that sku.
3. And finally, we can move the allocate() domain service to being a method on Product.
Note: This Product might not look like what you’d expect a Product model to look like. No price, no description, no dimensions… Our allocation service doesn’t care about any of those things. This is the power of microservices and bounded contexts: the concept of Product in one app can be very different from that in another.[1]
One of the most important contributions from Evans and the DDD community is the concept of Bounded Contexts.
In essence, this was a reaction against attempts to capture entire businesses into a single model. The word "customer" means different things to people in sales, customer services, logistics, support, and so on. Attributes needed in one context are irrelevant in another; more perniciously, concepts with the same name can have entirely different meanings in different contexts. Rather than trying to build a single model (or class, or database) to capture all the use cases, better to have several different models, draw boundaries around each context, and handle the translation between different contexts explicitly.
This concept translates very well to the world of microservices, where each microservice is free to have its own concept of "customer", and rules for translating that to and from other microservices it integrates with.
Whether or not you’ve got a microservices architecture, a key consideration in choosing your aggregates is also choosing the bounded context that they will operate in. By restricting the context, you can keep your number of aggregates low and their size manageable.
Once again we find ourselves forced to say that we can’t give this issue the treatment it deserves here, and we can only encourage you to read up on it elsewhere.
Once we define certain entities to be aggregates, we need to apply the rule that they are the only entities that are publicly accessible to the outside world. In other words, the only repositories we are allowed should be repositories that return aggregates.
In our case, we’ll switch from BatchRepository to ProductRepository:
class _UnitOfWork:

    def __init__(self, session):
        self.session = session
        self.products = repository.ProductRepository(session)
    #...


class ProductRepository:
    #...

    def get(self, sku):
        return self.session.query(model.Product).filter_by(sku=sku).first()
And our service layer evolves to use Product as its main entrypoint:
def add_batch(
        ref: str, sku: str, qty: int, eta: Optional[date],
        uow: unit_of_work.AbstractUnitOfWork
):
    with uow:
        product = uow.products.get(sku=sku)
        if product is None:
            product = model.Product(sku, batches=[])
            uow.products.add(product)
        product.batches.append(model.Batch(ref, sku, qty, eta))
        uow.commit()


def allocate(
        orderid: str, sku: str, qty: int,
        uow: unit_of_work.AbstractUnitOfWork
) -> str:
    line = OrderLine(orderid, sku, qty)
    with uow:
        product = uow.products.get(sku=line.sku)
        if product is None:
            raise InvalidSku(f'Invalid sku {line.sku}')
        batchref = product.allocate(line)
        uow.commit()
    return batchref
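One payoff of the service layer depending only on an abstract unit of work is that tests can swap in an in-memory fake. A sketch of what such a fake might look like (the class names are assumptions, following the pattern from earlier chapters, not code from this listing):

```python
class FakeProductRepository:
    """In-memory stand-in for ProductRepository (a test double)."""

    def __init__(self, products):
        self._products = set(products)

    def add(self, product):
        self._products.add(product)

    def get(self, sku):
        # Return the product with a matching sku, or None, mirroring
        # the real repository's .first() behavior.
        return next((p for p in self._products if p.sku == sku), None)


class FakeUnitOfWork:
    """In-memory UoW: records whether commit() was called, no real DB."""

    def __init__(self):
        self.products = FakeProductRepository([])
        self.committed = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        pass  # a real UoW would roll back here if commit() was never called

    def commit(self):
        self.committed = True
```

A service-layer test can then call `allocate(..., uow=FakeUnitOfWork())` and simply assert on `uow.committed` and the repository’s contents.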
You’ve just seen the main top layers of the code, so this shouldn’t be too hard, but we’d like you to implement the Product aggregate starting from Batch, just like we did.
Of course you could cheat and copy/paste from the listings above, but even if you do that, you’ll still have to solve a few challenges on your own, like adding the model to the ORM and making sure all the moving parts can talk to each other, which we hope will be instructive.
We’ve put in a "cheating" implementation that delegates to the existing allocate() function, so you should be able to evolve it towards the real thing.
We’ve marked a couple of tests with @pytest.skip(); come back to them when you’re done and have read the rest of this chapter, to have a go at implementing version numbers. Bonus points if you can get SQLAlchemy to do them for you by magic!
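A "cheating" implementation along those lines might look something like this sketch, which simply delegates to the standalone allocate() domain service from earlier chapters (reproduced here so the snippet is self-contained; the details are an assumption about your starting point):

```python
def allocate(line, batches):
    # The pre-existing standalone domain service from earlier chapters.
    batch = next(b for b in sorted(batches) if b.can_allocate(line))
    batch.allocate(line)
    return batch.reference


class Product:
    def __init__(self, sku, batches):
        self.sku = sku
        self.batches = batches

    def allocate(self, line):
        # Cheat: delegate to the existing standalone service for now,
        # then gradually evolve this into a real method on the aggregate.
        return allocate(line, self.batches)
```

Because the method signature already matches the final version, the rest of the system doesn’t need to change while you do the evolution.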
We’ve got our new aggregate and we’re using it in all the right places; the remaining question is: how will we actually enforce our data integrity rules? We don’t want to hold a lock over the entire batches table, but how will we implement holding a lock over just the rows for a particular sku? The answer is to have a single attribute on the Product model that acts as a marker for the whole state change being complete, and to use it as the single resource that concurrent workers can fight over: if two transactions both read the state of the world for batches at the same time, and both want to update the allocations tables, we force both of them to also try to update the version_number in the products table, in such a way that only one of them can win and the world stays consistent.
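At the SQL level, that "only one can win" mechanic is a compare-and-set: each transaction’s UPDATE only matches a row if the version number it originally read is still current. A runnable sketch of just that idea, using an in-memory SQLite database (this illustrates the mechanism only; in the app we’ll let the ORM and the database’s isolation rules do the work):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (sku TEXT PRIMARY KEY, version_number INT)")
conn.execute("INSERT INTO products VALUES ('RED-CHAIR', 1)")


def try_update(conn, sku, read_version):
    # Compare-and-set: the UPDATE matches zero rows if another worker
    # already bumped the version, so the loser must fail or retry.
    cursor = conn.execute(
        "UPDATE products SET version_number = ? "
        "WHERE sku = ? AND version_number = ?",
        (read_version + 1, sku, read_version),
    )
    return cursor.rowcount == 1  # True only for the winning transaction
```

If two workers both read version 1, the first `try_update(conn, 'RED-CHAIR', 1)` succeeds and bumps the row to version 2; the second finds no row still at version 1 and gets False back.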
There are essentially three options for implementing version numbers:
1. version_number lives in the domain: we add it to the Product constructor, and Product.allocate() is responsible for incrementing it.
2. The service layer could do it! The version number isn’t strictly a domain concern, so instead our service layer could assume that the current version number is attached to Product by the repository, and the service layer will increment it before it does the commit().
3. Or, since it’s arguably an infrastructure concern, the UoW and repository could do it by magic. The repository has access to version numbers for any products it retrieves, and when the UoW does a commit, it can increment the version number for any products it knows about, assuming them to have changed.
Option 3 isn’t ideal, because there’s no real way of doing it without having to assume that all products have changed, so we’ll be incrementing version numbers when we don’t have to[2].
Option 2 involves mixing the responsibility for mutating state between the service layer and the domain layer, so it’s a little messy as well.
So in the end, even though version numbers don’t have to be a domain concern, you might decide the cleanest tradeoff is to put them in the domain.
class Product:

    def __init__(self, sku: str, batches: List[Batch], version_number: int = 0):  #(1)
        self.sku = sku
        self.batches = batches
        self.version_number = version_number  #(1)

    def allocate(self, line: OrderLine) -> str:
        try:
            batch = next(
                b for b in sorted(self.batches) if b.can_allocate(line)
            )
            batch.allocate(line)
            self.version_number += 1  #(1)
            return batch.reference
        except StopIteration:
            raise OutOfStock(f'Out of stock for sku {line.sku}')
1. There it is!
Now to actually make sure we can get the behavior we want: if we have two concurrent attempts to do allocation against the same Product, one of them should fail, because they can’t both update the version number:
def test_concurrent_updates_to_version_are_not_allowed(postgres_session_factory):
    sku, batch = random_sku(), random_batchref()
    session = postgres_session_factory()
    insert_batch(session, batch, sku, 100, eta=None, product_version=1)
    session.commit()

    order1, order2 = random_orderid(1), random_orderid(2)
    exceptions = []  # type: List[Exception]
    try_to_allocate_order1 = lambda: try_to_allocate(order1, sku, exceptions)
    try_to_allocate_order2 = lambda: try_to_allocate(order2, sku, exceptions)
    thread1 = threading.Thread(target=try_to_allocate_order1)  #(1)
    thread2 = threading.Thread(target=try_to_allocate_order2)  #(1)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    [[version]] = session.execute(
        "SELECT version_number FROM products WHERE sku=:sku",
        dict(sku=sku),
    )
    assert version == 2  #(2)
    [exception] = exceptions
    assert 'could not serialize access due to concurrent update' in str(exception)  #(3)

    orders = list(session.execute(
        "SELECT orderid FROM allocations"
        " JOIN batches ON allocations.batch_id = batches.id"
        " JOIN order_lines ON allocations.orderline_id = order_lines.id"
        " WHERE order_lines.sku=:sku",
        dict(sku=sku),
    ))
    assert len(orders) == 1  #(4)
    with unit_of_work.SqlAlchemyUnitOfWork() as uow:
        uow.session.execute('select 1')
1. We set up two threads that will reliably produce the concurrency behavior we want: read1, read2, write1, write2 (see below for the code being run in each thread).
2. We assert that the version number has only been incremented once.
3. We can also check on the specific exception if we like.
4. And we can make sure that only one allocation has gotten through.
def try_to_allocate(orderid, sku, exceptions):
    line = model.OrderLine(orderid, sku, 10)
    try:
        with unit_of_work.SqlAlchemyUnitOfWork() as uow:
            product = uow.products.get(sku=sku)
            product.allocate(line)
            time.sleep(0.2)
            uow.commit()
    except Exception as e:
        print(traceback.format_exc())
        exceptions.append(e)
To get the test to pass as it is, we can set the transaction isolation level on our session:
DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
    config.get_postgres_uri(),
    isolation_level="SERIALIZABLE",
))
Transaction isolation levels are tricky stuff; it’s worth spending time understanding the documentation.
An alternative to using the SERIALIZABLE isolation level is to use SELECT FOR UPDATE, which produces different behavior: two concurrent transactions will not be allowed to do a read on the same rows at the same time:
def get(self, sku):
    return self.session.query(model.Product) \
        .filter_by(sku=sku) \
        .with_for_update() \
        .first()
This will have the effect of changing the concurrency pattern from read1, read2, write1, write2(fail) to read1, write1, read2, write2(succeed).
In our simple case, it’s not obvious which to prefer. In a more complex scenario, SELECT FOR UPDATE might lead to more deadlocks, while SERIALIZABLE takes more of an "optimistic locking" approach and might lead to more failures, but those failures may be more recoverable. So, as usual, the right solution will depend on circumstances.
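Since serialization failures are transient (the loser would usually succeed if it simply started over), a common pattern is to retry the whole unit of work a bounded number of times. A sketch of that idea (the exception type and retry policy here are assumptions for illustration, not the book’s code; in practice you’d catch the specific driver error, e.g. psycopg2’s serialization failure):

```python
import time


class ConcurrencyError(Exception):
    """Stand-in for a driver-specific serialization-failure exception."""


def with_retries(operation, max_attempts=3, wait=0.1):
    """Re-run `operation` when it raises a (presumed transient)
    ConcurrencyError; give up and re-raise after max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except ConcurrencyError:
            if attempt == max_attempts:
                raise
            time.sleep(wait)  # back off briefly before retrying
```

You’d wrap the whole service-layer call, e.g. `with_retries(lambda: allocate(orderid, sku, qty, uow))`, so each retry re-reads the world and gets a fresh version number.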
- Choose the right aggregate: the aggregate we pick determines both the conceptual boundaries of our model and the granularity at which we enforce consistency. Choosing Product rather than locking all batches is what lets us guard just the rows for one sku instead of the whole table.
- Aggregates and transactions: by making the aggregate the unit of change, one transaction updates one aggregate, and a version number on the aggregate gives concurrent workers a single resource to fight over.
Pros | Cons |
---|---|
Choosing aggregates lets you decide which of your domain model classes are the public ones, and which aren’t. | Yet another new concept for new developers to take on board. Explaining entities versus value objects was already a mental load; now there’s a third type of domain model object? |
Modeling our operations around explicit consistency boundaries helps us avoid performance problems: we only need to contend over one aggregate at a time. | Sticking rigidly to the rule that we only modify one aggregate at a time is a big mental shift. |
Putting the aggregate in sole charge of state changes to its subsidiary models makes the system easier to reason about, and makes it easier to control invariants. | Dealing with eventual consistency between aggregates can be complex. |