Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dead lettering support #15

Merged
merged 42 commits into from
Jan 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a8bf177
Add fanout, direct and topic exchanges type support
KinaneD Jan 6, 2017
3c2d9ee
add `nack` consumer support
KinaneD Jan 6, 2017
48cdcf3
minor improvements
KinaneD Jan 6, 2017
93c80f3
fix typo
KinaneD Jan 10, 2017
9388a6c
add typehinting and set default value to routingKeys
KinaneD Jan 10, 2017
6c75803
improve command description
KinaneD Jan 10, 2017
de86ad2
add consume command options shortcuts
KinaneD Jan 11, 2017
6504890
add dead lettering command options
KinaneD Jan 11, 2017
4d19383
if dead lettering options are set, configure
KinaneD Jan 11, 2017
6284d37
implement dead lettering
KinaneD Jan 11, 2017
2b24bb3
add missing `;`
KinaneD Jan 11, 2017
6345623
remove command options shortcuts
KinaneD Jan 11, 2017
531393d
remove producer's construct typehinting
KinaneD Jan 11, 2017
7057ec7
set command options one letter shortucts
KinaneD Jan 11, 2017
6a8573e
add dead lettering command options
KinaneD Jan 11, 2017
4b25e8f
implement dead lettering
KinaneD Jan 11, 2017
7bacd9f
set default producer exchange type to fanout
KinaneD Jan 11, 2017
58f48bc
assure backward compatibility by setting a default to exchangeName
KinaneD Jan 11, 2017
71faf4e
improve messageTtl description
KinaneD Jan 11, 2017
ad58b63
update readme
KinaneD Jan 11, 2017
438ec0e
change to use [] array instead [null]
KinaneD Jan 12, 2017
8ce886a
minor improvements
KinaneD Jan 12, 2017
273df2c
comment on why we declare queues in the producer
KinaneD Jan 12, 2017
8df3fc4
Move ack, nack, reject message in addition to new ones to trait
KinaneD Jan 12, 2017
77ef842
add compileParameters() to trait
KinaneD Jan 12, 2017
2a9de24
add DeclarationMismatchException
KinaneD Jan 12, 2017
a639236
wrap with try catch exchange and queue declarations
KinaneD Jan 12, 2017
2c4d03d
set exception properties to protected
KinaneD Jan 12, 2017
11a7227
fix exception
KinaneD Jan 12, 2017
85af874
unify variables name
KinaneD Jan 12, 2017
6e3f226
Move message actions back to consumer
KinaneD Jan 13, 2017
9730f32
Move queue and exchange actions to a new AdminTrait
KinaneD Jan 13, 2017
8e29578
add author
KinaneD Jan 13, 2017
d57758a
move rabbitmq configuration from connection to service provider
KinaneD Jan 13, 2017
d6e398d
move helper methods to HelperTrait
KinaneD Jan 13, 2017
d814eb5
update readme
KinaneD Jan 13, 2017
9eca608
fix typo
KinaneD Jan 13, 2017
bcd3f05
allow producer to only set one routing key on a message
KinaneD Jan 13, 2017
83413cd
add `criticalDelivery` note
KinaneD Jan 13, 2017
4084bf6
revoke producer's capability to declare and bind queues
KinaneD Jan 16, 2017
c9d6c2a
minor updates
KinaneD Jan 16, 2017
00dc109
fix typo
KinaneD Jan 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 72 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,34 @@ In order to configure rabbitmq host, port, username and password, add the follow
],
```

And add `Vinelab\Bowler\BowlerServiceProvider::class` to the providers array in `config/app`.

### Producer

```php
// initialize a Bowler object with the rabbitmq server ip and port
$connection = new Bowler\Connection();
// initialize a Producer object with a connection, exchange name and type
$bowlerProducer = new Producer($connection, 'crud', 'fanout');
return $bowlerProducer->publish($data);
// publish a message
$bowlerProducer->publish($data);
```

### Consumer

- Modify config/app.php:
- add `Vinelab\Bowler\BowlerServiceProvider::class,` to the providers array.
- add `'Registrator' => Vinelab\Bowler\Facades\Registrator::class,` to the aliases array.
Add `'Registrator' => Vinelab\Bowler\Facades\Registrator::class,` to the aliases array in `config/app`.

Create a handler where you can handle the received messages and bind the handler to its corresponding queue.

You can do so either:

##### Manually
- Create your handlers classes to handle the messages received:

```php
//this is an example handler class

namespace App\Messaging;
namespace App\Messaging\Handlers;

