Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dead lettering support #15

Merged
merged 42 commits into from
Jan 16, 2017
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a8bf177
Add fanout, direct and topic exchanges type support
KinaneD Jan 6, 2017
3c2d9ee
add `nack` consumer support
KinaneD Jan 6, 2017
48cdcf3
minor improvements
KinaneD Jan 6, 2017
93c80f3
fix typo
KinaneD Jan 10, 2017
9388a6c
add typehinting and set default value to routingKeys
KinaneD Jan 10, 2017
6c75803
improve command description
KinaneD Jan 10, 2017
de86ad2
add consume command options shortcuts
KinaneD Jan 11, 2017
6504890
add dead lettering command options
KinaneD Jan 11, 2017
4d19383
if dead lettering options are set, configure
KinaneD Jan 11, 2017
6284d37
implement dead lettering
KinaneD Jan 11, 2017
2b24bb3
add missing `;`
KinaneD Jan 11, 2017
6345623
remove command options shortcuts
KinaneD Jan 11, 2017
531393d
remove producer's construct typehinting
KinaneD Jan 11, 2017
7057ec7
set command options one letter shortucts
KinaneD Jan 11, 2017
6a8573e
add dead lettering command options
KinaneD Jan 11, 2017
4b25e8f
implement dead lettering
KinaneD Jan 11, 2017
7bacd9f
set default producer exchange type to fanout
KinaneD Jan 11, 2017
58f48bc
assure backward compatibility by setting a default to exchangeName
KinaneD Jan 11, 2017
71faf4e
improve messageTtl description
KinaneD Jan 11, 2017
ad58b63
update readme
KinaneD Jan 11, 2017
438ec0e
change to use [] array instead [null]
KinaneD Jan 12, 2017
8ce886a
minor improvements
KinaneD Jan 12, 2017
273df2c
comment on why we declare queues in the producer
KinaneD Jan 12, 2017
8df3fc4
Move ack, nack, reject message in addition to new ones to trait
KinaneD Jan 12, 2017
77ef842
add compileParameters() to trait
KinaneD Jan 12, 2017
2a9de24
add DeclarationMismatchException
KinaneD Jan 12, 2017
a639236
wrap with try catch exchange and queue declarations
KinaneD Jan 12, 2017
2c4d03d
set exception properties to protected
KinaneD Jan 12, 2017
11a7227
fix exception
KinaneD Jan 12, 2017
85af874
unify variables name
KinaneD Jan 12, 2017
6e3f226
Move message actions back to consumer
KinaneD Jan 13, 2017
9730f32
Move queue and exchange actions to a new AdminTrait
KinaneD Jan 13, 2017
8e29578
add author
KinaneD Jan 13, 2017
d57758a
move rabbitmq configuration from connection to service provider
KinaneD Jan 13, 2017
d6e398d
move helper methods to HelperTrait
KinaneD Jan 13, 2017
d814eb5
update readme
KinaneD Jan 13, 2017
9eca608
fix typo
KinaneD Jan 13, 2017
bcd3f05
allow producer to only set one routing key on a message
KinaneD Jan 13, 2017
83413cd
add `criticalDelivery` note
KinaneD Jan 13, 2017
4084bf6
revoke producer's capability to declare and bind queues
KinaneD Jan 16, 2017
c9d6c2a
minor updates
KinaneD Jan 16, 2017
00dc109
fix typo
KinaneD Jan 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ class AuthorHandler {
if($e instanceof InvalidInputException) {
$this->consumer->rejectMessage($msg);
} elseif($e instanceof WhatEverException) {
$this->consumer->ackMessage();
$this->consumer->ackMessage($msg);
} elseif($e instanceof WhatElseException) {
$this->consumer->nackMessage($msg);
}
}

Expand All @@ -76,6 +78,18 @@ class AuthorHandler {
}
```

If you wish to handle a message based on the routing key it was published with, you can use a switch case in the handler's `handle` method, like so:

```php
public function handle($msg)
{
switch ($msg->delivery_info['routing_key']) {
case 'key 1': //do something
case 'key 2': //do something else
}
}
```

- Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging folder:

```php
Expand All @@ -96,7 +110,43 @@ The previous command:
2. Create the `App\Messaging\Handlers\AnalyticsDataHandler.php` in `App\Messaging\Handler` directory.

