Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added possibility to send message attributes for amazon (snsqs, sqs) transports #116

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions EnvelopeItem/MessageAttributesStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace Enqueue\MessengerAdapter\EnvelopeItem;

use Enqueue\MessengerAdapter\Model\MessageAttributes;
use Interop\Queue\Message;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

final class MessageAttributesStamp implements NonSendableStampInterface
{
/** @var MessageAttributes */
private $attributes;

public function __construct(MessageAttributes $attributes)
{
$this->attributes = $attributes;
}

public function getAttributes(): MessageAttributes
{
return $this->attributes;
}
}
60 changes: 60 additions & 0 deletions Model/MessageAttributes.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace Enqueue\MessengerAdapter\Model;

final class MessageAttributes
{
/** @var array<string, string> */
private $attributes;

/** @param array<string, string> $attributes */
public function __construct(array $attributes)
{
if (empty($attributes)) {
throw new \InvalidArgumentException('MessageAttributes should have at least one attribute');
}
\array_walk($attributes, function (string $value, string $name) {
return $this->add($name, $value);
});
}

/** @return array<string, array<string, string>> */
public function toArray(): array
{
$result = array();
foreach ($this->attributes as $name => $value) {
$result[$name] = array(
'DataType' => 'String',
'StringValue' => $value,
);
}

return $result;
}

public static function merge(self ...$messageAttributes): self
{
return new self(
\array_merge(
...\array_map(static function (MessageAttributes $attributes) {
return $attributes->attributes;
}, $messageAttributes)
)
);
}

private function add(string $name, string $value): void
{
if (0 === \preg_match('/^[A-Za-z0-9-_.]{1,256}$/', $name)) {
throw new \InvalidArgumentException(
\sprintf(
'"%s" is invalid massage attribute name. See details here: %s',
$name,
'https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html'
)
);
}

$this->attributes[$name] = $value;
}
}
22 changes: 22 additions & 0 deletions QueueInteropTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
use Enqueue\MessengerAdapter\EnvelopeItem\InteropMessageStamp;
use Enqueue\MessengerAdapter\EnvelopeItem\MessageAttributesStamp;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
use Enqueue\MessengerAdapter\Exception\SendingMessageFailedException;
use Enqueue\MessengerAdapter\Model\MessageAttributes;
use Enqueue\SnsQs\SnsQsProducer;
use Interop\Queue\Consumer;
use Interop\Queue\Exception as InteropQueueException;
Expand Down Expand Up @@ -168,6 +170,8 @@ public function send(Envelope $envelope): Envelope
$producer->setTimeToLive($this->options['timeToLive']);
}

$this->addMessageAttributes($interopMessage, $envelope);

try {
$producer->send($topic, $interopMessage);
} catch (InteropQueueException $e) {
Expand Down Expand Up @@ -281,4 +285,22 @@ private function getConsumer(): Consumer

return $context->createConsumer($queue);
}

private function addMessageAttributes(Message $interopMessage, Envelope $envelope): void
{
if (!\method_exists($interopMessage, 'setMessageAttributes')) {
return;
}
if (empty($messageAttributesStamps = $envelope->all(MessageAttributesStamp::class))) {
return;
}

$messageAttributes = \array_map(
static function (MessageAttributesStamp $stamp) {
return $stamp->getAttributes();
},
$messageAttributesStamps
);
$interopMessage->setMessageAttributes(MessageAttributes::merge(...$messageAttributes)->toArray());
}
}
40 changes: 40 additions & 0 deletions Tests/Model/MessageAttributesTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace Enqueue\MessengerAdapter\Tests\Model;

use Enqueue\MessengerAdapter\Model\MessageAttributes;
use PHPUnit\Framework\TestCase;

class MessageAttributesTest extends TestCase
{
public function testToArray()
{
$messageAttributes = new MessageAttributes(array('attr1' => 'value1'));
$this->assertSame(
array(
'attr1' => array(
'DataType' => 'String',
'StringValue' => 'value1',
),
),
$messageAttributes->toArray()
);
}

public function testMerge()
{
$this->assertEquals(
new MessageAttributes(array('attr1' => 'value1', 'attr2' => 'value2')),
MessageAttributes::merge(
new MessageAttributes(array('attr1' => 'value1')),
new MessageAttributes(array('attr2' => 'value2'))
)
);
}

public function testThrowExceptionForInvalidAttribute()
{
$this->expectException(\InvalidArgumentException::class);
new MessageAttributes(array('invalidKey****' => 'value1'));
}
}
58 changes: 58 additions & 0 deletions Tests/QueueInteropTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
use Enqueue\MessengerAdapter\ContextManager;
use Enqueue\MessengerAdapter\EnvelopeItem\MessageAttributesStamp;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException;
use Enqueue\MessengerAdapter\Model\MessageAttributes;
use Enqueue\MessengerAdapter\QueueInteropTransport;
use Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage;
use Enqueue\SnsQs\SnsQsMessage;
use Enqueue\SnsQs\SnsQsProducer;
use Interop\Queue\Consumer;
use Interop\Queue\Context;
Expand Down Expand Up @@ -139,6 +142,61 @@ public function testSendWithoutTransportName()
$transport->send($envelope);
}

public function testSendSnsQsMessageWithMessageAttributes()
{
$topicName = 'topic';
$queueName = 'queue';
$message = new \stdClass();
$message->foo = 'bar';
$envelope = (new Envelope($message))
->with(new MessageAttributesStamp(
new MessageAttributes(array('foo' => 'bar', 'baz' => 'quux'))
));

$psrMessageProphecy = $this->prophesize(SnsQsMessage::class);
$psrMessageProphecy->setMessageAttributes(
array(
'foo' => array(
'DataType' => 'String',
'StringValue' => 'bar',
),
'baz' => array(
'DataType' => 'String',
'StringValue' => 'quux',
),
)
)->shouldBeCalled();
$psrMessage = $psrMessageProphecy->reveal();
$topicProphecy = $this->prophesize(Topic::class);
$topic = $topicProphecy->reveal();

$producerProphecy = $this->prophesize(Producer::class);
$producerProphecy->send($topic, $psrMessage)->shouldBeCalled();

$contextProphecy = $this->prophesize(Context::class);
$contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic);
$contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal());
$contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage);

$contextManagerProphecy = $this->prophesize(ContextManager::class);
$contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal());

$encoderProphecy = $this->prophesize(SerializerInterface::class);
$encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo'));

$transport = $this->getTransport(
$encoderProphecy->reveal(),
$contextManagerProphecy->reveal(),
array(
'topic' => array('name' => $topicName),
'queue' => array('name' => $queueName),
),
false
);

$transport->send($envelope);
}

public function testSendWithoutDebugWillNotVerifyTheInfrastructureForPerformanceReasons()
{
$transportName = 'transport';
Expand Down