From 34350393d9d5168c98cda8a8b96fa09034f77e24 Mon Sep 17 00:00:00 2001 From: Nathan Robertson Date: Wed, 16 Oct 2024 16:48:28 +1100 Subject: [PATCH 1/3] Add events for before and after producer publishes a message --- Event/AMQPEvent.php | 29 ++++++++++++++- Event/AfterProducerPublishMessageEvent.php | 41 +++++++++++++++++++++ Event/BeforeProducerPublishMessageEvent.php | 41 +++++++++++++++++++++ RabbitMq/Producer.php | 13 +++++++ 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 Event/AfterProducerPublishMessageEvent.php create mode 100644 Event/BeforeProducerPublishMessageEvent.php diff --git a/Event/AMQPEvent.php b/Event/AMQPEvent.php index 1ae07c36..8cd889dc 100644 --- a/Event/AMQPEvent.php +++ b/Event/AMQPEvent.php @@ -3,8 +3,8 @@ namespace OldSound\RabbitMqBundle\Event; use OldSound\RabbitMqBundle\RabbitMq\Consumer; +use OldSound\RabbitMqBundle\RabbitMq\Producer; use PhpAmqpLib\Message\AMQPMessage; -use Symfony\Component\EventDispatcher\Event; /** * Class AMQPEvent @@ -18,6 +18,8 @@ class AMQPEvent extends AbstractAMQPEvent public const ON_IDLE = 'on_idle'; public const BEFORE_PROCESSING_MESSAGE = 'before_processing'; public const AFTER_PROCESSING_MESSAGE = 'after_processing'; + public const BEFORE_PUBLISH_MESSAGE = 'before_publishing'; + public const AFTER_PUBLISH_MESSAGE = 'after_publishing'; /** * @var AMQPMessage @@ -29,6 +31,11 @@ class AMQPEvent extends AbstractAMQPEvent */ protected $consumer; + /** + * @var Producer + */ + protected $producer; + /** * @return AMQPMessage */ @@ -68,4 +75,24 @@ public function setConsumer(Consumer $consumer) return $this; } + + /** + * @return Producer + */ + public function getProducer() + { + return $this->producer; + } + + /** + * @param Producer $producer + * + * @return AMQPEvent + */ + public function setProducer(Producer $producer) + { + $this->producer = $producer; + + return $this; + } } diff --git a/Event/AfterProducerPublishMessageEvent.php b/Event/AfterProducerPublishMessageEvent.php new file mode 100644 index 00000000..3b04847a --- /dev/null +++ b/Event/AfterProducerPublishMessageEvent.php @@ -0,0 +1,41 @@ +setProducer($producer); + $this->setAMQPMessage($AMQPMessage); + $this->routingKey = $routingKey; + } + + /** + * @return AMQPMessage + */ + public function getRoutingKey() + { + return $this->routingKey; + } +} diff --git a/Event/BeforeProducerPublishMessageEvent.php b/Event/BeforeProducerPublishMessageEvent.php new file mode 100644 index 00000000..43d241af --- /dev/null +++ b/Event/BeforeProducerPublishMessageEvent.php @@ -0,0 +1,41 @@ +setProducer($producer); + $this->setAMQPMessage($AMQPMessage); + $this->routingKey = $routingKey; + } + + /** + * @return AMQPMessage + */ + public function getRoutingKey() + { + return $this->routingKey; + } +} diff --git a/RabbitMq/Producer.php b/RabbitMq/Producer.php index 8fcb7349..571ce3b7 100644 --- a/RabbitMq/Producer.php +++ b/RabbitMq/Producer.php @@ -2,6 +2,8 @@ namespace OldSound\RabbitMqBundle\RabbitMq; +use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent; +use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -63,6 +65,12 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = [] } $real_routingKey = $routingKey !== null ? $routingKey : $this->defaultRoutingKey; + + $this->dispatchEvent( + BeforeProducerPublishMessageEvent::NAME, + new BeforeProducerPublishMessageEvent($this, $msg, $real_routingKey) + ); + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey); $this->logger->debug('AMQP message published', [ 'amqp' => [ @@ -72,5 +80,10 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = [] 'headers' => $headers, ], ]); + + $this->dispatchEvent( + AfterProducerPublishMessageEvent::NAME, + new AfterProducerPublishMessageEvent($this, $msg, $real_routingKey) + ); } } From 397f3845307744e761399bf345fc2cf847d52585 Mon Sep 17 00:00:00 2001 From: Nathan Robertson Date: Wed, 16 Oct 2024 17:04:19 +1100 Subject: [PATCH 2/3] Add documentation for producer events --- README.md | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/README.md b/README.md index ccc0b487..b2b0642d 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,49 @@ If you need to use a custom class for a producer (which should inherit from `Old The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly. +#### Producer Events #### + +There are currently two events emitted by the producer. + +##### BeforeProducerPublishMessageEvent ##### +This event occurs immediately before publishing the message. This is a good hook to do any final logging, validation, etc. before actually sending the message. A sample implementation of a listener: + +```php +namespace App\EventListener; + +use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent; +use Symfony\Component\EventDispatcher\Attribute\AsEventListener; + +#[AsEventListener(event: BeforeProducerPublishMessageEvent::NAME)] +final class AMQPBeforePublishEventListener +{ + public function __invoke(BeforeProducerPublishMessageEvent $event): void + { + // Your code goes here + } +} +``` + +##### AfterProducerPublishMessageEvent ##### +This event occurs immediately after publishing the message. This is a good hook to do any confirmation logging, commits, etc. after actually sending the message. A sample implementation of a listener: + +```php +namespace App\EventListener; + +use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent; +use Symfony\Component\EventDispatcher\Attribute\AsEventListener; + +#[AsEventListener(event: AfterProducerPublishMessageEvent::NAME)] +final class AMQPBeforePublishEventListener +{ + public function __invoke(AfterProducerPublishMessageEvent $event): void + { + // Your code goes here + } +} +``` + + ### Consumers ### A consumer will connect to the server and start a __loop__ waiting for incoming messages to process. Depending on the specified __callback__ for such consumer will be the behavior it will have. Let's review the consumer configuration from above: From 6044540688f47d197f186aef9e8ec11b892e723b Mon Sep 17 00:00:00 2001 From: Nathan Robertson Date: Thu, 17 Oct 2024 10:53:28 +1100 Subject: [PATCH 3/3] Fix documented return type --- Event/AfterProducerPublishMessageEvent.php | 2 +- Event/BeforeProducerPublishMessageEvent.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Event/AfterProducerPublishMessageEvent.php b/Event/AfterProducerPublishMessageEvent.php index 3b04847a..32be6fbf 100644 --- a/Event/AfterProducerPublishMessageEvent.php +++ b/Event/AfterProducerPublishMessageEvent.php @@ -32,7 +32,7 @@ public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string } /** - * @return AMQPMessage + * @return string */ public function getRoutingKey() { diff --git a/Event/BeforeProducerPublishMessageEvent.php b/Event/BeforeProducerPublishMessageEvent.php index 43d241af..49b9a450 100644 --- a/Event/BeforeProducerPublishMessageEvent.php +++ b/Event/BeforeProducerPublishMessageEvent.php @@ -32,7 +32,7 @@ public function __construct(Producer $producer, AMQPMessage $AMQPMessage, string } /** - * @return AMQPMessage + * @return string */ public function getRoutingKey() {