class AuthorHandler {

Expand All @@ -65,7 +70,9 @@ class AuthorHandler {
if($e instanceof InvalidInputException) {
$this->consumer->rejectMessage($msg);
} elseif($e instanceof WhatEverException) {
$this->consumer->ackMessage();
$this->consumer->ackMessage($msg);
} elseif($e instanceof WhatElseException) {
$this->consumer->nackMessage($msg);
}
}

Expand All @@ -76,27 +83,71 @@ class AuthorHandler {
}
```

- Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging folder:
> Similarly to the above, additional functionality is also provided to the consumer's handler like `deleteExchange`, `purgeQueue` and `deleteQueue`. You these wisely and take advantage of the `unused` and `empty` parameters.

If you wish to handle a message based on the routing key it was published with, you can use a switch case in the handler's `handle` method, like so:

```php
public function handle($msg)
{
switch ($msg->delivery_info['routing_key']) {
case 'key 1': //do something
case 'key 2': //do something else
}
}
```

- Add all your handlers inside the queues.php file (think about the queues file as the routes file from Laravel), note that the `queues.php` file should be under App\Messaging directory:

```php

Registrator::queue('books', 'App\Messaging\BookHandler');
Registrator::queue('books', 'App\Messaging\Handlers\BookHandler');

Registrator::queue('crud', 'App\Messaging\AuthorHandler');
Registrator::queue('crud', 'App\Messaging\Handlers\AuthorHandler');

```

##### Console
- Register a handler for a specific queue with `php artisan bowler:handler analytics_queue AnalyticsData`.
- Register a handler for a specific queue with `php artisan bowler:handler analytics_queue analytics_data_exchange`.

The previous command:

1. Adds `Registrator::queue('analytics_queue', 'App\Messaging\Handlers\AnalyticsDataHandler');` to `App\Messaging\queues.php`.

> If no exchange name is provided the queue name will be used for both.

2. Create the `App\Messaging\Handlers\AnalyticsDataHandler.php` in `App\Messaging\Handler` directory.

- Now in order to listen to any queue, run the following command from your console:
`php artisan bowler:consume`, you wil be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`)
`php artisan bowler:consume`, you will be asked to specify queue name (the queue name is the first parameter passed to `Registrator::queue`).

`bowler:consume` complete arguments list description:

```php
bowler:consume
queueName : The queue NAME
--N|exchangeName= : The exchange NAME. Defaults to queueName
--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout
--K|bindingKeys=* : The consumer\'s BINDING KEYS (array)
--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0
--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1
--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0
--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2
--deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName
--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName
--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout
--deadLetterRoutingKey= : The dead letter ROUTING KEY
--messageTtl= : If set, specifies how long, in milliseconds, before a message is declared dead letter
```

### Dead Lettering
Since dead lettering is solely the responsability of the consumer and part of it's queue configuration, the natural place to define one.
Enabeling dead lettering on the consumer is done through the command line using the same command that run the consumer with the dedicated optional arguments, at least one of `--deadLetterQueueName` or `--deadLetterExchangeName` should be specified.
```php
php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlx --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000
```

> If only one of the mentioned optional arguments are set, the second will default to the exact value as to the one you've just set. Leading to the same dlx and dlq name.

### Exception Handling
Error Handling in Bowler is split into the application and queue domains.
Expand All @@ -110,3 +161,13 @@ Bowler supports application level error reporting.

To do so the default laravel exception handler normaly located in `app\Exceptions\Handler`, should implement `Vinelab\Bowler\Contracts\BowlerExceptionHandler`.
And obviously, implement its methods.

### Important Notes
1- It is of most importance that the users of this package, take onto their responsability the mapping between exchanges and queues. And to make sure that exchanges declaration are matching both on the producer and consumer side, otherwise a `Vinelab\Bowler\DeclarationMismatchException` is thrown.

2- The use of nameless exchanges and queues is not supported in this package. Can be reconsidered later.

## TODO
* Expressive queue declaration.
* Provide default pub/sub and dlx implementations.
* Provide a way to programatically handle configuration exceptions.
9 changes: 7 additions & 2 deletions src/BowlerServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ public function register()
return new RegisterQueues($app->make('Vinelab\Bowler\Connection'));
});

