From a8bf1779961c095ab13e1e13070309a75462eec7 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 6 Jan 2017 17:38:37 +0200 Subject: [PATCH 01/41] Add fanout, direct and topic exchanges type support resolves #9 resolves #6 resolves #8 --- src/Console/Commands/ConsumeCommand.php | 25 ++++++++++++++-- src/Consumer.php | 29 ++++++++++++++---- src/Producer.php | 39 ++++++++++++++++++------- 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 67e9ca1..5089265 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -30,7 +30,15 @@ public function __construct(RegisterQueues $registerQueues) * * @var string */ - protected $signature = 'bowler:consume {queue}'; + protected $signature = 'bowler:consume + {queueName : The queue NAME} + {--exchangeName= : The exchange NAME. If not specified the queue name will be used} + {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic} + {--bindingKeys=* : The consumer\'s BINDINGKEYS (array)} + {--passive=0 : } + {--durable=1 : Mark exchange and queue as DURABLE} + {--autoDelete=0 : Set exchange and queue to AUTODELETE when all queues and consumers, respectively have finished using it} + {--deliveryMode=2 : The message DELIVERYMODE. Non-persistent 1 or persistent 2}'; /** * The console command description. @@ -46,14 +54,25 @@ public function __construct(RegisterQueues $registerQueues) */ public function handle() { - $queueName = $this->argument('queue'); + $queueName = $this->argument('queueName'); + + // If the exchange name has not been set, use the queue name + $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; + $exchangeType = $this->option('exchangeType'); + + // If no bidingKeys are specified push a value of null so that we can still perform the loop + $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; + $passive = (bool) $this->option('passive'); + $durable = (bool) $this->option('durable'); + $autoDelete = (bool) $this->option('autoDelete'); + $deliveryMode = (int) $this->option('deliveryMode'); require(app_path().'/Messaging/queues.php'); $handlers = Registrator::getHandlers(); foreach ($handlers as $handler) { if ($handler->queueName == $queueName) { - $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName); + $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete, $deliveryMode); $bowlerConsumer->listenToQueue($handler->className, app(ExceptionHandler::class)); } } diff --git a/src/Consumer.php b/src/Consumer.php index 6e6442e..e391937 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -22,11 +22,23 @@ class Consumer /** * the name of the exchange where the producer sends its messages to. + * The name of the queue bound to the exchange where the producer sends its messages. + * + * @var string + */ + private $queueName; * * @var string */ private $exchangeName; + /** + * The binding keys used by the exchange to route messages to bounded queues. + * + * @var string + */ + private $bindingKeys; + /** * type of exchange: * fanout: routes messages to all of the queues that are bound to it and the routing key is ignored. @@ -73,18 +85,22 @@ class Consumer /** * @param Vinelab\Bowler\Connection $connection + * @param string $queueName * @param string $exchangeName * @param string $exchangeType + * @param array $bindingKeys * @param bool $passive * @param bool $durable * @param bool $autoDelete * @param int $deliveryMode */ - public function __construct(Connection $connection, $exchangeName, $exchangeType = 'fanout', $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, $bindingKeys, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; + $this->queueName = $queueName; $this->exchangeName = $exchangeName; $this->exchangeType = $exchangeType; + $this->bindingKeys = $bindingKeys; $this->passive = $passive; $this->durable = $durable; $this->autoDelete = $autoDelete; @@ -98,11 +114,14 @@ public function __construct(Connection $connection, $exchangeName, $exchangeType */ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) { - $this->connection->getChannel()->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - list($queue_name) = $this->connection->getChannel()->queue_declare($this->exchangeName, $this->passive, $this->durable, false, $this->autoDelete); - $this->connection->getChannel()->queue_bind($queue_name, $this->exchangeName); echo ' [*] Listening to Queue: ' . $this->exchangeName . ' To exit press CTRL+C', "\n"; + $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); + + foreach ($this->bindingKeys as $bindingKey) { + $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); + } $handler = new $handlerClass; @@ -125,7 +144,7 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) }; $this->connection->getChannel()->basic_qos(null, 1, null); - $this->connection->getChannel()->basic_consume($queue_name, '', false, false, false, false, $callback); + $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); while (count($this->connection->getChannel()->callbacks)) { $this->connection->getChannel()->wait(); diff --git a/src/Producer.php b/src/Producer.php index 0e56a70..f1c7057 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -21,10 +21,22 @@ class Producer /** * the name of the exchange where the producer sends its messages to + * The name of the queue bound to the exchange where the producer sends its messages. + * + * @var string + */ + private $queueName; * @var string */ private $exchangeName; + /** + * The routing keys used by the exchange to route messages to bounded queues. + * + * @var string + */ + private $routingKeys; + /** * type of exchange: * fanout: routes messages to all of the queues that are bound to it and the routing key is ignored @@ -66,18 +78,22 @@ class Producer /** * * @param Vinelab\Bowler\Connection $connection - * @param string $exchangeName - * @param string $exchangeType - * @param boolean $passive - * @param boolean $durable - * @param boolean $autoDelete - * @param integer $deliveryMode + * @param string $queueName + * @param string $exchangeName + * @param string $exchangeType + * @param aray $routingKeys + * @param boolean $passive + * @param boolean $durable + * @param boolean $autoDelete + * @param integer $deliveryMode */ - public function __construct(Connection $connection, $exchangeName, $exchangeType, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, $routingKeys, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; + $this->queueName = $queueName; $this->exchangeName = $exchangeName; $this->exchangeType = $exchangeType; + $this->routingKeys = $routingKeys; $this->passive = $passive; $this->durable = $durable; $this->autoDelete = $autoDelete; @@ -94,11 +110,14 @@ public function publish($data) { $this->connection->getChannel()->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - list($queue_name) = $this->connection->getChannel()->queue_declare($this->exchangeName, $this->passive, $this->durable, false, $this->autoDelete); - $this->connection->getChannel()->queue_bind($queue_name, $this->exchangeName); + $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); - $this->connection->getChannel()->basic_publish($msg, '', $this->exchangeName); echo " [x] Data Package Sent to CRUD Exchange!'\n"; + foreach ($this->routingKeys as $routingKey) { + $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); + $channel->basic_publish($msg, $this->exchangeName, $routingKeys); } } From 3c2d9ee7ec79c919e3d5976aab295944564eac6e Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 6 Jan 2017 17:39:17 +0200 Subject: [PATCH 02/41] add `nack` consumer support --- src/Consumer.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Consumer.php b/src/Consumer.php index e391937..224bafd 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -159,6 +159,16 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) public function ackMessage($msg) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + /** + * Negatively acknowledge a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $multiple + * @param bool $requeue + */ + public function nackMessage($msg, $multiple = false, $requeue = false) + { + $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue); } /** From 48cdcf30b690221287b63a33ba2859c65027d8df Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 6 Jan 2017 17:39:33 +0200 Subject: [PATCH 03/41] minor improvements --- src/Console/Commands/ConsumeCommand.php | 2 +- src/Consumer.php | 34 +++++++++++++++---------- src/Producer.php | 33 ++++++++++++++++-------- 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 5089265..2329d8b 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -45,7 +45,7 @@ public function __construct(RegisterQueues $registerQueues) * * @var string */ - protected $description = 'register all consumers to their queues'; + protected $description = 'Register a consumer to its queue'; /** * Run the command. diff --git a/src/Consumer.php b/src/Consumer.php index 224bafd..7ec99d9 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -14,19 +14,21 @@ class Consumer { /** - * the main class of the package where we define the channel and the connection. + * The main class of the package where we define the channel and the connection. * * @var Vinelab\Bowler\Connection */ private $connection; /** - * the name of the exchange where the producer sends its messages to. * The name of the queue bound to the exchange where the producer sends its messages. * * @var string */ private $queueName; + + /** + * The name of the exchange where the producer sends its messages to. * * @var string */ @@ -81,8 +83,6 @@ class Consumer */ private $deliveryMode; - private $msgProcessor; - /** * @param Vinelab\Bowler\Connection $connection * @param string $queueName @@ -110,12 +110,13 @@ public function __construct(Connection $connection, $queueName, $exchangeName, $ /** * consume a message from a specified exchange. * - * @param string $data + * @param string $handlerClass + * @param Vinelab\Bowler\Contracts\BowlerExceptionHandler $exceptionHandler */ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) { + $channel = $this->connection->getChannel(); - echo ' [*] Listening to Queue: ' . $this->exchangeName . ' To exit press CTRL+C', "\n"; $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); @@ -123,6 +124,8 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); } + echo " [*] Listening to Queue: ", $this->queueName, " To exit press CTRL+C", "\n"; + $handler = new $handlerClass; if(method_exists($handler, 'setConsumer')) { @@ -143,22 +146,24 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) } }; - $this->connection->getChannel()->basic_qos(null, 1, null); + $channel->basic_qos(null, 1, null); $channel->basic_consume($this->queueName, '', false, false, false, false, $callback); - while (count($this->connection->getChannel()->callbacks)) { - $this->connection->getChannel()->wait(); + while (count($channel->callbacks)) { + $channel->wait(); } } /** - * acknowledge a messasge. + * Acknowledge a messasge. * * @param PhpAmqpLib\Message\AMQPMessage $msg */ public function ackMessage($msg) { - $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0); + } + /** * Negatively acknowledge a messasge. * @@ -172,12 +177,13 @@ public function nackMessage($msg, $multiple = false, $requeue = false) } /** - * reject a messasge. + * Reject a messasge. * * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $requeue */ - public function rejectMessage($msg) + public function rejectMessage($msg, $requeue = false) { - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false); + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); } } diff --git a/src/Producer.php b/src/Producer.php index f1c7057..60d7f90 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -13,19 +13,22 @@ class Producer { /** - * the main class of the package where we define the channel and the connection + * The main class of the package where we define the channel and the connection * * @var Vinelab\Bowler\Connection */ private $connection; /** - * the name of the exchange where the producer sends its messages to * The name of the queue bound to the exchange where the producer sends its messages. * * @var string */ private $queueName; + + /** + * The name of the exchange where the producer sends its messages to + * * @var string */ private $exchangeName; @@ -53,27 +56,31 @@ class Producer /** * If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. + * * @var boolean */ private $passive; /** * If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts. + * * @var boolean */ private $durable; /** * If set, the exchange is deleted when all queues have finished using it. + * * @var boolean */ private $autoDelete; - /** - * Non-persistent (1) or persistent (2). - * @var [type] - */ - private $deliveryMode; + /** + * Non-persistent (1) or persistent (2). + * + * @var [type] + */ + private $deliveryMode; /** * @@ -101,23 +108,27 @@ public function __construct(Connection $connection, $queueName, $exchangeName, $ } /** - * publish a message to a specified exchange + * Publish a message to a specified exchange + * * @param string $data - * @param string $route: the route where the message should be published to + * * @return void */ public function publish($data) { - $this->connection->getChannel()->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + $channel = $this->connection->getChannel(); $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); - echo " [x] Data Package Sent to CRUD Exchange!'\n"; + foreach ($this->routingKeys as $routingKey) { $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); $channel->basic_publish($msg, $this->exchangeName, $routingKeys); + } + + echo " [x] Data Package Sent to ", $this->exchangeName, " Exchange!", "\n"; } } From 93c80f3e129c45530eef97e99c3ef7a5d0f4ced2 Mon Sep 17 00:00:00 2001 From: Kinane Date: Tue, 10 Jan 2017 14:06:04 +0200 Subject: [PATCH 04/41] fix typo --- src/Producer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer.php b/src/Producer.php index 60d7f90..6837547 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -126,7 +126,7 @@ public function publish($data) foreach ($this->routingKeys as $routingKey) { $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); - $channel->basic_publish($msg, $this->exchangeName, $routingKeys); + $channel->basic_publish($msg, $this->exchangeName, $routingKey); } echo " [x] Data Package Sent to ", $this->exchangeName, " Exchange!", "\n"; From 9388a6ced41606aa234b1e6f9a121e332e512f0e Mon Sep 17 00:00:00 2001 From: Kinane Date: Tue, 10 Jan 2017 14:08:55 +0200 Subject: [PATCH 05/41] add typehinting and set default value to routingKeys --- src/Producer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer.php b/src/Producer.php index 6837547..6b85f36 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -94,7 +94,7 @@ class Producer * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, $routingKeys, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, array $routingKeys = [null], bool $passive = false, bool $durable = true, bool $autoDelete = false, int $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; From 6c75803937976fe9276ba7734eebe767871f419b Mon Sep 17 00:00:00 2001 From: Kinane Date: Tue, 10 Jan 2017 16:43:43 +0200 Subject: [PATCH 06/41] improve command description --- src/Console/Commands/ConsumeCommand.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 2329d8b..349afcf 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -33,12 +33,12 @@ public function __construct(RegisterQueues $registerQueues) protected $signature = 'bowler:consume {queueName : The queue NAME} {--exchangeName= : The exchange NAME. If not specified the queue name will be used} - {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic} + {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} {--bindingKeys=* : The consumer\'s BINDINGKEYS (array)} - {--passive=0 : } - {--durable=1 : Mark exchange and queue as DURABLE} - {--autoDelete=0 : Set exchange and queue to AUTODELETE when all queues and consumers, respectively have finished using it} - {--deliveryMode=2 : The message DELIVERYMODE. Non-persistent 1 or persistent 2}'; + {--passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} + {--durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} + {--autoDelete=0 : Set exchange and queue to AUTODELETE when all queues and consumers, respectively have finished using it. Defaults to 0} + {--deliveryMode=2 : The message DELIVERYMODE. Non-persistent 1 or persistent 2. Defaults to 2}'; /** * The console command description. From de86ad206dbbfc07b61849dfc36aba8fc52aadfe Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 12:45:58 +0200 Subject: [PATCH 07/41] add consume command options shortcuts --- src/Console/Commands/ConsumeCommand.php | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 349afcf..a044501 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -32,13 +32,13 @@ public function __construct(RegisterQueues $registerQueues) */ protected $signature = 'bowler:consume {queueName : The queue NAME} - {--exchangeName= : The exchange NAME. If not specified the queue name will be used} - {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} - {--bindingKeys=* : The consumer\'s BINDINGKEYS (array)} - {--passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} - {--durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} - {--autoDelete=0 : Set exchange and queue to AUTODELETE when all queues and consumers, respectively have finished using it. Defaults to 0} - {--deliveryMode=2 : The message DELIVERYMODE. Non-persistent 1 or persistent 2. Defaults to 2}'; + {--xN|exchangeName= : The exchange NAME. Defaults to queueName} + {--xT|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} + {--bK|bindingKeys=* : The consumer\'s BINDING KEYS (array)} + {--P|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} + {--D|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} + {--aD|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} + {--dM|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2} /** * The console command description. From 650489050de802ca7c5644b08e6d9be174aa5c06 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 13:47:15 +0200 Subject: [PATCH 08/41] add dead lettering command options --- src/Console/Commands/ConsumeCommand.php | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index a044501..2155d8c 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -39,6 +39,11 @@ public function __construct(RegisterQueues $registerQueues) {--D|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} {--aD|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} {--dM|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2} + {--dlqN|deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName} + {--dlxN|deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName} + {--dlxT|deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} + {--dlrK|deadLetterRoutingKey= : The dead letter ROUTING KEY} + {--dlmT|messageTtl= : The dead letter MESSAGE TTL}'; /** * The console command description. @@ -56,17 +61,21 @@ public function handle() { $queueName = $this->argument('queueName'); - // If the exchange name has not been set, use the queue name - $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; + $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name $exchangeType = $this->option('exchangeType'); - - // If no bidingKeys are specified push a value of null so that we can still perform the loop - $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; + $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; // If no bidingKeys are specified push a value of null so that we can still perform the loop $passive = (bool) $this->option('passive'); $durable = (bool) $this->option('durable'); $autoDelete = (bool) $this->option('autoDelete'); $deliveryMode = (int) $this->option('deliveryMode'); + // Dead Lettering + $deadLetterQueueName = ($qName = $this->option('deadLetterQueueName')) ? $qName : (($xName = $this->option('deadLetterExchangeName')) ? $xName : null) + $deadLetterExchangeName = ($xName = $this->option('deadLetterExchangeName')) ? $xName : (($qName = $this->option('deadLetterQueueName')) ? $qName : null) + $deadLetterExchangeType = $this->option('deadLetterExchangeType'); + $deadLetterRoutingKey = $this->option('deadLetterRoutingKey'); + $messageTtl = (int) $this->option('messageTtl'); + require(app_path().'/Messaging/queues.php'); $handlers = Registrator::getHandlers(); @@ -78,5 +87,4 @@ public function handle() } } - } From 4d19383c8c2a0f2248aec388514987428a946c89 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 13:48:07 +0200 Subject: [PATCH 09/41] if dead lettering options are set, configure --- src/Console/Commands/ConsumeCommand.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 2155d8c..f08170c 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -82,6 +82,9 @@ public function handle() foreach ($handlers as $handler) { if ($handler->queueName == $queueName) { $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete, $deliveryMode); + if($deadLetterQueueName) { + $bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl); + } $bowlerConsumer->listenToQueue($handler->className, app(ExceptionHandler::class)); } } From 6284d37184e46c9a81294632f09960979dc08f17 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 13:48:43 +0200 Subject: [PATCH 10/41] implement dead lettering --- src/Consumer.php | 13 +++++++- src/Producer.php | 11 ++++++- src/Traits/DeadLetteringTrait.php | 52 +++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 2 deletions(-) create mode 100644 src/Traits/DeadLetteringTrait.php diff --git a/src/Consumer.php b/src/Consumer.php index 7ec99d9..5a5bdb2 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler; /** @@ -13,6 +14,8 @@ */ class Consumer { + use DeadLetteringTrait; + /** * The main class of the package where we define the channel and the connection. * @@ -83,6 +86,13 @@ class Consumer */ private $deliveryMode; + /** + * The arguments that should be added to the `queue_declare` statement for dead lettering + * + * @var array + */ + private $arguments = []; + /** * @param Vinelab\Bowler\Connection $connection * @param string $queueName @@ -118,7 +128,8 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel = $this->connection->getChannel(); $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + foreach ($this->bindingKeys as $bindingKey) { $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); diff --git a/src/Producer.php b/src/Producer.php index 6b85f36..33d1fec 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\DeadLetteringTrait; /** * Bowler Producer @@ -11,6 +12,7 @@ */ class Producer { + use DeadLetteringTrait; /** * The main class of the package where we define the channel and the connection @@ -82,6 +84,13 @@ class Producer */ private $deliveryMode; + /** + * The arguments that should be added to the `queue_declare` statement for dead lettering + * + * @var array + */ + private $arguments = []; + /** * * @param Vinelab\Bowler\Connection $connection @@ -120,7 +129,7 @@ public function publish($data) $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php new file mode 100644 index 0000000..8bb9c87 --- /dev/null +++ b/src/Traits/DeadLetteringTrait.php @@ -0,0 +1,52 @@ +connection->getChannel(); + + $channel->exchange_declare($deadLetterExchangeName, $deadLetterExchangeType, $this->passive, $this->durable, false, $this->autoDelete); + + $channel->queue_declare($deadLetterQueueName, $this->passive, $this->durable, false, $this->autoDelete); + + $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeName); + + $this->compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl); + } + + /** + * Compiles the arguments array to be passed to the messaging queue. + * + * @param string $deadLetterExchangeName + * @param string $deadLetterRoutingKey + * @param int $messageTtl + * + * @return void + */ + private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl) + { + $this->arguments['x-dead-letter-exchange'] = ['S', $deadLetterExchangeName]; + + if($deadLetterRoutingKey) { + $this->Arguments['x-dead-letter-routing-key'] = ['S', $deadLetterRoutingKey]; + } + + if($messageTtl) { + $this->arguments['x-message-ttl'] = ['I', $messageTtl]; + } + } +} From 2b24bb3c28659ccd2dd991889e02af478d373244 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 14:00:02 +0200 Subject: [PATCH 11/41] add missing `;` --- src/Console/Commands/ConsumeCommand.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index f08170c..97b9999 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -70,8 +70,8 @@ public function handle() $deliveryMode = (int) $this->option('deliveryMode'); // Dead Lettering - $deadLetterQueueName = ($qName = $this->option('deadLetterQueueName')) ? $qName : (($xName = $this->option('deadLetterExchangeName')) ? $xName : null) - $deadLetterExchangeName = ($xName = $this->option('deadLetterExchangeName')) ? $xName : (($qName = $this->option('deadLetterQueueName')) ? $qName : null) + $deadLetterQueueName = ($qName = $this->option('deadLetterQueueName')) ? $qName : (($xName = $this->option('deadLetterExchangeName')) ? $xName : null); + $deadLetterExchangeName = ($xName = $this->option('deadLetterExchangeName')) ? $xName : (($qName = $this->option('deadLetterQueueName')) ? $qName : null); $deadLetterExchangeType = $this->option('deadLetterExchangeType'); $deadLetterRoutingKey = $this->option('deadLetterRoutingKey'); $messageTtl = (int) $this->option('messageTtl'); From 63456235981c855b71225bb3d50fbe97eb23cfbf Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 15:19:06 +0200 Subject: [PATCH 12/41] remove command options shortcuts --- src/Console/Commands/ConsumeCommand.php | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index a044501..7c9ea83 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -32,13 +32,13 @@ public function __construct(RegisterQueues $registerQueues) */ protected $signature = 'bowler:consume {queueName : The queue NAME} - {--xN|exchangeName= : The exchange NAME. Defaults to queueName} - {--xT|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} - {--bK|bindingKeys=* : The consumer\'s BINDING KEYS (array)} - {--P|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} - {--D|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} - {--aD|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} - {--dM|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2} + {--exchangeName= : The exchange NAME. Defaults to queueName} + {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} + {--bindingKeys=* : The consumer\'s BINDING KEYS (array)} + {--passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} + {--durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} + {--autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} + {--deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2}'; /** * The console command description. @@ -56,12 +56,9 @@ public function handle() { $queueName = $this->argument('queueName'); - // If the exchange name has not been set, use the queue name - $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; + $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name $exchangeType = $this->option('exchangeType'); - - // If no bidingKeys are specified push a value of null so that we can still perform the loop - $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; + $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; // If no bidingKeys are specified push a value of null so that we can still perform the loop $passive = (bool) $this->option('passive'); $durable = (bool) $this->option('durable'); $autoDelete = (bool) $this->option('autoDelete'); @@ -78,5 +75,4 @@ public function handle() } } - } From 531393de2ef19f555756ae6b7ad10fd842958dd4 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 15:19:18 +0200 Subject: [PATCH 13/41] remove producer's construct typehinting --- src/Producer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer.php b/src/Producer.php index 6b85f36..7e3d1a3 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -94,7 +94,7 @@ class Producer * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, array $routingKeys = [null], bool $passive = false, bool $durable = true, bool $autoDelete = false, int $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; From 7057ec72704483231e46153ac864dce931c3570b Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 15:28:59 +0200 Subject: [PATCH 14/41] set command options one letter shortucts --- src/Console/Commands/ConsumeCommand.php | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 7c9ea83..3c3f5f2 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -32,13 +32,13 @@ public function __construct(RegisterQueues $registerQueues) */ protected $signature = 'bowler:consume {queueName : The queue NAME} - {--exchangeName= : The exchange NAME. Defaults to queueName} - {--exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} - {--bindingKeys=* : The consumer\'s BINDING KEYS (array)} - {--passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} - {--durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} - {--autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} - {--deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2}'; + {--N|exchangeName= : The exchange NAME. Defaults to queueName} + {--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} + {--K|bindingKeys=* : The consumer\'s BINDING KEYS (array)} + {--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0} + {--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1} + {--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0} + {--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2}'; /** * The console command description. From 4b25e8f2c75495c67b00b00f36d272bb5574c080 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 15:41:32 +0200 Subject: [PATCH 15/41] implement dead lettering --- src/Consumer.php | 15 +++++++-- src/Producer.php | 13 ++++++-- src/Traits/DeadLetteringTrait.php | 52 +++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 src/Traits/DeadLetteringTrait.php diff --git a/src/Consumer.php b/src/Consumer.php index 7ec99d9..e672503 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler; /** @@ -13,6 +14,8 @@ */ class Consumer { + use DeadLetteringTrait; + /** * The main class of the package where we define the channel and the connection. * @@ -83,6 +86,13 @@ class Consumer */ private $deliveryMode; + /** + * The arguments that should be added to the `queue_declare` statement for dead lettering + * + * @var array + */ + private $arguments = []; + /** * @param Vinelab\Bowler\Connection $connection * @param string $queueName @@ -94,7 +104,7 @@ class Consumer * @param bool $autoDelete * @param int $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, $bindingKeys, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', $bindingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; @@ -118,7 +128,8 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel = $this->connection->getChannel(); $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + foreach ($this->bindingKeys as $bindingKey) { $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); diff --git a/src/Producer.php b/src/Producer.php index 7e3d1a3..123fafd 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\DeadLetteringTrait; /** * Bowler Producer @@ -11,6 +12,7 @@ */ class Producer { + use DeadLetteringTrait; /** * The main class of the package where we define the channel and the connection @@ -82,6 +84,13 @@ class Producer */ private $deliveryMode; + /** + * The arguments that should be added to the `queue_declare` statement for dead lettering + * + * @var array + */ + private $arguments = []; + /** * * @param Vinelab\Bowler\Connection $connection @@ -94,7 +103,7 @@ class Producer * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType, array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; @@ -120,7 +129,7 @@ public function publish($data) $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php new file mode 100644 index 0000000..0bd34cd --- /dev/null +++ b/src/Traits/DeadLetteringTrait.php @@ -0,0 +1,52 @@ +connection->getChannel(); + + $channel->exchange_declare($deadLetterExchangeName, $deadLetterExchangeType, $this->passive, $this->durable, false, $this->autoDelete); + + $channel->queue_declare($deadLetterQueueName, $this->passive, $this->durable, false, $this->autoDelete); + + $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeName); + + $this->compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl); + } + + /** + * Compiles the arguments array to be passed to the messaging queue. + * + * @param string $deadLetterExchangeName + * @param string $deadLetterRoutingKey + * @param int $messageTtl + * + * @return void + */ + private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl) + { + $this->arguments['x-dead-letter-exchange'] = ['S', $deadLetterExchangeName]; + + if($deadLetterRoutingKey) { + $this->Arguments['x-dead-letter-routing-key'] = ['S', $deadLetterRoutingKey]; + } + + if($messageTtl) { + $this->arguments['x-message-ttl'] = ['I', $messageTtl]; + } + } +} From 7bacd9f734f75d83ab9f2544f72371afdbeb3e30 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 17:09:47 +0200 Subject: [PATCH 16/41] set default producer exchange type to fanout --- src/Producer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer.php b/src/Producer.php index 123fafd..2b4841c 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -103,7 +103,7 @@ class Producer * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null,array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; From 58f48bc38024b619d12ae013449b81e63c52e320 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 17:10:18 +0200 Subject: [PATCH 17/41] assure backward compatibility by setting a default to exchangeName --- src/Producer.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Producer.php b/src/Producer.php index 2b4841c..f180692 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -107,8 +107,12 @@ public function __construct(Connection $connection, $queueName, $exchangeType = { $this->connection = $connection; $this->queueName = $queueName; - $this->exchangeName = $exchangeName; - $this->exchangeType = $exchangeType; + $this->exchangeType = $exchangeType; + if(!$exchangeName) { + $this->exchangeName = $queueName; + } else { + $this->exchangeName = $exchangeName; + } $this->routingKeys = $routingKeys; $this->passive = $passive; $this->durable = $durable; From 71faf4eb8dd2d2e9570bf3b045488bdf17aacf21 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 17:41:51 +0200 Subject: [PATCH 18/41] improve messageTtl description --- src/Console/Commands/ConsumeCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 9b62961..1936bf3 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -42,7 +42,7 @@ public function __construct(RegisterQueues $registerQueues) {--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName} {--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} {--deadLetterRoutingKey= : The dead letter ROUTING KEY} - {--messageTtl= : The dead letter MESSAGE TTL}'; + {--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter}'; /** * The console command description. From ad58b63a3df09bd28345dca41d53500aaeeb4370 Mon Sep 17 00:00:00 2001 From: Kinane Date: Wed, 11 Jan 2017 17:42:06 +0200 Subject: [PATCH 19/41] update readme --- README.md | 54 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a854df8..ff98fe5 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,9 @@ class AuthorHandler { if($e instanceof InvalidInputException) { $this->consumer->rejectMessage($msg); } elseif($e instanceof WhatEverException) { - $this->consumer->ackMessage(); + $this->consumer->ackMessage($msg); + } elseif($e instanceof WhatElseException) { + $this->consumer->nackMessage($msg); } } @@ -76,6 +78,18 @@ class AuthorHandler { } ``` +If you wish to handle a message based on the routing key it was published with, you can use a switch case in the handler's `handle` method, like so: + +```php +public function handle($msg) +{ + switch ($msg->delivery_info['routing_key']) { + case 'key 1': //do something + case 'key 2': //do something else + } +} +``` + - Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging folder: ```php @@ -96,7 +110,43 @@ The previous command: 2. Create the `App\Messaging\Handlers\AnalyticsDataHandler.php` in `App\Messaging\Handler` directory. - Now in order to listen to any queue, run the following command from your console: -`php artisan bowler:consume`, you wil be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`) +`php artisan bowler:consume`, you will be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`) + +`bowler:consume` complete arguments list description: + +```php +bowler:consume +queueName : The queue NAME +--N|exchangeName= : The exchange NAME. Defaults to queueName +--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout +--K|bindingKeys=* : The consumer\'s BINDING KEYS (array) +--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0 +--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1 +--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0 +--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2 +--deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName +--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName +--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout +--deadLetterRoutingKey= : The dead letter ROUTING KEY +--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter +``` + +### Dead Lettering +#### Producer +Once a Producer is instantiated, if you wish to enable dead lettering you should: +```php +$producer = new Produer(...); + +$producer->configureDeadLettering($deadLetterQueueName, $deadLEtterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl); + +$producer->publish($body); +``` + +#### Consumer +Enabeling dead lettering on the consumer is done through the command line using the same command that run the consumer with the dedicated optional arguments, at least one of `--deadLetterQueueName` or `--deadLetterExchangeName` should be specified. +```php +php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTtl=10000 +``` ### Exception Handling Error Handling in Bowler is split into the application and queue domains. From 438ec0e7f7cdee152b618e91d9d2648a345b9e97 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 13:05:34 +0200 Subject: [PATCH 20/41] change to use [] array instead [null] --- src/Console/Commands/ConsumeCommand.php | 2 +- src/Consumer.php | 11 +++++++---- src/Producer.php | 13 +++++++++---- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 1936bf3..3fa95b0 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -62,7 +62,7 @@ public function handle() $exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name $exchangeType = $this->option('exchangeType'); - $bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; // If no bidingKeys are specified push a value of null so that we can still perform the loop + $bindingKeys = (array) $this->option('bindingKeys'); $passive = (bool) $this->option('passive'); $durable = (bool) $this->option('durable'); $autoDelete = (bool) $this->option('autoDelete'); diff --git a/src/Consumer.php b/src/Consumer.php index e672503..ae0760b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -104,7 +104,7 @@ class Consumer * @param bool $autoDelete * @param int $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', $bindingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', $bindingKeys = [], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; @@ -130,9 +130,12 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); - - foreach ($this->bindingKeys as $bindingKey) { - $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); + if(!empty($this->bindingKeys)) { + foreach ($this->bindingKeys as $bindingKey) { + $channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey); + } + } else { + $channel->queue_bind($this->queueName, $this->exchangeName); } echo " [*] Listening to Queue: ", $this->queueName, " To exit press CTRL+C", "\n"; diff --git a/src/Producer.php b/src/Producer.php index f180692..371cb89 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -103,7 +103,7 @@ class Producer * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null,array $routingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null,array $routingKeys = [], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; @@ -137,9 +137,14 @@ public function publish($data) $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); - foreach ($this->routingKeys as $routingKey) { - $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); - $channel->basic_publish($msg, $this->exchangeName, $routingKey); + if (!empty($this->routingKeys)) { + foreach ($this->routingKeys as $routingKey) { + $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); + $channel->basic_publish($msg, $this->exchangeName, $routingKey); + } + } else { + $channel->queue_bind($this->queueName, $this->exchangeName); + $channel->basic_publish($msg, $this->exchangeName); } echo " [x] Data Package Sent to ", $this->exchangeName, " Exchange!", "\n"; From 8ce886a31d4d9152c40d3fc83a6a0c4ceb99b420 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 14:19:26 +0200 Subject: [PATCH 21/41] minor improvements --- src/Console/Commands/ConsumeCommand.php | 10 +++++----- src/Traits/DeadLetteringTrait.php | 16 +++++++++------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/Console/Commands/ConsumeCommand.php b/src/Console/Commands/ConsumeCommand.php index 3fa95b0..ced8315 100644 --- a/src/Console/Commands/ConsumeCommand.php +++ b/src/Console/Commands/ConsumeCommand.php @@ -42,7 +42,7 @@ public function __construct(RegisterQueues $registerQueues) {--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName} {--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout} {--deadLetterRoutingKey= : The dead letter ROUTING KEY} - {--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter}'; + {--messageTTL= : If set, specifies how long, in milliseconds, before a message is declared dead letter}'; /** * The console command description. @@ -69,11 +69,11 @@ public function handle() $deliveryMode = (int) $this->option('deliveryMode'); // Dead Lettering - $deadLetterQueueName = ($qName = $this->option('deadLetterQueueName')) ? $qName : (($xName = $this->option('deadLetterExchangeName')) ? $xName : null); - $deadLetterExchangeName = ($xName = $this->option('deadLetterExchangeName')) ? $xName : (($qName = $this->option('deadLetterQueueName')) ? $qName : null); + $deadLetterQueueName = ($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : (($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : null); + $deadLetterExchangeName = ($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : (($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : null); $deadLetterExchangeType = $this->option('deadLetterExchangeType'); $deadLetterRoutingKey = $this->option('deadLetterRoutingKey'); - $messageTtl = (int) $this->option('messageTtl'); + $messageTTL = (int) $this->option('messageTTL'); require(app_path().'/Messaging/queues.php'); $handlers = Registrator::getHandlers(); @@ -82,7 +82,7 @@ public function handle() if ($handler->queueName == $queueName) { $bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete, $deliveryMode); if($deadLetterQueueName) { - $bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl); + $bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL); } $bowlerConsumer->listenToQueue($handler->className, app(ExceptionHandler::class)); } diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index 0bd34cd..40ea4a1 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -11,11 +11,11 @@ trait DeadLetteringTrait * @param string $deadLetterExchangeName * @param string $deadLetterExchangeType * @param string $deadLetterRoutingKey - * @param int $messageTtl + * @param int $messageTTL * * @return void */ - public function configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType = 'fanout', $deadLetterRoutingKey = null, $messageTtl = null) + public function configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType = 'fanout', $deadLetterRoutingKey = null, $messageTTL = null) { $channel = $this->connection->getChannel(); @@ -25,7 +25,7 @@ public function configureDeadLettering($deadLetterQueueName, $deadLetterExchange $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeName); - $this->compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl); + $this->compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTTL); } /** @@ -33,20 +33,22 @@ public function configureDeadLettering($deadLetterQueueName, $deadLetterExchange * * @param string $deadLetterExchangeName * @param string $deadLetterRoutingKey - * @param int $messageTtl + * @param int $messageTTL * * @return void */ - private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTtl) + private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey, $messageTTL) { + // 'S', Rabbitmq data type for long string $this->arguments['x-dead-letter-exchange'] = ['S', $deadLetterExchangeName]; if($deadLetterRoutingKey) { $this->Arguments['x-dead-letter-routing-key'] = ['S', $deadLetterRoutingKey]; } - if($messageTtl) { - $this->arguments['x-message-ttl'] = ['I', $messageTtl]; + if($messageTTL) { + // 'I', Rabbitmq data type for long int + $this->arguments['x-message-ttl'] = ['I', $messageTTL]; } } } From 273df2c0f15e8e650c7a85263033a9544105fc37 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 15:57:17 +0200 Subject: [PATCH 22/41] comment on why we declare queues in the producer --- src/Producer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Producer.php b/src/Producer.php index 371cb89..5271e30 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -133,6 +133,8 @@ public function publish($data) $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + // The exchange corresponding queue is declared here because we cannot afford loosing any messages if there were no consumers already running. By doing this we make sure that there always be a queue bound to this exchange. This results in an anti-pattern where the producer knows about the consumer identity. + // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue descralation and binding to this exchange, and leave as the consumer's responsability. $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); From 8df3fc408c46d3c7a964a47e4cb52451a4adba08 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 18:47:50 +0200 Subject: [PATCH 23/41] Move ack, nack, reject message in addition to new ones to trait So that both the producer and consumer can take advantage of. --- src/Traits/DeadLetteringTrait.php | 66 +++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index 40ea4a1..93d52c6 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -4,6 +4,72 @@ trait DeadLetteringTrait { + /** + * Acknowledge a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + */ + public function ackMessage($msg) + { + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0); + } + + /** + * Negatively acknowledge a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $multiple + * @param bool $requeue + */ + public function nackMessage($msg, $multiple = false, $requeue = false) + { + $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue); + } + + /** + * Reject a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $requeue + */ + public function rejectMessage($msg, $requeue = false) + { + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); + } + + /** + * Delete a exchange. + * + * @param string $exchangeName + * @param bool $unused + */ + public function deleteExchange($exchangeName, $unused) + { + $this->connection->getChannel()->exchange_delete($queueName, $unused, $empty); + } + + /** + * Delete a queue. + * + * @param string $queueName + * @param bool $unused + * @param bool $empty + */ + public function deleteQueue($queueName, $unused, $empty) + { + $this->connection->getChannel()->queue_delete($queueName, $unused, $empty); + } + + /** + * Purge a queue. + * + * @param string $queueName + */ + public function purgeQueue($queueName) + { + $this->connection->getChannel()->queue_purge($queueName); + } + /** * Configure Dead Lettering by creating a queue and exchange, and prepares the arguments array to be passed to the messaging queue. * From 77ef842d490e7ab698a175dd358fdcd4084da5af Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 18:49:00 +0200 Subject: [PATCH 24/41] add compileParameters() to trait So we can get the producer/consumer exchange/queue declaration and other params --- src/Traits/DeadLetteringTrait.php | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index 93d52c6..8c7fbd8 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -117,4 +117,26 @@ private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey $this->arguments['x-message-ttl'] = ['I', $messageTTL]; } } + + /** + * Compiles the parameters passed to the constructor. + * + * @return array + */ + private function compileParameters() + { + $params = [ + 'queue_name' => $this->queueName, + 'exchange_name' => $this->exchangeName, + 'exchange_type' => $this->exchangeType, + 'passive' => $this->passive, + 'durable' => $this->durable, + 'auto_delete' => $this->autoDelete, + 'delivery_mode' => $this->deliveryMode + ]; + + property_exists($this, 'routingKeys') ? ($params['routing_keys'] = $this->routingKeys) : ($params['binding_keys'] = $this->bindingKeys); + + return $params; + } } From 2a9de240b33c5b243734a3da81494534e0d7026f Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 18:49:26 +0200 Subject: [PATCH 25/41] add DeclarationMismatchException --- .../DeclarationMismatchException.php | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/Exceptions/DeclarationMismatchException.php diff --git a/src/Exceptions/DeclarationMismatchException.php b/src/Exceptions/DeclarationMismatchException.php new file mode 100644 index 0000000..e2ac2a9 --- /dev/null +++ b/src/Exceptions/DeclarationMismatchException.php @@ -0,0 +1,81 @@ + + */ +class DeclarationMismatchException extends Exception +{ + private $message; + private $code; + private $file; + private $line; + private $trace; + private $previous; + private $traceAsString; + private $parameters; + + // Dead lettering arguments + private $arguments; + + public function __construct($message, $code, $file, $line, $trace, $previous, $traceAsString, $parameters, $arguments = []) + { + $this->message = $message; + $this->code= $code; + $this->file = $file; + $this->line = $line; + $this->trace = $trace; + $this->previous = $previous; + $this->traceAsString = $traceAsString; + $this->parameters = $parameters; + $this->arguments = $arguments; + } + + public function getMessage() + { + return $this->message; + } + + public function getCode() + { + return $this->code; + } + + public function getFile() + { + return $this->file; + } + + public function getLine() + { + return $this->line; + } + + public function getTrace() + { + return $this->trace; + } + + public function getPrevious() + { + return $this->previous; + } + + public function getTraceAsString() + { + return $this->traceAsString; + } + + public function getParameters() + { + return $this->parameters; + } + + public function getArguments() + { + return $this->arguments; + } +} From a639236881d61381bb311d84cc83e432c7008339 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 18:50:15 +0200 Subject: [PATCH 26/41] wrap with try catch exchange and queue declarations --- src/Consumer.php | 42 ++++++------------------------- src/Producer.php | 14 ++++++++--- src/Traits/DeadLetteringTrait.php | 20 ++++++++++++--- 3 files changed, 34 insertions(+), 42 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index ae0760b..c4bc6dd 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,6 +4,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Vinelab\Bowler\Traits\DeadLetteringTrait; +use Vinelab\Bowler\Exceptions\DeclarationMismatchException; use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler; /** @@ -127,8 +128,12 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) { $channel = $this->connection->getChannel(); - $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + try { + $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + } catch (\Exception $e) { + throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), $this->compileParameters(), $this->arguments); + } if(!empty($this->bindingKeys)) { foreach ($this->bindingKeys as $bindingKey) { @@ -167,37 +172,4 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel->wait(); } } - - /** - * Acknowledge a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - */ - public function ackMessage($msg) - { - $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0); - } - - /** - * Negatively acknowledge a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - * @param bool $multiple - * @param bool $requeue - */ - public function nackMessage($msg, $multiple = false, $requeue = false) - { - $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue); - } - - /** - * Reject a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - * @param bool $requeue - */ - public function rejectMessage($msg, $requeue = false) - { - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); - } } diff --git a/src/Producer.php b/src/Producer.php index 5271e30..caaff58 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -4,6 +4,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Vinelab\Bowler\Traits\DeadLetteringTrait; +use Vinelab\Bowler\Exceptions\DeclarationMismatchException; /** * Bowler Producer @@ -131,11 +132,16 @@ public function publish($data) { $channel = $this->connection->getChannel(); - $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); + try { + $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - // The exchange corresponding queue is declared here because we cannot afford loosing any messages if there were no consumers already running. By doing this we make sure that there always be a queue bound to this exchange. This results in an anti-pattern where the producer knows about the consumer identity. - // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue descralation and binding to this exchange, and leave as the consumer's responsability. - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + // The exchange corresponding queue is declared here because we cannot afford loosing any messages if there were no consumers already running. By doing this we make sure that there always be a queue bound to this exchange. This results in an anti-pattern where the producer knows about the consumer identity. + // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue descralation and binding to this exchange, and leave as the consumer's responsability. + $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); + } catch (\Exception $e) { + throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), $this->compileParameters(), $this->arguments + ); + } $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index 8c7fbd8..f618fde 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -2,6 +2,8 @@ namespace Vinelab\Bowler\Traits; +use Vinelab\Bowler\Exceptions\DeclarationMismatchException; + trait DeadLetteringTrait { /** @@ -85,9 +87,21 @@ public function configureDeadLettering($deadLetterQueueName, $deadLetterExchange { $channel = $this->connection->getChannel(); - $channel->exchange_declare($deadLetterExchangeName, $deadLetterExchangeType, $this->passive, $this->durable, false, $this->autoDelete); - - $channel->queue_declare($deadLetterQueueName, $this->passive, $this->durable, false, $this->autoDelete); + try { + $channel->exchange_declare($deadLetterExchangeName, $deadLetterExchangeType, $this->passive, $this->durable, false, $this->autoDelete); + + $channel->queue_declare($deadLetterQueueName, $this->passive, $this->durable, false, $this->autoDelete); + } catch (\Exception $e) { + throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), + [ + 'dead_letter_queue_name' => $deadLetterQueueName, + 'dead_letter_exchange_name' => $deadLetterExchangeName, + 'dead_letter_exchange_type' => $deadLetterExchangeType, + 'dead_letter_routing_key' => $deadLetterRoutingKey, + 'message_ttl' => $messageTTL + ], + $this->arguments); + } $channel->queue_bind($deadLetterQueueName, $deadLetterExchangeName); From 2c4d03d4e49616931e385e5b4180643bfd1bbe6c Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 19:10:38 +0200 Subject: [PATCH 27/41] set exception properties to protected --- .../DeclarationMismatchException.php | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Exceptions/DeclarationMismatchException.php b/src/Exceptions/DeclarationMismatchException.php index e2ac2a9..68c3ef5 100644 --- a/src/Exceptions/DeclarationMismatchException.php +++ b/src/Exceptions/DeclarationMismatchException.php @@ -9,17 +9,17 @@ */ class DeclarationMismatchException extends Exception { - private $message; - private $code; - private $file; - private $line; - private $trace; - private $previous; - private $traceAsString; - private $parameters; + protected $message; + protected $code; + protected $file; + protected $line; + protected $trace; + protected $previous; + protected $traceAsString; + protected $parameters; // Dead lettering arguments - private $arguments; + protected $arguments; public function __construct($message, $code, $file, $line, $trace, $previous, $traceAsString, $parameters, $arguments = []) { From 11a722781283ff714a8aa7b123806b49549732a7 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 19:53:01 +0200 Subject: [PATCH 28/41] fix exception --- .../DeclarationMismatchException.php | 37 +------------------ 1 file changed, 1 insertion(+), 36 deletions(-) diff --git a/src/Exceptions/DeclarationMismatchException.php b/src/Exceptions/DeclarationMismatchException.php index 68c3ef5..89c0e25 100644 --- a/src/Exceptions/DeclarationMismatchException.php +++ b/src/Exceptions/DeclarationMismatchException.php @@ -21,7 +21,7 @@ class DeclarationMismatchException extends Exception // Dead lettering arguments protected $arguments; - public function __construct($message, $code, $file, $line, $trace, $previous, $traceAsString, $parameters, $arguments = []) + public function __construct($message, $code, $file, $line, $trace, $previous, $traceAsString, $parameters = [], $arguments = []) { $this->message = $message; $this->code= $code; @@ -34,41 +34,6 @@ public function __construct($message, $code, $file, $line, $trace, $previous, $t $this->arguments = $arguments; } - public function getMessage() - { - return $this->message; - } - - public function getCode() - { - return $this->code; - } - - public function getFile() - { - return $this->file; - } - - public function getLine() - { - return $this->line; - } - - public function getTrace() - { - return $this->trace; - } - - public function getPrevious() - { - return $this->previous; - } - - public function getTraceAsString() - { - return $this->traceAsString; - } - public function getParameters() { return $this->parameters; From 85af874f077f4e4077a335fb94db02b8849567a9 Mon Sep 17 00:00:00 2001 From: Kinane Date: Thu, 12 Jan 2017 19:53:35 +0200 Subject: [PATCH 29/41] unify variables name --- src/Traits/DeadLetteringTrait.php | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index f618fde..c858367 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -94,11 +94,11 @@ public function configureDeadLettering($deadLetterQueueName, $deadLetterExchange } catch (\Exception $e) { throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), [ - 'dead_letter_queue_name' => $deadLetterQueueName, - 'dead_letter_exchange_name' => $deadLetterExchangeName, - 'dead_letter_exchange_type' => $deadLetterExchangeType, - 'dead_letter_routing_key' => $deadLetterRoutingKey, - 'message_ttl' => $messageTTL + 'deadLetterQueueName' => $deadLetterQueueName, + 'deadLetterExchangeName' => $deadLetterExchangeName, + 'deadLetterExchangeEype' => $deadLetterExchangeType, + 'deadLetterRoutingKey' => $deadLetterRoutingKey, + 'messageTTL' => $messageTTL ], $this->arguments); } @@ -140,16 +140,16 @@ private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey private function compileParameters() { $params = [ - 'queue_name' => $this->queueName, - 'exchange_name' => $this->exchangeName, - 'exchange_type' => $this->exchangeType, + 'queueName' => $this->queueName, + 'exchangeName' => $this->exchangeName, + 'exchangeType' => $this->exchangeType, 'passive' => $this->passive, 'durable' => $this->durable, - 'auto_delete' => $this->autoDelete, - 'delivery_mode' => $this->deliveryMode + 'autoDelete' => $this->autoDelete, + 'deliveryMode' => $this->deliveryMode ]; - property_exists($this, 'routingKeys') ? ($params['routing_keys'] = $this->routingKeys) : ($params['binding_keys'] = $this->bindingKeys); + property_exists($this, 'routingKeys') ? ($params['routingKeys'] = $this->routingKeys) : ($params['bindingKeys'] = $this->bindingKeys); return $params; } From 6e3f226d3df327e29ea007d63bb9d40e535319c3 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 11:56:58 +0200 Subject: [PATCH 30/41] Move message actions back to consumer since these are specific to the consumer. --- src/Consumer.php | 33 +++++++++++++++++++++++++++++++ src/Traits/DeadLetteringTrait.php | 31 ----------------------------- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/src/Consumer.php b/src/Consumer.php index c4bc6dd..9b9dcb9 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -172,4 +172,37 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler) $channel->wait(); } } + + /** + * Acknowledge a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + */ + public function ackMessage($msg) + { + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0); + } + + /** + * Negatively acknowledge a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $multiple + * @param bool $requeue + */ + public function nackMessage($msg, $multiple = false, $requeue = false) + { + $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue); + } + + /** + * Reject a messasge. + * + * @param PhpAmqpLib\Message\AMQPMessage $msg + * @param bool $requeue + */ + public function rejectMessage($msg, $requeue = false) + { + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); + } } diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index c858367..bafd747 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -7,37 +7,6 @@ trait DeadLetteringTrait { /** - * Acknowledge a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - */ - public function ackMessage($msg) - { - $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0); - } - - /** - * Negatively acknowledge a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - * @param bool $multiple - * @param bool $requeue - */ - public function nackMessage($msg, $multiple = false, $requeue = false) - { - $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue); - } - - /** - * Reject a messasge. - * - * @param PhpAmqpLib\Message\AMQPMessage $msg - * @param bool $requeue - */ - public function rejectMessage($msg, $requeue = false) - { - $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); - } /** * Delete a exchange. From 9730f3252bb5171ebdd1351a3d418b7aa7b789c8 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 11:58:00 +0200 Subject: [PATCH 31/41] Move queue and exchange actions to a new AdminTrait Figure out a way to give the producer access to these functionalities --- src/Consumer.php | 2 ++ src/Traits/AdminTrait.php | 42 +++++++++++++++++++++++++++++++ src/Traits/DeadLetteringTrait.php | 35 -------------------------- 3 files changed, 44 insertions(+), 35 deletions(-) create mode 100644 src/Traits/AdminTrait.php diff --git a/src/Consumer.php b/src/Consumer.php index 9b9dcb9..ad2cf1b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\AdminTrait; use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Exceptions\DeclarationMismatchException; use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler; @@ -15,6 +16,7 @@ */ class Consumer { + use AdminTrait; use DeadLetteringTrait; /** diff --git a/src/Traits/AdminTrait.php b/src/Traits/AdminTrait.php new file mode 100644 index 0000000..06fe9b9 --- /dev/null +++ b/src/Traits/AdminTrait.php @@ -0,0 +1,42 @@ + + */ +trait AdminTrait +{ + /** + * Delete a exchange. + * + * @param string $exchangeName + * @param bool $unused + */ + public function deleteExchange($exchangeName, $unused = true) + { + $this->connection->getChannel()->exchange_delete($queueName, $unused, $empty); + } + + /** + * Delete a queue. + * + * @param string $queueName + * @param bool $unused + * @param bool $empty + */ + public function deleteQueue($queueName, $unused = true, $empty = true) + { + $this->connection->getChannel()->queue_delete($queueName, $unused, $empty); + } + + /** + * Purge a queue. + * + * @param string $queueName + */ + public function purgeQueue($queueName) + { + $this->connection->getChannel()->queue_purge($queueName); + } +} diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index bafd747..a53fce1 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -6,41 +6,6 @@ trait DeadLetteringTrait { - /** - - /** - * Delete a exchange. - * - * @param string $exchangeName - * @param bool $unused - */ - public function deleteExchange($exchangeName, $unused) - { - $this->connection->getChannel()->exchange_delete($queueName, $unused, $empty); - } - - /** - * Delete a queue. - * - * @param string $queueName - * @param bool $unused - * @param bool $empty - */ - public function deleteQueue($queueName, $unused, $empty) - { - $this->connection->getChannel()->queue_delete($queueName, $unused, $empty); - } - - /** - * Purge a queue. - * - * @param string $queueName - */ - public function purgeQueue($queueName) - { - $this->connection->getChannel()->queue_purge($queueName); - } - /** * Configure Dead Lettering by creating a queue and exchange, and prepares the arguments array to be passed to the messaging queue. * From 8e29578ddd073ab3e83da120b6e3001f5591d106 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 11:58:14 +0200 Subject: [PATCH 32/41] add author --- src/Traits/DeadLetteringTrait.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index a53fce1..ac5a951 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -4,6 +4,9 @@ use Vinelab\Bowler\Exceptions\DeclarationMismatchException; +/** + * @author Kinane Domloje + */ trait DeadLetteringTrait { /** From d57758a39d4cf2384dd7d023316463888550d691 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 13:17:30 +0200 Subject: [PATCH 33/41] move rabbitmq configuration from connection to service provider --- src/BowlerServiceProvider.php | 9 +++++++-- src/Connection.php | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/BowlerServiceProvider.php b/src/BowlerServiceProvider.php index f4364fd..968f834 100644 --- a/src/BowlerServiceProvider.php +++ b/src/BowlerServiceProvider.php @@ -21,8 +21,13 @@ public function register() return new RegisterQueues($app->make('Vinelab\Bowler\Connection')); }); - $this->app->bind(Connection::class, function () { - return new Connection(); + // Bind connection to env configuration + $rbmqHost = config('queue.connections.rabbitmq.host'); + $rbmqPort = config('queue.connections.rabbitmq.port'); + $rbmqUsername = config('queue.connections.rabbitmq.username'); + $rbmqPassword = config('queue.connections.rabbitmq.password'); + $this->app->bind(Connection::class, function () use($rbmqHost, $rbmqPort, $rbmtUsername, $rbmqPassword){ + return new Connection($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword); }); $this->app->bind( diff --git a/src/Connection.php b/src/Connection.php index 5cdebfb..1ee6051 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -13,6 +13,7 @@ * Connection * * @author Ali Issa + * @author Kinane Domloje */ class Connection { @@ -31,14 +32,14 @@ class Connection /** * * @param string $host the ip of the rabbitmq server, default: localhost - * @param integer $port. default: 5672 + * @param int $port. default: 5672 * @param string $username, default: guest * @param string $password, default: guest */ public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest') { + $this->connection = new AMQPStreamConnection($host, $port, $username, $password); - $this->connection = new AMQPStreamConnection(Config::get('queue.connections.rabbitmq.host', $host), Config::get('queue.connections.rabbitmq.port', $port), Config::get('queue.connections.rabbitmq.username', $username), Config::get('queue.connections.rabbitmq.password', $password)); $this->channel = $this->connection->channel(); } From d6e398d74c53fe4873e395c57926a6a625cd8682 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 13:17:59 +0200 Subject: [PATCH 34/41] move helper methods to HelperTrait --- src/Consumer.php | 2 ++ src/Producer.php | 2 ++ src/Traits/DeadLetteringTrait.php | 22 ---------------------- src/Traits/HelperTrait.php | 31 +++++++++++++++++++++++++++++++ 4 files changed, 35 insertions(+), 22 deletions(-) create mode 100644 src/Traits/HelperTrait.php diff --git a/src/Consumer.php b/src/Consumer.php index ad2cf1b..f44c233 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -4,6 +4,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Vinelab\Bowler\Traits\AdminTrait; +use Vinelab\Bowler\Traits\HelperTrait; use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Exceptions\DeclarationMismatchException; use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler; @@ -17,6 +18,7 @@ class Consumer { use AdminTrait; + use HelperTrait; use DeadLetteringTrait; /** diff --git a/src/Producer.php b/src/Producer.php index caaff58..640bb4e 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -3,6 +3,7 @@ namespace Vinelab\Bowler; use PhpAmqpLib\Message\AMQPMessage; +use Vinelab\Bowler\Traits\HelperTrait; use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Exceptions\DeclarationMismatchException; @@ -13,6 +14,7 @@ */ class Producer { + use HelperTrait; use DeadLetteringTrait; /** diff --git a/src/Traits/DeadLetteringTrait.php b/src/Traits/DeadLetteringTrait.php index ac5a951..62d9b22 100644 --- a/src/Traits/DeadLetteringTrait.php +++ b/src/Traits/DeadLetteringTrait.php @@ -68,26 +68,4 @@ private function compileArguments($deadLetterExchangeName, $deadLetterRoutingKey $this->arguments['x-message-ttl'] = ['I', $messageTTL]; } } - - /** - * Compiles the parameters passed to the constructor. - * - * @return array - */ - private function compileParameters() - { - $params = [ - 'queueName' => $this->queueName, - 'exchangeName' => $this->exchangeName, - 'exchangeType' => $this->exchangeType, - 'passive' => $this->passive, - 'durable' => $this->durable, - 'autoDelete' => $this->autoDelete, - 'deliveryMode' => $this->deliveryMode - ]; - - property_exists($this, 'routingKeys') ? ($params['routingKeys'] = $this->routingKeys) : ($params['bindingKeys'] = $this->bindingKeys); - - return $params; - } } diff --git a/src/Traits/HelperTrait.php b/src/Traits/HelperTrait.php new file mode 100644 index 0000000..c97ab4e --- /dev/null +++ b/src/Traits/HelperTrait.php @@ -0,0 +1,31 @@ + + */ +trait HelperTrait +{ + /** + * Compiles the parameters passed to the constructor. + * + * @return array + */ + private function compileParameters() + { + $params = [ + 'queueName' => $this->queueName, + 'exchangeName' => $this->exchangeName, + 'exchangeType' => $this->exchangeType, + 'passive' => $this->passive, + 'durable' => $this->durable, + 'autoDelete' => $this->autoDelete, + 'deliveryMode' => $this->deliveryMode + ]; + + property_exists($this, 'routingKeys') ? ($params['routingKeys'] = $this->routingKeys) : ($params['bindingKeys'] = $this->bindingKeys); + + return $params; + } +} From d814eb5d317369a36c629b540a1413a4a286e4f3 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 13:31:57 +0200 Subject: [PATCH 35/41] update readme --- README.md | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index ff98fe5..89c53a4 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ In order to configure rabbitmq host, port, username and password, add the follow ], ``` +And add `Vinelab\Bowler\BowlerServiceProvider::class` to the providers array in `config/app`. + ### Producer ```php @@ -34,14 +36,16 @@ In order to configure rabbitmq host, port, username and password, add the follow $connection = new Bowler\Connection(); // initialize a Producer object with a connection, exchange name and type $bowlerProducer = new Producer($connection, 'crud', 'fanout'); -return $bowlerProducer->publish($data); +$bowlerProducer->publish($data); ``` ### Consumer -- Modify config/app.php: - - add `Vinelab\Bowler\BowlerServiceProvider::class,` to the providers array. - - add `'Registrator' => Vinelab\Bowler\Facades\Registrator::class,` to the aliases array. +Add `'Registrator' => Vinelab\Bowler\Facades\Registrator::class,` to the aliases array in `config/app`. + +Create a handler where you can handle the received messages and bind the handler to its corresponding queue. + +You can do so either: ##### Manually - Create your handlers classes to handle the messages received: @@ -49,7 +53,7 @@ return $bowlerProducer->publish($data); ```php //this is an example handler class -namespace App\Messaging; +namespace App\Messaging\Handlers; class AuthorHandler { @@ -78,6 +82,8 @@ class AuthorHandler { } ``` +> Similarly to the above, additional functionality is also provided to the consumer's handler like `deleteExchange`, `purgeQueue` and `deleteQueue`. You these wisely and take advantage of the `unused` and `empty` parameters. + If you wish to handle a message based on the routing key it was published with, you can use a switch case in the handler's `handle` method, like so: ```php @@ -90,27 +96,29 @@ public function handle($msg) } ``` -- Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging folder: +- Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging directory: ```php -Registrator::queue('books', 'App\Messaging\BookHandler'); +Registrator::queue('books', 'App\Messaging\Handlers\BookHandler'); -Registrator::queue('crud', 'App\Messaging\AuthorHandler'); +Registrator::queue('crud', 'App\Messaging\Handlers\AuthorHandler'); ``` ##### Console -- Register a handler for a specific queue with `php artisan bowler:handler analytics_queue AnalyticsData`. +- Register a handler for a specific queue with `php artisan bowler:handler analytics_queue analytics_data_exchange`. The previous command: 1. Adds `Registrator::queue('analytics_queue', 'App\Messaging\Handlers\AnalyticsDataHandler');` to `App\Messaging\queues.php`. +> If no exchange name is provided the queue name will be used for both. + 2. Create the `App\Messaging\Handlers\AnalyticsDataHandler.php` in `App\Messaging\Handler` directory. - Now in order to listen to any queue, run the following command from your console: -`php artisan bowler:consume`, you will be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`) +`php artisan bowler:consume`, you will be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`). `bowler:consume` complete arguments list description: @@ -137,7 +145,7 @@ Once a Producer is instantiated, if you wish to enable dead lettering you should ```php $producer = new Produer(...); -$producer->configureDeadLettering($deadLetterQueueName, $deadLEtterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl); +$producer->configureDeadLettering($deadLetterQueueName, $deadLEtterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL); $producer->publish($body); ``` @@ -145,7 +153,7 @@ $producer->publish($body); #### Consumer Enabeling dead lettering on the consumer is done through the command line using the same command that run the consumer with the dedicated optional arguments, at least one of `--deadLetterQueueName` or `--deadLetterExchangeName` should be specified. ```php -php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTtl=10000 +php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000 ``` ### Exception Handling @@ -160,3 +168,15 @@ Bowler supports application level error reporting. To do so the default laravel exception handler normaly located in `app\Exceptions\Handler`, should implement `Vinelab\Bowler\Contracts\BowlerExceptionHandler`. And obviously, implement its methods. + +### Important Notes +1- It is of most importance that the users of this package, take onto their responsability the mapping between exchanges and queues. And to make sure that exchanges and queues declaration matching both on the producer and consumer side, otherwise a `Vinelab\Bowler\DeclarationMismatchException` is thrown. + +2- The reason behind queue declaration is happening on both the producer's and cosumer's side, is that for our use case, we could not afford to loose any published messages. And since publishing messages to an exchange with no bound queues will disregard them, we though it pertinent to declare queues on the producer side as well. We are aware that this might cause an anti-pattern, now that the producer is aware of the consumer. This may or may not remain as is. + +3- The use of nameless exchanges and queues is not supported in this package. Can be reconsidered later. + +## TODO +* Expressive queue declaration. +* Provide default pub/sub and dlx implementations. +* Provide a way to programatically handle configuration exceptions. From 9eca6083458231afadd019cea40c16d81f22d46a Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 14:07:40 +0200 Subject: [PATCH 36/41] fix typo --- src/BowlerServiceProvider.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/BowlerServiceProvider.php b/src/BowlerServiceProvider.php index 968f834..2dd76cb 100644 --- a/src/BowlerServiceProvider.php +++ b/src/BowlerServiceProvider.php @@ -26,7 +26,7 @@ public function register() $rbmqPort = config('queue.connections.rabbitmq.port'); $rbmqUsername = config('queue.connections.rabbitmq.username'); $rbmqPassword = config('queue.connections.rabbitmq.password'); - $this->app->bind(Connection::class, function () use($rbmqHost, $rbmqPort, $rbmtUsername, $rbmqPassword){ + $this->app->bind(Connection::class, function () use($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword){ return new Connection($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword); }); From bcd3f05e93ae44479d191cb4d03446da2c31f132 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 17:53:53 +0200 Subject: [PATCH 37/41] allow producer to only set one routing key on a message --- src/Producer.php | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/Producer.php b/src/Producer.php index 640bb4e..87e74c7 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -39,11 +39,11 @@ class Producer private $exchangeName; /** - * The routing keys used by the exchange to route messages to bounded queues. + * The routing key used by the exchange to route messages to bounded queues. * * @var string */ - private $routingKeys; + private $routingKey; /** * type of exchange: @@ -100,13 +100,13 @@ class Producer * @param string $queueName * @param string $exchangeName * @param string $exchangeType - * @param aray $routingKeys + * @param string $routingKey * @param boolean $passive * @param boolean $durable * @param boolean $autoDelete * @param integer $deliveryMode */ - public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null,array $routingKeys = [], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null, $routingKey = null, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; $this->queueName = $queueName; @@ -116,7 +116,7 @@ public function __construct(Connection $connection, $queueName, $exchangeType = } else { $this->exchangeName = $exchangeName; } - $this->routingKeys = $routingKeys; + $this->routingKey = $routingKey; $this->passive = $passive; $this->durable = $durable; $this->autoDelete = $autoDelete; @@ -138,7 +138,7 @@ public function publish($data) $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); // The exchange corresponding queue is declared here because we cannot afford loosing any messages if there were no consumers already running. By doing this we make sure that there always be a queue bound to this exchange. This results in an anti-pattern where the producer knows about the consumer identity. - // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue descralation and binding to this exchange, and leave as the consumer's responsability. + // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue decralation and binding to this exchange, and leave as the consumer's responsability. $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); } catch (\Exception $e) { throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), $this->compileParameters(), $this->arguments @@ -147,15 +147,8 @@ public function publish($data) $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); - if (!empty($this->routingKeys)) { - foreach ($this->routingKeys as $routingKey) { - $channel->queue_bind($this->queueName, $this->exchangeName, $routingKey); - $channel->basic_publish($msg, $this->exchangeName, $routingKey); - } - } else { - $channel->queue_bind($this->queueName, $this->exchangeName); - $channel->basic_publish($msg, $this->exchangeName); - } + $channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey); + $channel->basic_publish($msg, $this->exchangeName, $this->routingKey); echo " [x] Data Package Sent to ", $this->exchangeName, " Exchange!", "\n"; } From 83413cd585c777e9a8493e348bdae6b2dc09fdd4 Mon Sep 17 00:00:00 2001 From: Kinane Date: Fri, 13 Jan 2017 17:57:15 +0200 Subject: [PATCH 38/41] add `criticalDelivery` note --- src/Producer.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Producer.php b/src/Producer.php index 87e74c7..9a6ac9a 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -105,6 +105,8 @@ class Producer * @param boolean $durable * @param boolean $autoDelete * @param integer $deliveryMode + * + * @todo add a `criticalDelivery` param that allow the user to decide whether the producer should declare and bind the queue, or not. */ public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null, $routingKey = null, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { From 4084bf6c388386ce19e1c36f27f48cf68f0e1b1b Mon Sep 17 00:00:00 2001 From: Kinane Date: Mon, 16 Jan 2017 14:19:02 +0200 Subject: [PATCH 39/41] revoke producer's capability to declare and bind queues And thus, setting up dead lettering as well --- src/Producer.php | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/src/Producer.php b/src/Producer.php index 9a6ac9a..69378b2 100644 --- a/src/Producer.php +++ b/src/Producer.php @@ -4,7 +4,6 @@ use PhpAmqpLib\Message\AMQPMessage; use Vinelab\Bowler\Traits\HelperTrait; -use Vinelab\Bowler\Traits\DeadLetteringTrait; use Vinelab\Bowler\Exceptions\DeclarationMismatchException; /** @@ -15,7 +14,6 @@ class Producer { use HelperTrait; - use DeadLetteringTrait; /** * The main class of the package where we define the channel and the connection @@ -24,13 +22,6 @@ class Producer */ private $connection; - /** - * The name of the queue bound to the exchange where the producer sends its messages. - * - * @var string - */ - private $queueName; - /** * The name of the exchange where the producer sends its messages to * @@ -95,9 +86,7 @@ class Producer private $arguments = []; /** - * * @param Vinelab\Bowler\Connection $connection - * @param string $queueName * @param string $exchangeName * @param string $exchangeType * @param string $routingKey @@ -105,19 +94,12 @@ class Producer * @param boolean $durable * @param boolean $autoDelete * @param integer $deliveryMode - * - * @todo add a `criticalDelivery` param that allow the user to decide whether the producer should declare and bind the queue, or not. */ - public function __construct(Connection $connection, $queueName, $exchangeType = 'fanout', $exchangeName = null, $routingKey = null, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) + public function __construct(Connection $connection, $exchangeName, $exchangeType = 'fanout', $routingKey = null, $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2) { $this->connection = $connection; - $this->queueName = $queueName; + $this->exchangeName = $exchangeName; $this->exchangeType = $exchangeType; - if(!$exchangeName) { - $this->exchangeName = $queueName; - } else { - $this->exchangeName = $exchangeName; - } $this->routingKey = $routingKey; $this->passive = $passive; $this->durable = $durable; @@ -138,10 +120,6 @@ public function publish($data) try { $channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete); - - // The exchange corresponding queue is declared here because we cannot afford loosing any messages if there were no consumers already running. By doing this we make sure that there always be a queue bound to this exchange. This results in an anti-pattern where the producer knows about the consumer identity. - // If loosing some messasges while the consumer is up and runing is no issue, we could disregard the queue decralation and binding to this exchange, and leave as the consumer's responsability. - $channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments); } catch (\Exception $e) { throw new DeclarationMismatchException($e->getMessage(), $e->getCode(), $e->getFile(), $e->getLine(), $e->getTrace(), $e->getPrevious(), $e->getTraceAsString(), $this->compileParameters(), $this->arguments ); @@ -149,7 +127,6 @@ public function publish($data) $msg = new AMQPMessage($data, ['delivery_mode' => $this->deliveryMode]); - $channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey); $channel->basic_publish($msg, $this->exchangeName, $this->routingKey); echo " [x] Data Package Sent to ", $this->exchangeName, " Exchange!", "\n"; From c9d6c2aa29034018471464a5238dec896ee71f42 Mon Sep 17 00:00:00 2001 From: Kinane Date: Mon, 16 Jan 2017 14:19:14 +0200 Subject: [PATCH 40/41] minor updates --- README.md | 21 ++++++--------------- src/Traits/HelperTrait.php | 2 +- src/handlerexample.php | 2 +- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 89c53a4..f96eac0 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ And add `Vinelab\Bowler\BowlerServiceProvider::class` to the providers array in $connection = new Bowler\Connection(); // initialize a Producer object with a connection, exchange name and type $bowlerProducer = new Producer($connection, 'crud', 'fanout'); +// publish a message $bowlerProducer->publish($data); ``` @@ -140,22 +141,14 @@ queueName : The queue NAME ``` ### Dead Lettering -#### Producer -Once a Producer is instantiated, if you wish to enable dead lettering you should: -```php -$producer = new Produer(...); - -$producer->configureDeadLettering($deadLetterQueueName, $deadLEtterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL); - -$producer->publish($body); -``` - -#### Consumer +Since dead lettering is solely the responsability of the consumer and part of it's queue configuration, the natural place to define one. Enabeling dead lettering on the consumer is done through the command line using the same command that run the consumer with the dedicated optional arguments, at least one of `--deadLetterQueueName` or `--deadLetterExchangeName` should be specified. ```php php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000 ``` +> If only one of the mentioned optional arguments are set, the second will default to the exact value as to the one you've just set. Leading to the same dlx and dlq name. + ### Exception Handling Error Handling in Bowler is split into the application and queue domains. * `ExceptionHandler::renderQueue($e, $msg)` allows you to render error as you wish. While providing the exception and the que message itsef for maximum flexibility. @@ -170,11 +163,9 @@ To do so the default laravel exception handler normaly located in `app\Exception And obviously, implement its methods. ### Important Notes -1- It is of most importance that the users of this package, take onto their responsability the mapping between exchanges and queues. And to make sure that exchanges and queues declaration matching both on the producer and consumer side, otherwise a `Vinelab\Bowler\DeclarationMismatchException` is thrown. - -2- The reason behind queue declaration is happening on both the producer's and cosumer's side, is that for our use case, we could not afford to loose any published messages. And since publishing messages to an exchange with no bound queues will disregard them, we though it pertinent to declare queues on the producer side as well. We are aware that this might cause an anti-pattern, now that the producer is aware of the consumer. This may or may not remain as is. +1- It is of most importance that the users of this package, take onto their responsability the mapping between exchanges and queues. And to make sure that exchanges declaration are matching both on the producer and consumer side, otherwise a `Vinelab\Bowler\DeclarationMismatchException` is thrown. -3- The use of nameless exchanges and queues is not supported in this package. Can be reconsidered later. +2- The use of nameless exchanges and queues is not supported in this package. Can be reconsidered later. ## TODO * Expressive queue declaration. diff --git a/src/Traits/HelperTrait.php b/src/Traits/HelperTrait.php index c97ab4e..3874459 100644 --- a/src/Traits/HelperTrait.php +++ b/src/Traits/HelperTrait.php @@ -15,7 +15,7 @@ trait HelperTrait private function compileParameters() { $params = [ - 'queueName' => $this->queueName, + 'queueName' => property_exists($this, 'queueName') ? $this->queueName : null, 'exchangeName' => $this->exchangeName, 'exchangeType' => $this->exchangeType, 'passive' => $this->passive, diff --git a/src/handlerexample.php b/src/handlerexample.php index 3406697..6a87698 100644 --- a/src/handlerexample.php +++ b/src/handlerexample.php @@ -2,4 +2,4 @@ Registrator::queue('books', 'BooksHandler', ['type' => 'fanout']); -Registrator::queue('books', 'App\Messaging\BooksHandler'); \ No newline at end of file +Registrator::queue('books', 'App\Messaging\Handlers\BooksHandler'); From 00dc1092848f9bd2ea0ae1fee8a2e70e49e3f3ce Mon Sep 17 00:00:00 2001 From: Kinane Date: Mon, 16 Jan 2017 14:30:46 +0200 Subject: [PATCH 41/41] fix typo --- src/Traits/HelperTrait.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Traits/HelperTrait.php b/src/Traits/HelperTrait.php index 3874459..16f2cf9 100644 --- a/src/Traits/HelperTrait.php +++ b/src/Traits/HelperTrait.php @@ -24,7 +24,7 @@ private function compileParameters() 'deliveryMode' => $this->deliveryMode ]; - property_exists($this, 'routingKeys') ? ($params['routingKeys'] = $this->routingKeys) : ($params['bindingKeys'] = $this->bindingKeys); + property_exists($this, 'routingKey') ? ($params['routingKey'] = $this->routingKey) : ($params['bindingKeys'] = $this->bindingKeys); return $params; }