Skip to content

Commit

Permalink
FEATURE: Add support for DLX
Browse files Browse the repository at this point in the history
  • Loading branch information
johannessteu authored Jul 19, 2019
2 parents 8f2b3f3 + fec03a9 commit 74503b0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 9 deletions.
38 changes: 30 additions & 8 deletions Classes/Queue/RabbitQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Utility\ObjectAccess;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
Expand Down Expand Up @@ -44,6 +45,11 @@ class RabbitQueue implements QueueInterface
*/
protected $routingKey = '';

/**
* @var bool
*/
protected $useDLX = false;

/**
* @param mixed[] $options
*/
Expand Down Expand Up @@ -106,6 +112,8 @@ public function __construct(string $name, array $options = [])
$this->channel->queue_bind($this->name, $this->exchangeName, $this->routingKey);
}
}

$this->useDLX = $options['useDLX'] ?? false;
}

protected function connect(): void
Expand Down Expand Up @@ -161,19 +169,29 @@ public function release(string $messageId, array $options = []): void
*/
public function reQueueMessage(Message $message, array $releaseOptions): void
{
// Ack the current message
$this->channel->basic_ack($message->getIdentifier());

// requeue the message
$this->queue($message->getPayload(), $releaseOptions, $message->getNumberOfReleases() + 1);
if ($this->useDLX) {
// Use nack to move message to DLX
$this->channel->basic_nack($message->getIdentifier());
} else {
// Ack the current message
$this->channel->basic_ack($message->getIdentifier());

// requeue the message
$this->queue($message->getPayload(), $releaseOptions, $message->getNumberOfReleases() + 1);
}
}

/**
* @inheritdoc
*/
public function abort(string $messageId): void
{
$this->channel->basic_nack($messageId);
if ($this->useDLX) {
// basic_nack would move message to DLX, not actually removing it
$this->channel->basic_ack($messageId);
} else {
$this->channel->basic_nack($messageId);
}
}

/**
Expand Down Expand Up @@ -258,7 +276,7 @@ protected function queue(string $payload, array $options = [], int $numberOfRele

$headers = new AMQPTable($headerOptions);
$message->set('application_headers', $headers);
$this->channel->basic_publish($message, $this->exchangeName, $this->routingKey !== '' ? $this->routingKey : $this->name);
$this->channel->basic_publish($message, $this->exchangeName, $this->routingKey);
return $correlationIdentifier;
}

Expand All @@ -272,7 +290,11 @@ protected function dequeue(bool $ack = true, ?int $timeout = null): ?Message

/** @var AMQPTable $applicationHeader */
$applicationHeader = $message->get('application_headers')->getNativeData();
$numberOfReleases = $applicationHeader['x-numberOfReleases'] ?? 0;
if ($this->useDLX) {
$numberOfReleases = ObjectAccess::getPropertyPath($applicationHeader, 'x-death.0.count') ?? 0;
} else {
$numberOfReleases = $applicationHeader['x-numberOfReleases'] ?? 0;
}

if ($ack) {
$this->channel->basic_ack($deliveryTag);
Expand Down
3 changes: 2 additions & 1 deletion Configuration/Settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Neos:
# delay: 5
#
# options:
# routingKey: '' # for advanced exchange configuration
# routingKey: '' # for advanced exchange configuration
# useDLX: false # support for dead letter exchange
#
# # Options that are used to configure a que
# queueOptions:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Flowpack:
options:
routingKey: ''
useDLX: true # enable support for dead letter exchange, requires configuration in RabbitMQ
queueOptions:
# don't declare a queue by default
Expand Down

0 comments on commit 74503b0

Please sign in to comment.