From e9c11b87f2a13c38fdb583c723584fead74eb94a Mon Sep 17 00:00:00 2001 From: Lars Lauger Date: Mon, 23 Dec 2019 15:36:49 +0100 Subject: [PATCH] FEATURE: Allow configuration of rabbit queue name --- Classes/Queue/RabbitQueue.php | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/Classes/Queue/RabbitQueue.php b/Classes/Queue/RabbitQueue.php index 99d2869..2a989d7 100644 --- a/Classes/Queue/RabbitQueue.php +++ b/Classes/Queue/RabbitQueue.php @@ -35,6 +35,11 @@ class RabbitQueue implements QueueInterface */ protected $channel; + /** + * @var string + */ + protected $queueName = ''; + /** * @var string */ @@ -95,6 +100,7 @@ public function __construct(string $name, array $options = []) // declare queue $queueOptions = $options['queueOptions']; + $this->queueName = $queueOptions['name'] ?? $this->name; $passive = isset($queueOptions['passive']) ? (bool) $queueOptions['passive'] : false; $durable = isset($queueOptions['durable']) ? (bool) $queueOptions['durable'] : false; $exclusive = isset($queueOptions['exclusive']) ? (bool) $queueOptions['exclusive'] : false; @@ -105,11 +111,11 @@ public function __construct(string $name, array $options = []) $this->routingKey = $options['routingKey'] ?? ''; if (isset($queueOptions['declare']) ? (bool) $queueOptions['declare'] : true) { - $this->channel->queue_declare($this->name, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments); + $this->channel->queue_declare($this->queueName, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments); // bind the queue to an exchange if there is a specific set if ($this->exchangeName !== '') { - $this->channel->queue_bind($this->name, $this->exchangeName, $this->routingKey); + $this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey); } } @@ -218,7 +224,7 @@ public function peek(int $limit = 1): array */ public function count() { - return (int) $this->channel->queue_declare($this->name, true)[1]; + return (int) $this->channel->queue_declare($this->queueName, true)[1]; } public function setUp(): void @@ -246,7 +252,7 @@ public function countReserved(): int public function flush(): void { $this->connect(); - $this->channel->queue_purge($this->name); + $this->channel->queue_purge($this->queueName); } public function shutdownObject(): void @@ -285,7 +291,7 @@ protected function dequeue(bool $ack = true, ?int $timeout = null): ?Message $this->connect(); $cache = null; - $consumerTag = $this->channel->basic_consume($this->name, '', false, false, false, false, function (AMQPMessage $message) use (&$cache, $ack): void { + $consumerTag = $this->channel->basic_consume($this->queueName, '', false, false, false, false, function (AMQPMessage $message) use (&$cache, $ack): void { $deliveryTag = (string) $message->delivery_info['delivery_tag']; /** @var AMQPTable $applicationHeader */