- Now in order to listen to any queue, run the following command from your console:
`php artisan bowler:consume`, you wil be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`)
`php artisan bowler:consume`, you will be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`)

`bowler:consume` complete arguments list description:

```php
bowler:consume
queueName : The queue NAME
--N|exchangeName= : The exchange NAME. Defaults to queueName
--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout
--K|bindingKeys=* : The consumer\'s BINDING KEYS (array)
--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0
--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1
--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0
--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2
--deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName
--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName
--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout
--deadLetterRoutingKey= : The dead letter ROUTING KEY
--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter
```

### Dead Lettering
#### Producer
Once a Producer is instantiated, if you wish to enable dead lettering you should:
```php
$producer = new Produer(...);

$producer->configureDeadLettering($deadLetterQueueName, $deadLEtterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl);

$producer->publish($body);
```

#### Consumer
Enabeling dead lettering on the consumer is done through the command line using the same command that run the consumer with the dedicated optional arguments, at least one of `--deadLetterQueueName` or `--deadLetterExchangeName` should be specified.
```php
php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTtl=10000
```

### Exception Handling
Error Handling in Bowler is split into the application and queue domains.
Expand Down
41 changes: 35 additions & 6 deletions src/Console/Commands/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,32 @@ public function __construct(RegisterQueues $registerQueues)
$this->registerQueues = $registerQueues;
}


/**
* The console command name.
*
* @var string
*/
protected $signature = 'bowler:consume {queue}';
protected $signature = 'bowler:consume
{queueName : The queue NAME}
{--N|exchangeName= : The exchange NAME. Defaults to queueName}
{--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout}
{--K|bindingKeys=* : The consumer\'s BINDING KEYS (array)}
{--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0}
{--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1}
{--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0}
{--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2}
{--deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName}
{--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName}
{--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout}
{--deadLetterRoutingKey= : The dead letter ROUTING KEY}
{--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'register all consumers to their queues';
protected $description = 'Register a consumer to its queue';

/**
* Run the command.
Expand All @@ -46,18 +58,35 @@ public function __construct(RegisterQueues $registerQueues)
*/
public function handle()
{
$queueName = $this->argument('queue');
$queueName = $this->argument('queueName');

$exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name
$exchangeType = $this->option('exchangeType');
$bindingKeys = ($keys = $this->option('bindingKeys')) ? (array) $keys : [null]; // If no bidingKeys are specified push a value of null so that we can still perform the loop
$passive = (bool) $this->option('passive');
$durable = (bool) $this->option('durable');
$autoDelete = (bool) $this->option('autoDelete');
$deliveryMode = (int) $this->option('deliveryMode');

// Dead Lettering
$deadLetterQueueName = ($qName = $this->option('deadLetterQueueName')) ? $qName : (($xName = $this->option('deadLetterExchangeName')) ? $xName : null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename the var $xName

$deadLetterExchangeName = ($xName = $this->option('deadLetterExchangeName')) ? $xName : (($qName = $this->option('deadLetterQueueName')) ? $qName : null);
$deadLetterExchangeType = $this->option('deadLetterExchangeType');
$deadLetterRoutingKey = $this->option('deadLetterRoutingKey');
$messageTtl = (int) $this->option('messageTtl');

require(app_path().'/Messaging/queues.php');
$handlers = Registrator::getHandlers();

foreach ($handlers as $handler) {
if ($handler->queueName == $queueName) {
$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName);
$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete, $deliveryMode);
if($deadLetterQueueName) {
$bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTtl);
}
$bowlerConsumer->listenToQueue($handler->className, app(ExceptionHandler::class));
}
}

}

}
82 changes: 64 additions & 18 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Vinelab\Bowler;

use PhpAmqpLib\Message\AMQPMessage;
use Vinelab\Bowler\Traits\DeadLetteringTrait;
use Vinelab\Bowler\Contracts\BowlerExceptionHandler as ExceptionHandler;

