From 28d8b90d8cd9fc909e4e88c94bba7ad89e635f85 Mon Sep 17 00:00:00 2001 From: Alexander Onatskiy Date: Mon, 13 Jun 2022 21:14:59 +0300 Subject: [PATCH] added ability to send message attributes for amazon (snsqs, sqs) transports --- EnvelopeItem/MessageAttributesStamp.php | 23 ++++++++++ Model/MessageAttributes.php | 60 +++++++++++++++++++++++++ QueueInteropTransport.php | 22 +++++++++ Tests/Model/MessageAttributesTest.php | 40 +++++++++++++++++ Tests/QueueInteropTransportTest.php | 58 ++++++++++++++++++++++++ 5 files changed, 203 insertions(+) create mode 100644 EnvelopeItem/MessageAttributesStamp.php create mode 100644 Model/MessageAttributes.php create mode 100644 Tests/Model/MessageAttributesTest.php diff --git a/EnvelopeItem/MessageAttributesStamp.php b/EnvelopeItem/MessageAttributesStamp.php new file mode 100644 index 0000000..12fef6d --- /dev/null +++ b/EnvelopeItem/MessageAttributesStamp.php @@ -0,0 +1,23 @@ +attributes = $attributes; + } + + public function getAttributes(): MessageAttributes + { + return $this->attributes; + } +} diff --git a/Model/MessageAttributes.php b/Model/MessageAttributes.php new file mode 100644 index 0000000..6d5059f --- /dev/null +++ b/Model/MessageAttributes.php @@ -0,0 +1,60 @@ + */ + private $attributes; + + /** @param array $attributes */ + public function __construct(array $attributes) + { + if (empty($attributes)) { + throw new \InvalidArgumentException('MessageAttributes should have at least one attribute'); + } + \array_walk($attributes, function (string $value, string $name) { + return $this->add($name, $value); + }); + } + + /** @return array> */ + public function toArray(): array + { + $result = array(); + foreach ($this->attributes as $name => $value) { + $result[$name] = array( + 'DataType' => 'String', + 'StringValue' => $value, + ); + } + + return $result; + } + + public static function merge(self ...$messageAttributes): self + { + return new self( + \array_merge( + ...\array_map(static function (MessageAttributes $attributes) { + return $attributes->attributes; + }, $messageAttributes) + ) + ); + } + + private function add(string $name, string $value): void + { + if (0 === \preg_match('/^[A-Za-z0-9-_.]{1,256}$/', $name)) { + throw new \InvalidArgumentException( + \sprintf( + '"%s" is invalid massage attribute name. See details here: %s', + $name, + 'https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html' + ) + ); + } + + $this->attributes[$name] = $value; + } +} diff --git a/QueueInteropTransport.php b/QueueInteropTransport.php index ed8f9f9..e4685ad 100644 --- a/QueueInteropTransport.php +++ b/QueueInteropTransport.php @@ -15,9 +15,11 @@ use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy; use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp; +use Enqueue\MessengerAdapter\EnvelopeItem\MessageAttributesStamp; use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException; use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException; +use Enqueue\MessengerAdapter\Model\MessageAttributes; use Enqueue\SnsQs\SnsQsProducer; use Interop\Queue\Consumer; use Interop\Queue\Exception as InteropQueueException; @@ -168,6 +170,8 @@ public function send(Envelope $envelope): Envelope $producer->setTimeToLive($this->options['timeToLive']); } + $this->addMessageAttributes($interopMessage, $envelope); + try { $producer->send($topic, $interopMessage); } catch (InteropQueueException $e) { @@ -281,4 +285,22 @@ private function getConsumer(): Consumer return $context->createConsumer($queue); } + + private function addMessageAttributes(Message $interopMessage, Envelope $envelope): void + { + if (!\method_exists($interopMessage, 'setMessageAttributes')) { + return; + } + if (empty($messageAttributesStamps = $envelope->all(MessageAttributesStamp::class))) { + return; + } + + $messageAttributes = \array_map( + static function (MessageAttributesStamp $stamp) { + return $stamp->getAttributes(); + }, + $messageAttributesStamps + ); + $interopMessage->setMessageAttributes(MessageAttributes::merge(...$messageAttributes)->toArray()); + } } diff --git a/Tests/Model/MessageAttributesTest.php b/Tests/Model/MessageAttributesTest.php new file mode 100644 index 0000000..ede97f4 --- /dev/null +++ b/Tests/Model/MessageAttributesTest.php @@ -0,0 +1,40 @@ + 'value1')); + $this->assertSame( + array( + 'attr1' => array( + 'DataType' => 'String', + 'StringValue' => 'value1', + ), + ), + $messageAttributes->toArray() + ); + } + + public function testMerge() + { + $this->assertEquals( + new MessageAttributes(array('attr1' => 'value1', 'attr2' => 'value2')), + MessageAttributes::merge( + new MessageAttributes(array('attr1' => 'value1')), + new MessageAttributes(array('attr2' => 'value2')) + ) + ); + } + + public function testThrowExceptionForInvalidAttribute() + { + $this->expectException(\InvalidArgumentException::class); + new MessageAttributes(array('invalidKey****' => 'value1')); + } +} diff --git a/Tests/QueueInteropTransportTest.php b/Tests/QueueInteropTransportTest.php index 6500c7f..f8ac0bf 100644 --- a/Tests/QueueInteropTransportTest.php +++ b/Tests/QueueInteropTransportTest.php @@ -14,10 +14,13 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; use Enqueue\MessengerAdapter\ContextManager; +use Enqueue\MessengerAdapter\EnvelopeItem\MessageAttributesStamp; use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration; use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException; +use Enqueue\MessengerAdapter\Model\MessageAttributes; use Enqueue\MessengerAdapter\QueueInteropTransport; use Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage; +use Enqueue\SnsQs\SnsQsMessage; use Enqueue\SnsQs\SnsQsProducer; use Interop\Queue\Consumer; use Interop\Queue\Context; @@ -139,6 +142,61 @@ public function testSendWithoutTransportName() $transport->send($envelope); } + public function testSendSnsQsMessageWithMessageAttributes() + { + $topicName = 'topic'; + $queueName = 'queue'; + $message = new \stdClass(); + $message->foo = 'bar'; + $envelope = (new Envelope($message)) + ->with(new MessageAttributesStamp( + new MessageAttributes(array('foo' => 'bar', 'baz' => 'quux')) + )); + + $psrMessageProphecy = $this->prophesize(SnsQsMessage::class); + $psrMessageProphecy->setMessageAttributes( + array( + 'foo' => array( + 'DataType' => 'String', + 'StringValue' => 'bar', + ), + 'baz' => array( + 'DataType' => 'String', + 'StringValue' => 'quux', + ), + ) + )->shouldBeCalled(); + $psrMessage = $psrMessageProphecy->reveal(); + $topicProphecy = $this->prophesize(Topic::class); + $topic = $topicProphecy->reveal(); + + $producerProphecy = $this->prophesize(Producer::class); + $producerProphecy->send($topic, $psrMessage)->shouldBeCalled(); + + $contextProphecy = $this->prophesize(Context::class); + $contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic); + $contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal()); + $contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage); + + $contextManagerProphecy = $this->prophesize(ContextManager::class); + $contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal()); + + $encoderProphecy = $this->prophesize(SerializerInterface::class); + $encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo')); + + $transport = $this->getTransport( + $encoderProphecy->reveal(), + $contextManagerProphecy->reveal(), + array( + 'topic' => array('name' => $topicName), + 'queue' => array('name' => $queueName), + ), + false + ); + + $transport->send($envelope); + } + public function testSendWithoutDebugWillNotVerifyTheInfrastructureForPerformanceReasons() { $transportName = 'transport';