From 2aca422ccad2f91ecba123658cd500def4bddb65 Mon Sep 17 00:00:00 2001 From: Jordi Rejas Date: Wed, 16 Aug 2023 13:55:03 +0200 Subject: [PATCH] Add partition selector feature --- EnvelopeItem/PartitionStamp.php | 24 +++++++++++++++++++++++ QueueInteropTransport.php | 6 ++++++ Tests/EnvelopeItem/PartitionStampTest.php | 21 ++++++++++++++++++++ 3 files changed, 51 insertions(+) create mode 100644 EnvelopeItem/PartitionStamp.php create mode 100644 Tests/EnvelopeItem/PartitionStampTest.php diff --git a/EnvelopeItem/PartitionStamp.php b/EnvelopeItem/PartitionStamp.php new file mode 100644 index 0000000..55fa284 --- /dev/null +++ b/EnvelopeItem/PartitionStamp.php @@ -0,0 +1,24 @@ +partition; + } + + public function setPartition(int $partition): self + { + $this->partition = $partition; + + return $this; + } +} diff --git a/QueueInteropTransport.php b/QueueInteropTransport.php index ed8f9f9..adaba3a 100644 --- a/QueueInteropTransport.php +++ b/QueueInteropTransport.php @@ -15,6 +15,7 @@ use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp; +use Enqueue\MessengerAdapter\EnvelopeItem\PartitionStamp; use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException; use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException; @@ -168,6 +169,11 @@ public function send(Envelope $envelope): Envelope $producer->setTimeToLive($this->options['timeToLive']); } + $partitionStamp = $envelope->last(PartitionStamp::class); + if (method_exists($interopMessage, 'setPartition') && !is_null($partitionStamp)) { + $interopMessage->setPartition($partitionStamp->getPartition()); + } + try { $producer->send($topic, $interopMessage); } catch (InteropQueueException $e) { diff --git a/Tests/EnvelopeItem/PartitionStampTest.php b/Tests/EnvelopeItem/PartitionStampTest.php new file mode 100644 index 0000000..5748b8a --- /dev/null +++ b/Tests/EnvelopeItem/PartitionStampTest.php @@ -0,0 +1,21 @@ +setPartition(1); + $this->assertInstanceOf(PartitionStamp::class, $partitionStamp); + } + + public function testPartitionGetter() + { + $partitionStamp = (new PartitionStamp())->setPartition(1); + $this->assertEquals(1, $partitionStamp->getPartition()); + } +}