Tools for Symfony Messenger.
- Installation
- Symfony Bundle Configuration
- Transports
- Serializing messages
- Middlewares
- Producing messages to external system
- Consuming messages from external system
composer req sokil/message-bus-bundle
If you are using Symfony Framework, you may register library as Symfony Bundle,
and it automatically configures services for you. Add bundle to your config/bundles.php
:
<?php
return [
// ...
Sokil\MessageBusBundle\MessageBusBundle::class => ['all' => true],
];
This transport accepts envelopes and skip them on send, and throw error when try to consume something.
Use dummy transport with prefix dummy://
Symfony Messenger
contains PhpSerializer
which use standard serialize/unserialize
php functions, and Serializer
which use Symfony serializer, but also generates serialized data with fully qualified PHP class names.
This may be fine for cases when producer and consumer both running on same codebase, but there may be fails of unserialization when consuming occurred after release where class was renamed.
So for safe and cross-platform communication we need serializer completely independent of PHP serialization process and runtime.
To separate serialised message from PHP runtime and serialization functions, we need to replace message and stamp class name with message and stamp string types.
This is done in service \Sokil\MessageBusBundle\Service\TypeLocator
:
$typeLocator = new TypeLocator(
[
DelayStamp::class => 'delay',
BusNameStamp::class => 'busName',
SentStamp::class => 'sent',
TransportMessageIdStamp::class => 'transportMessageId',
],
[
UserCreated::class => 'user.created',
],
)
If you use Symfony framework, this already done in service sokil.message_bus.type_locator
.
To define mapping from class names to types, add it to framework configuration in config/packages/message_bus.yaml
:
message_bus:
stamps:
Symfony\Component\Messenger\Stamp\DelayStamp:
type: Delay
Symfony\Component\Messenger\Stamp\BusNameStamp:
type: BusName
Symfony\Component\Messenger\Stamp\SentStamp:
type: Sent
Symfony\Component\Messenger\Stamp\TransportMessageIdStamp:
type: TransportMessageId
Symfony\Component\Messenger\Stamp\ErrorDetailsStamp:
type: ErrorDetails
Symfony\Component\Messenger\Stamp\RedeliveryStamp:
type: Redelivery
messages:
App\User\Event\UserCreated:
type: user.created
By default stamps already mapped to next values:
DelayStamp::class => 'Delay',
BusNameStamp::class => 'BusName',
SentStamp::class => 'Sent',
TransportMessageIdStamp::class => 'TransportMessageId',
ErrorDetailsStamp::class => 'ErrorDetails',
RedeliveryStamp::class => 'Redelivery',
SentToFailureTransportStamp::class => 'SendToFailureTransportStamp',
HandledStamp::class => 'Handled',
ReceivedStamp::class => 'Received',
Also you may add attribute to your event instead of configuring in packages config:
<?php
declare(strict_types=1);
namespace Sokil\MessageBusBundle\Stubs\Event;
use Sokil\MessageBusBundle\Attribute\Message;
#[Message(type: 'user.updated')]
class UserUpdated
{
}
Portable serializer produces data completely independent of PHP serialization process and runtime.
For example:
[
'headers' => [
'X-Message-Type' => 'user.created',
'Content-Type' => 'application/json',
'X-Message-Stamp-BusName' => '[{"busName":"event.bus"}]',
'X-Message-Stamp-TransportMessageId' => '[{"id":42}]',
],
'body' => '{"userId":"abcdef","email":"[email protected]","createdAt":"2022-01-26T10:16:00+00:00"}',
]
If Symfony Framework used, place configuration to your config/packages/message_bus.yaml
:
message_bus:
serializers:
some_serializer:
class: Sokil\MessageBusBundle\Serializer\PortableSerializer
format: json
normalizers:
- Sokil\MessageBusBundle\Stubs\Normalizer\EmailNormalizer
This creates service sokil.message_bus.serializer.some_serializer.json
.
This serializer then may be configured for using with messenger transport in config/packages/messenger.yaml
:
framework:
messenger:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
amqp:
dsn: '%env(AMQP_MESSENGER_TRANSPORT_DSN)%'
serializer: sokil.message_bus.serializer.some_serializer.json
options:
exchange:
name: user.events
type: topic
queues: []
Above serializer serializes only fields annotated with messenger
group.
class UserCreated
{
#[Groups(['messenger'])]
private Uuid $userId;
}
Field name serialized as is. To prevent side effects of PHP class property rename, you may define serialized field name explicitly:
class UserCreated
{
#[Groups(['messenger'])]
#[SerializedName('userId')]
private Uuid $userId;
}
Envelope without explicitly defined AmqpStamp
will be produced with null routing key. This means it will not be sent
to any queue.
With \Sokil\MessageBusBundle\Middleware\AmqpMessageRoutingKeyByTypeMiddleware
we may generate routing key
automatically by its message type defined in message_bus.messages
parameter of Symfony Configuration.
Routing key generated by pattern based on message type. This pattern may be
configured in config/packages/message_bus.yaml
:
message_bus:
middlewares:
amqp_message_routing_key_by_type:
pattern: "some-namespace.{messageType}"
When used in Symfony framework, service sokil.message_bus.middleware.amqp_message_routing_key_by_type
defined automatically.
To use middleware, configure it to message bus in config/packages/messenger.yaml
:
framework:
messenger:
default_bus: command.bus
buses:
event.bus:
default_middleware: allow_no_handlers
middleware:
- 'sokil.message_bus.middleware.amqp_message_routing_key_by_type'
By default, Symfony messenger expects that every message has at lease one handler.
When message dispatched to external system, there are no handlers in out system, so we need to define bus
which allows no handlers in config/packages/messenger.yaml
:
framework:
messenger:
buses:
event.bus:
default_middleware: allow_no_handlers
If we use AMQP transport, it by default require bind exchange to queue explicitly, and it automatically creates exchange and queue if they not configured.
So we need to configure exchange without queues bound to in config/packages/messenger.yaml
:
framework:
messenger:
transports:
# https://symfony.com/doc/current/messenger.html#transport-configuration
async:
dsn: '%env(ASYNC_MESSENGER_TRANSPORT_DSN)%'
serializer: sokil.message_bus.serializer.some_serializer.json
options:
exchange:
name: user.events
type: topic
queues: []
Also, you need to configure portable serializer in framework.messenger.transports.$.serializer
key because by default
Messenger uses serializer that just serializes PHP objects by standard serialize
function, so consumers in other
systems written in other languages will not able to unserialize this message.
Every producing service needs to create own exchange, no other service may use this exchange to produce own messages.
To consume external messages from AMQP we need to configure transport in config/packages/messenger.yaml
::
framework:
messenger:
transports:
external_messages:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: sokil.message_bus.serializer.some_serializer.json
options:
auto_setup: false
queues:
messages_from_external: ~
Now you may run worker to consume messages:
Note that message types must be configured for all messages we expect to consume.
Queue must be bound by hand because auto_setup: false
prevents it from automatic creation.
$ php bin/console messenger:consume -vv external_messages