-
We’ll discuss the difference between events and commands.
-
We’ll extend our message bus to handle command messages.
-
We’ll finish rebuilding our application as a message-processor.
// DIAGRAM GOES HERE
In the previous chapter we talked about using events as a way of representing the inputs to our system. This starts to turn our application into a message processing machine.
TODO: DIAGRAM: Message processor
To achieve that, we converted all our use-case functions to event-handlers.
When the API receives a POST to create a new batch, it builds a new BatchCreated
event and handles it as though it came from an external system.
This might have felt counter-intuitive. After all, the batch hasn’t been
created yet, that’s why we called the API. We’re going to fix that conceptual
wart by introducing Commands.
Like events, commands are a type of message - instructions sent by one part of a system to another. Like events, we usually represent commands with dumb data structures and we can handle them in much the same way.
The differences between them, though, are important.
Commands are sent by one actor to another specific actor. When I post a form to an API handler, I am sending a command. We name commands with imperative tense verb phrases like "allocate stock," or "delay shipment."
Commands capture intent. They express our wish for the system to do something. As a result, when they fail, the sender needs to receive error information.
Events are broadcast by an actor to all interested listeners. When we publish the
batch_quantity_changed
we don’t know who’s going to pick it up. We name events
with past-tense verb phrases like "order allocated to stock," or "shipment delayed."
We often use events to spread the knowledge about successful commands.
Events capture facts about things that happened in the past. Since we don’t know who’s handling an event, senders should not care whether the receivers succeeded or failed.
Event |
Command |
|
Named |
Past-Tense |
Imperative Tense |
Error Handling |
Fail independently |
Fail noisily |
Sent to |
All listeners |
One recipient |
What kinds of commands do we have in our system right now?
class Command:
pass
@dataclass
class Allocate(Command): #(1)
orderid: str
sku: str
qty: int
@dataclass
class CreateBatch(Command): #(2)
ref: str
sku: str
qty: int
eta: Optional[date] = None
@dataclass
class ChangeBatchQuantity(Command): #(3)
ref: str
qty: int
-
commands.Allocate
will replaceevents.AllocationRequired
-
commands.CreateBatch
will replaceevents.BatchCreated
-
commands.ChangeBatchQuantity
will replaceevents.BatchQuantityChanged`
Each of the use-cases that we discussed earlier in the book is really a command, an instruction for the system to try and do a thing. To unify the two halves of the domain, we’re going to make a simple change: instead of directly invoking our use case functions, like we did before, we’re going to take these commands, and we’re going to put them on the message bus. As a result, our message bus changes somewhat.
Message = Union[commands.Command, events.Event]
def handle(message_queue: List[Message], uow: unit_of_work.AbstractUnitOfWork): #(1)
while message_queue:
m = message_queue.pop(0)
if isinstance(m, events.Event):
handle_event(m, uow)
elif isinstance(m, commands.Command):
handle_command(m, uow)
else:
raise Exception(f'{m} was not an Event or Command')
def handle_event(event: events.Event, uow: unit_of_work.AbstractUnitOfWork): #(2)
for handler in EVENT_HANDLERS[type(event)]:
try:
print('handling event', event, 'with handler', handler, flush=True)
handler(event, uow=uow)
except: #(2)
print(f'Exception handling event {event}\n:{traceback.format_exc()}')
continue
def handle_command(command, uow: unit_of_work.AbstractUnitOfWork): #(3)
print('handling command', command, flush=True)
try:
handler = COMMAND_HANDLERS[type(command)]
return handler(command, uow=uow)
except Exception as e:
print(f'Exception handling command {command}: {e}')
raise e #(3)
EVENT_HANDLERS = {
events.OutOfStock: [handlers.send_out_of_stock_notification],
} # type: Dict[Type[events.Event], List[Callable]] #(2)
COMMAND_HANDLERS = {
commands.Allocate: handlers.allocate,
commands.CreateBatch: handlers.add_batch,
commands.ChangeBatchQuantity: handlers.change_batch_quantity,
} # type: Dict[Type[commands.Command], Callable] #(3)
-
It still has a main
handle()
entrypoint, that takes a list of messages, that may be commands or events. -
We dispatch to a function for handling events. It can delegate to multiple handlers per event, and it catches and logs any errors, but does not let them interrupt message processing.
-
The command handler expects just one handler per command. If any errors are raised, they fail hard and will bubble up.
Why does handle_command
have a return
, but handle_events
doesn’t, we hear
you ask? It’s so that we can return the batchref from the API.
@app.route("/allocate", methods=['POST'])
def allocate_endpoint():
try:
cmd = commands.Allocate(
request.json['orderid'], request.json['sku'], request.json['qty'],
)
uow = unit_of_work.SqlAlchemyUnitOfWork()
batchref = messagebus.handle_command(cmd, uow)
except exceptions.InvalidSku as e:
return jsonify({'message': str(e)}), 400
return jsonify({'batchref': batchref}), 201
It’s the same wart we’ve drawn attention to before. In [chapter_11_cqrs] we’ll look at a way of separating out command handling from read requests.
TODO: discussion, can events raise commands?