Skip to content

Commit

Permalink
feat(amqp): make amqp v2 compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaudgirard committed Mar 1, 2024
1 parent ffe058e commit 9dd9d48
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 260 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ jobs:
matrix:
php-version: ['7.4', '8.0', '8.1' ]
symfony-version: ['^4.4', '^5.0']
amqp-version: ['1.11.0', '2.1.2']
fail-fast: false
steps:
- uses: actions/checkout@master
- uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
coverage: xdebug2
extensions: amqp
extensions: amqp-${{ matrix.amqp-version }}
- name: Install symfony version from matrix
env:
SYMFONY_VERSION: ${{ matrix.symfony-version }}
Expand Down
15 changes: 7 additions & 8 deletions src/AmqpBundle/Amqp/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public function __construct(\AMQPQueue $queue, array $queueOptions)
public function getMessage(int $flags = AMQP_AUTOACK): ?\AMQPEnvelope
{
$envelope = $this->call($this->queue, 'get', [$flags]);
$envelope = $envelope === false ? null : $envelope;

if ($this->eventDispatcher) {
$preRetrieveEvent = new PreRetrieveEvent($envelope);
Expand All @@ -49,41 +48,41 @@ public function getMessage(int $flags = AMQP_AUTOACK): ?\AMQPEnvelope
/**
* Acknowledge the receipt of a message.
*
* @param string $deliveryTag delivery tag of last message to ack
* @param int $deliveryTag delivery tag of last message to ack
* @param int $flags AMQP_MULTIPLE or AMQP_NOPARAM
*
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*/
public function ackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
public function ackMessage(int $deliveryTag, int $flags = AMQP_NOPARAM): void
{
if ($this->eventDispatcher) {
$ackEvent = new AckEvent($deliveryTag, $flags);

$this->eventDispatcher->dispatch($ackEvent,AckEvent::NAME);
}

return $this->call($this->queue, 'ack', [$deliveryTag, $flags]);
$this->call($this->queue, 'ack', [$deliveryTag, $flags]);
}

/**
* Mark a message as explicitly not acknowledged.
*
* @param string $deliveryTag delivery tag of last message to nack
* @param int $deliveryTag delivery tag of last message to nack
* @param int $flags AMQP_NOPARAM or AMQP_REQUEUE to requeue the message(s)
*
* @throws \AMQPConnectionException if the connection to the broker was lost
* @throws \AMQPChannelException if the channel is not open
*/
public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): bool
public function nackMessage(int $deliveryTag, int $flags = AMQP_NOPARAM): void
{
if ($this->eventDispatcher) {
$nackEvent = new NackEvent($deliveryTag, $flags);

$this->eventDispatcher->dispatch($nackEvent, NackEvent::NAME);
}

return $this->call($this->queue, 'nack', [$deliveryTag, $flags]);
$this->call($this->queue, 'nack', [$deliveryTag, $flags]);
}

/**
Expand All @@ -92,7 +91,7 @@ public function nackMessage(string $deliveryTag, int $flags = AMQP_NOPARAM): boo
* @throws \AMQPChannelException if the channel is not open
* @throws \AMQPConnectionException if the connection to the broker was lost
*/
public function purge(): bool
public function purge(): int
{
if ($this->eventDispatcher) {
$purgeEvent = new PurgeEvent($this->queue);
Expand Down
11 changes: 6 additions & 5 deletions src/AmqpBundle/Amqp/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(\AMQPExchange $exchange, array $exchangeOptions)
* timestamp, expiration, type or reply_to
* @param array $routingKeys If set, overrides the Producer 'routing_keys' for this message
*
* @return bool tRUE on success or FALSE on failure
* @return bool TRUE on success or throws on failure
*
* @throws \AMQPExchangeException on failure
* @throws \AMQPChannelException if the channel is not open
Expand Down Expand Up @@ -63,16 +63,17 @@ public function publishMessage(string $message, int $flags = AMQP_NOPARAM, array
}

if (!$routingKeys) {
return $this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]);
$this->call($this->exchange, 'publish', [$message, null, $flags, $attributes]);

return true;
}

// Publish the message for each routing keys
$success = true;
foreach ($routingKeys as $routingKey) {
$success &= $this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]);
$this->call($this->exchange, 'publish', [$message, $routingKey, $flags, $attributes]);
}

return (bool) $success;
return true;
}

public function getExchange(): \AMQPExchange
Expand Down
6 changes: 3 additions & 3 deletions src/AmqpBundle/Event/AckEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ class AckEvent extends Event
{
const NAME = 'amqp.ack';

private string $deliveryTag;
private int $deliveryTag;

private int $flags;

public function __construct(string $deliveryTag, int $flags)
public function __construct(int $deliveryTag, int $flags)
{
$this->deliveryTag = $deliveryTag;
$this->flags = $flags;
}

public function getDeliveryTag(): string
public function getDeliveryTag(): int
{
return $this->deliveryTag;
}
Expand Down
6 changes: 3 additions & 3 deletions src/AmqpBundle/Event/NackEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ class NackEvent extends Event
{
const NAME = 'amqp.nack';

private string $deliveryTag;
private int $deliveryTag;
private int $flags;

public function __construct(string $deliveryTag, int $flags)
public function __construct(int $deliveryTag, int $flags)
{
$this->deliveryTag = $deliveryTag;
$this->flags = $flags;
}

public function getDeliveryTag(): string
public function getDeliveryTag(): int
{
return $this->deliveryTag;
}
Expand Down
18 changes: 6 additions & 12 deletions src/AmqpBundle/Sandbox/NullConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,48 +10,42 @@ class NullConnection extends \AMQPConnection
/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
return true;
}

/**
* {@inheritdoc}
*/
public function pconnect()
public function pconnect(): void
{
return true;
}

/**
* {@inheritdoc}
*/
public function pdisconnect()
public function pdisconnect(): void
{
return true;
}

/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
return true;
}

/**
* {@inheritdoc}
*/
public function reconnect()
public function reconnect(): void
{
return true;
}

/**
* {@inheritdoc}
*/
public function preconnect()
public function preconnect(): void
{
return true;
}
}
Loading

0 comments on commit 9dd9d48

Please sign in to comment.