Skip to content

Commit

Permalink
FEATURE: Allow configuration of rabbit queue name
Browse files Browse the repository at this point in the history
  • Loading branch information
paxuclus committed Dec 27, 2019
1 parent 74503b0 commit e9c11b8
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions Classes/Queue/RabbitQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ class RabbitQueue implements QueueInterface
*/
protected $channel;

/**
* @var string
*/
protected $queueName = '';

/**
* @var string
*/
Expand Down Expand Up @@ -95,6 +100,7 @@ public function __construct(string $name, array $options = [])
// declare queue
$queueOptions = $options['queueOptions'];

$this->queueName = $queueOptions['name'] ?? $this->name;
$passive = isset($queueOptions['passive']) ? (bool) $queueOptions['passive'] : false;
$durable = isset($queueOptions['durable']) ? (bool) $queueOptions['durable'] : false;
$exclusive = isset($queueOptions['exclusive']) ? (bool) $queueOptions['exclusive'] : false;
Expand All @@ -105,11 +111,11 @@ public function __construct(string $name, array $options = [])
$this->routingKey = $options['routingKey'] ?? '';

if (isset($queueOptions['declare']) ? (bool) $queueOptions['declare'] : true) {
$this->channel->queue_declare($this->name, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments);
$this->channel->queue_declare($this->queueName, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments);

// bind the queue to an exchange if there is a specific set
if ($this->exchangeName !== '') {
$this->channel->queue_bind($this->name, $this->exchangeName, $this->routingKey);
$this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routingKey);
}
}

Expand Down Expand Up @@ -218,7 +224,7 @@ public function peek(int $limit = 1): array
*/
public function count()
{
return (int) $this->channel->queue_declare($this->name, true)[1];
return (int) $this->channel->queue_declare($this->queueName, true)[1];
}

public function setUp(): void
Expand Down Expand Up @@ -246,7 +252,7 @@ public function countReserved(): int
public function flush(): void
{
$this->connect();
$this->channel->queue_purge($this->name);
$this->channel->queue_purge($this->queueName);
}

public function shutdownObject(): void
Expand Down Expand Up @@ -285,7 +291,7 @@ protected function dequeue(bool $ack = true, ?int $timeout = null): ?Message
$this->connect();

$cache = null;
$consumerTag = $this->channel->basic_consume($this->name, '', false, false, false, false, function (AMQPMessage $message) use (&$cache, $ack): void {
$consumerTag = $this->channel->basic_consume($this->queueName, '', false, false, false, false, function (AMQPMessage $message) use (&$cache, $ack): void {
$deliveryTag = (string) $message->delivery_info['delivery_tag'];

/** @var AMQPTable $applicationHeader */
Expand Down

0 comments on commit e9c11b8

Please sign in to comment.