$this->app->bind(Connection::class, function () {
return new Connection();
// Bind connection to env configuration
$rbmqHost = config('queue.connections.rabbitmq.host');
$rbmqPort = config('queue.connections.rabbitmq.port');
$rbmqUsername = config('queue.connections.rabbitmq.username');
$rbmqPassword = config('queue.connections.rabbitmq.password');
$this->app->bind(Connection::class, function () use($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword){
return new Connection($rbmqHost, $rbmqPort, $rbmqUsername, $rbmqPassword);
});

$this->app->bind(
Expand Down
5 changes: 3 additions & 2 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Connection
*
* @author Ali Issa <[email protected]>
* @author Kinane Domloje <[email protected]>
*/
class Connection
{
Expand All @@ -31,14 +32,14 @@ class Connection
/**
*
* @param string $host the ip of the rabbitmq server, default: localhost
* @param integer $port. default: 5672
* @param int $port. default: 5672
* @param string $username, default: guest
* @param string $password, default: guest
*/
public function __construct($host = 'localhost', $port = 5672, $username = 'guest', $password = 'guest')
{
$this->connection = new AMQPStreamConnection($host, $port, $username, $password);

$this->connection = new AMQPStreamConnection(Config::get('queue.connections.rabbitmq.host', $host), Config::get('queue.connections.rabbitmq.port', $port), Config::get('queue.connections.rabbitmq.username', $username), Config::get('queue.connections.rabbitmq.password', $password));
$this->channel = $this->connection->channel();
}

Expand Down
41 changes: 35 additions & 6 deletions src/Console/Commands/ConsumeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,32 @@ public function __construct(RegisterQueues $registerQueues)
$this->registerQueues = $registerQueues;
}


/**
* The console command name.
*
* @var string
*/
protected $signature = 'bowler:consume {queue}';
protected $signature = 'bowler:consume
{queueName : The queue NAME}
{--N|exchangeName= : The exchange NAME. Defaults to queueName}
{--T|exchangeType=fanout : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout}
{--K|bindingKeys=* : The consumer\'s BINDING KEYS (array)}
{--p|passive=0 : If set, the server will reply with Declare-Ok if the exchange and queue already exists with the same name, and raise an error if not. Defaults to 0}
{--d|durable=1 : Mark exchange and queue as DURABLE. Defaults to 1}
{--D|autoDelete=0 : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0}
{--M|deliveryMode=2 : The message DELIVERY MODE. Non-persistent 1 or persistent 2. Defaults to 2}
{--deadLetterQueueName= : The dead letter queue NAME. Defaults to deadLetterExchangeName}
{--deadLetterExchangeName= : The dead letter exchange NAME. Defaults to deadLetterQueueName}
{--deadLetterExchangeType=fanout : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout}
{--deadLetterRoutingKey= : The dead letter ROUTING KEY}
{--messageTTL= : If set, specifies how long, in milliseconds, before a message is declared dead letter}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'register all consumers to their queues';
protected $description = 'Register a consumer to its queue';

/**
* Run the command.
Expand All @@ -46,18 +58,35 @@ public function __construct(RegisterQueues $registerQueues)
*/
public function handle()
{
$queueName = $this->argument('queue');
$queueName = $this->argument('queueName');

$exchangeName = ($name = $this->option('exchangeName')) ? $name : $queueName; // If the exchange name has not been set, use the queue name
$exchangeType = $this->option('exchangeType');
$bindingKeys = (array) $this->option('bindingKeys');
$passive = (bool) $this->option('passive');
$durable = (bool) $this->option('durable');
$autoDelete = (bool) $this->option('autoDelete');
$deliveryMode = (int) $this->option('deliveryMode');

// Dead Lettering
$deadLetterQueueName = ($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : (($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : null);
$deadLetterExchangeName = ($dlExchangeName = $this->option('deadLetterExchangeName')) ? $dlExchangeName : (($dlQueueName = $this->option('deadLetterQueueName')) ? $dlQueueName : null);
$deadLetterExchangeType = $this->option('deadLetterExchangeType');
$deadLetterRoutingKey = $this->option('deadLetterRoutingKey');
$messageTTL = (int) $this->option('messageTTL');

require(app_path().'/Messaging/queues.php');
$handlers = Registrator::getHandlers();

foreach ($handlers as $handler) {
if ($handler->queueName == $queueName) {
$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName);
$bowlerConsumer = new Consumer(app(Connection::class), $handler->queueName, $exchangeName, $exchangeType, $bindingKeys, $passive, $durable, $autoDelete, $deliveryMode);
if($deadLetterQueueName) {
$bowlerConsumer->configureDeadLettering($deadLetterQueueName, $deadLetterExchangeName, $deadLetterExchangeType, $deadLetterRoutingKey, $messageTTL);
}
$bowlerConsumer->listenToQueue($handler->className, app(ExceptionHandler::class));
}
}

}

}
Loading