/**
Expand All @@ -13,20 +14,36 @@
*/
class Consumer
{
use DeadLetteringTrait;

/**
* the main class of the package where we define the channel and the connection.
* The main class of the package where we define the channel and the connection.
*
* @var Vinelab\Bowler\Connection
*/
private $connection;

/**
* the name of the exchange where the producer sends its messages to.
* The name of the queue bound to the exchange where the producer sends its messages.
*
* @var string
*/
private $queueName;

/**
* The name of the exchange where the producer sends its messages to.
*
* @var string
*/
private $exchangeName;

/**
* The binding keys used by the exchange to route messages to bounded queues.
*
* @var string
*/
private $bindingKeys;

/**
* type of exchange:
* fanout: routes messages to all of the queues that are bound to it and the routing key is ignored.
Expand Down Expand Up @@ -69,22 +86,31 @@ class Consumer
*/
private $deliveryMode;

private $msgProcessor;
/**
* The arguments that should be added to the `queue_declare` statement for dead lettering
*
* @var array
*/
private $arguments = [];

/**
* @param Vinelab\Bowler\Connection $connection
* @param string $queueName
* @param string $exchangeName
* @param string $exchangeType
* @param array $bindingKeys
* @param bool $passive
* @param bool $durable
* @param bool $autoDelete
* @param int $deliveryMode
*/
public function __construct(Connection $connection, $exchangeName, $exchangeType = 'fanout', $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2)
public function __construct(Connection $connection, $queueName, $exchangeName, $exchangeType = 'fanout', $bindingKeys = [null], $passive = false, $durable = true, $autoDelete = false, $deliveryMode = 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$bindingKeys = [null] better be empty array, and do the check later

{
$this->connection = $connection;
$this->queueName = $queueName;
$this->exchangeName = $exchangeName;
$this->exchangeType = $exchangeType;
$this->bindingKeys = $bindingKeys;
$this->passive = $passive;
$this->durable = $durable;
$this->autoDelete = $autoDelete;
Expand All @@ -94,15 +120,22 @@ public function __construct(Connection $connection, $exchangeName, $exchangeType
/**
* consume a message from a specified exchange.
*
* @param string $data
* @param string $handlerClass
* @param Vinelab\Bowler\Contracts\BowlerExceptionHandler $exceptionHandler
*/
public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler)
{
$this->connection->getChannel()->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete);
list($queue_name) = $this->connection->getChannel()->queue_declare($this->exchangeName, $this->passive, $this->durable, false, $this->autoDelete);
$this->connection->getChannel()->queue_bind($queue_name, $this->exchangeName);
$channel = $this->connection->getChannel();

$channel->exchange_declare($this->exchangeName, $this->exchangeType, $this->passive, $this->durable, $this->autoDelete);
$channel->queue_declare($this->queueName, $this->passive, $this->durable, false, $this->autoDelete, false, $this->arguments);


echo ' [*] Listening to Queue: ' . $this->exchangeName . ' To exit press CTRL+C', "\n";
foreach ($this->bindingKeys as $bindingKey) {
$channel->queue_bind($this->queueName, $this->exchangeName, $bindingKey);
}

echo " [*] Listening to Queue: ", $this->queueName, " To exit press CTRL+C", "\n";

$handler = new $handlerClass;

Expand All @@ -124,31 +157,44 @@ public function listenToQueue($handlerClass, ExceptionHandler $exceptionHandler)
}
};

$this->connection->getChannel()->basic_qos(null, 1, null);
$this->connection->getChannel()->basic_consume($queue_name, '', false, false, false, false, $callback);
$channel->basic_qos(null, 1, null);
$channel->basic_consume($this->queueName, '', false, false, false, false, $callback);

while (count($this->connection->getChannel()->callbacks)) {
$this->connection->getChannel()->wait();
while (count($channel->callbacks)) {
$channel->wait();
}
}

/**
* acknowledge a messasge.
* Acknowledge a messasge.
*
* @param PhpAmqpLib\Message\AMQPMessage $msg
*/
public function ackMessage($msg)
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], 0);
}

/**
* Negatively acknowledge a messasge.
*
* @param PhpAmqpLib\Message\AMQPMessage $msg
* @param bool $multiple
* @param bool $requeue
*/
public function nackMessage($msg, $multiple = false, $requeue = false)
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], $multiple, $requeue);
}

/**
* reject a messasge.
* Reject a messasge.
*
* @param PhpAmqpLib\Message\AMQPMessage $msg
* @param bool $requeue
*/
public function rejectMessage($msg)
public function rejectMessage($msg, $requeue = false)
{
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue);
}
}
Loading