From aebe3e7d2ec7c9393e3d8bb1c81d86598d16dbb5 Mon Sep 17 00:00:00 2001 From: Lars Lauger <lars.lauger@netlogix.de> Date: Thu, 18 Jul 2019 18:29:06 +0200 Subject: [PATCH 1/2] FEATURE: Add support for DLX --- Classes/Queue/RabbitQueue.php | 38 +++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/Classes/Queue/RabbitQueue.php b/Classes/Queue/RabbitQueue.php index a4f1e94..99d2869 100644 --- a/Classes/Queue/RabbitQueue.php +++ b/Classes/Queue/RabbitQueue.php @@ -7,6 +7,7 @@ use Flowpack\JobQueue\Common\Exception as JobQueueException; use Flowpack\JobQueue\Common\Queue\Message; use Flowpack\JobQueue\Common\Queue\QueueInterface; +use Neos\Utility\ObjectAccess; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; @@ -44,6 +45,11 @@ class RabbitQueue implements QueueInterface */ protected $routingKey = ''; + /** + * @var bool + */ + protected $useDLX = false; + /** * @param mixed[] $options */ @@ -106,6 +112,8 @@ public function __construct(string $name, array $options = []) $this->channel->queue_bind($this->name, $this->exchangeName, $this->routingKey); } } + + $this->useDLX = $options['useDLX'] ?? false; } protected function connect(): void @@ -161,11 +169,16 @@ public function release(string $messageId, array $options = []): void */ public function reQueueMessage(Message $message, array $releaseOptions): void { - // Ack the current message - $this->channel->basic_ack($message->getIdentifier()); - - // requeue the message - $this->queue($message->getPayload(), $releaseOptions, $message->getNumberOfReleases() + 1); + if ($this->useDLX) { + // Use nack to move message to DLX + $this->channel->basic_nack($message->getIdentifier()); + } else { + // Ack the current message + $this->channel->basic_ack($message->getIdentifier()); + + // requeue the message + $this->queue($message->getPayload(), $releaseOptions, $message->getNumberOfReleases() + 1); + } } /** @@ -173,7 +186,12 @@ public function reQueueMessage(Message $message, array $releaseOptions): void */ public function abort(string $messageId): void { - $this->channel->basic_nack($messageId); + if ($this->useDLX) { + // basic_nack would move message to DLX, not actually removing it + $this->channel->basic_ack($messageId); + } else { + $this->channel->basic_nack($messageId); + } } /** @@ -258,7 +276,7 @@ protected function queue(string $payload, array $options = [], int $numberOfRele $headers = new AMQPTable($headerOptions); $message->set('application_headers', $headers); - $this->channel->basic_publish($message, $this->exchangeName, $this->routingKey !== '' ? $this->routingKey : $this->name); + $this->channel->basic_publish($message, $this->exchangeName, $this->routingKey); return $correlationIdentifier; } @@ -272,7 +290,11 @@ protected function dequeue(bool $ack = true, ?int $timeout = null): ?Message /** @var AMQPTable $applicationHeader */ $applicationHeader = $message->get('application_headers')->getNativeData(); - $numberOfReleases = $applicationHeader['x-numberOfReleases'] ?? 0; + if ($this->useDLX) { + $numberOfReleases = ObjectAccess::getPropertyPath($applicationHeader, 'x-death.0.count') ?? 0; + } else { + $numberOfReleases = $applicationHeader['x-numberOfReleases'] ?? 0; + } if ($ack) { $this->channel->basic_ack($deliveryTag); From fec03a927a1ccc27aeec88f317102b116ff0be59 Mon Sep 17 00:00:00 2001 From: Lars Lauger <lars.lauger@netlogix.de> Date: Fri, 19 Jul 2019 12:04:23 +0200 Subject: [PATCH 2/2] FEATURE: Add "useDLX" configuration option to documentation --- Configuration/Settings.yaml | 3 ++- README.md | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Configuration/Settings.yaml b/Configuration/Settings.yaml index 502888b..9d110e6 100644 --- a/Configuration/Settings.yaml +++ b/Configuration/Settings.yaml @@ -22,7 +22,8 @@ Neos: # delay: 5 # # options: -# routingKey: '' # for advanced exchange configuration +# routingKey: '' # for advanced exchange configuration +# useDLX: false # support for dead letter exchange # # # Options that are used to configure a que # queueOptions: diff --git a/README.md b/README.md index 0eb8c5d..09e85fb 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,7 @@ Flowpack: options: routingKey: '' + useDLX: true # enable support for dead letter exchange, requires configuration in RabbitMQ queueOptions: # don't declare a queue by default