From cd8b6f9ba047a19c616b7a78b6cb5b138b2417d5 Mon Sep 17 00:00:00 2001 From: valtzu Date: Sun, 25 Feb 2024 13:46:46 +0200 Subject: [PATCH] [Messenger] Extend SQS visibility timeout for messages that are still being processed --- CHANGELOG.md | 5 ++++ Tests/Transport/AmazonSqsIntegrationTest.php | 17 +++++++++++- Tests/Transport/AmazonSqsReceiverTest.php | 13 +++++++++ Tests/Transport/AmazonSqsTransportTest.php | 26 ++++++++++++++++++ Tests/Transport/ConnectionTest.php | 28 ++++++++++++++++++++ Transport/AmazonSqsReceiver.php | 13 +++++++-- Transport/AmazonSqsTransport.php | 11 +++++++- Transport/Connection.php | 17 ++++++++++++ composer.json | 2 +- 9 files changed, 127 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a8b1d1e..6e29f4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +7.2 +--- + + * Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying SQS that the job is still being processed, in order to avoid timeouts + 6.4 --- diff --git a/Tests/Transport/AmazonSqsIntegrationTest.php b/Tests/Transport/AmazonSqsIntegrationTest.php index 189a4e5..357cbdd 100644 --- a/Tests/Transport/AmazonSqsIntegrationTest.php +++ b/Tests/Transport/AmazonSqsIntegrationTest.php @@ -41,11 +41,12 @@ public function testConnectionSendAndGet() private function execute(string $dsn): void { - $connection = Connection::fromDsn($dsn, []); + $connection = Connection::fromDsn($dsn, ['visibility_timeout' => 1]); $connection->setup(); $this->clearSqs($dsn); $connection->send('{"message": "Hi"}', ['type' => DummyMessage::class, DummyMessage::class => 'special']); + $messageSentAt = microtime(true); $this->assertSame(1, $connection->getMessageCount()); $wait = 0; @@ -55,6 +56,20 @@ private function execute(string $dsn): void $this->assertEquals('{"message": "Hi"}', $encoded['body']); $this->assertEquals(['type' => DummyMessage::class, DummyMessage::class => 'special'], $encoded['headers']); + + $this->waitUntilElapsed(seconds: 1.0, since: $messageSentAt); + $connection->keepalive($encoded['id']); + $this->waitUntilElapsed(seconds: 2.0, since: $messageSentAt); + $this->assertSame(0, $connection->getMessageCount(), 'The queue should be empty since visibility timeout was extended'); + $connection->delete($encoded['id']); + } + + private function waitUntilElapsed(float $seconds, float $since): void + { + $waitTime = $seconds - (microtime(true) - $since); + if ($waitTime > 0) { + usleep((int) ($waitTime * 1e6)); + } } private function clearSqs(string $dsn): void diff --git a/Tests/Transport/AmazonSqsReceiverTest.php b/Tests/Transport/AmazonSqsReceiverTest.php index 2f60f81..164ec7a 100644 --- a/Tests/Transport/AmazonSqsReceiverTest.php +++ b/Tests/Transport/AmazonSqsReceiverTest.php @@ -13,8 +13,10 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\Serializer; @@ -54,6 +56,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() iterator_to_array($receiver->get()); } + public function testKeepalive() + { + $serializer = $this->createSerializer(); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('keepalive')->with('123', 10); + + $receiver = new AmazonSqsReceiver($connection, $serializer); + $receiver->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10); + } + private function createSqsEnvelope() { return [ diff --git a/Tests/Transport/AmazonSqsTransportTest.php b/Tests/Transport/AmazonSqsTransportTest.php index fbe49e2..1bcda50 100644 --- a/Tests/Transport/AmazonSqsTransportTest.php +++ b/Tests/Transport/AmazonSqsTransportTest.php @@ -16,6 +16,7 @@ use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceiver; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransport; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; @@ -151,6 +152,31 @@ public function testItConvertsHttpExceptionDuringResetIntoTransportException() $this->transport->reset(); } + public function testKeepalive() + { + $transport = $this->getTransport( + null, + $connection = $this->createMock(Connection::class), + ); + + $connection->expects($this->once())->method('keepalive')->with('123', 10); + $transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')]), 10); + } + + public function testKeepaliveWhenASqsExceptionOccurs() + { + $transport = $this->getTransport( + null, + $connection = $this->createMock(Connection::class), + ); + + $exception = $this->createHttpException(); + $connection->expects($this->once())->method('keepalive')->with('123')->willThrowException($exception); + + $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); + $transport->keepalive(new Envelope(new DummyMessage('foo'), [new AmazonSqsReceivedStamp('123')])); + } + private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null) { $serializer ??= $this->createMock(SerializerInterface::class); diff --git a/Tests/Transport/ConnectionTest.php b/Tests/Transport/ConnectionTest.php index 691b3fd..3352bfd 100644 --- a/Tests/Transport/ConnectionTest.php +++ b/Tests/Transport/ConnectionTest.php @@ -23,6 +23,7 @@ use Symfony\Component\HttpClient\MockHttpClient; use Symfony\Component\HttpClient\Response\MockResponse; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection; +use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Contracts\HttpClient\HttpClientInterface; class ConnectionTest extends TestCase @@ -357,6 +358,33 @@ public function testLoggerWithDebugOption() $connection->get(); } + public function testKeepalive() + { + $expectedParams = [ + 'QueueUrl' => $queueUrl = 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue', + 'ReceiptHandle' => $id = 'abc', + 'VisibilityTimeout' => $visibilityTimeout = 30, + ]; + + $client = $this->createMock(SqsClient::class); + $client->expects($this->once())->method('changeMessageVisibility')->with($expectedParams); + + $connection = new Connection(['visibility_timeout' => $visibilityTimeout], $client, $queueUrl); + $connection->keepalive($id); + } + + public function testKeepaliveWithTooSmallTtl() + { + $client = $this->createMock(SqsClient::class); + $client->expects($this->never())->method($this->anything()); + + $connection = new Connection(['visibility_timeout' => 1], $client, 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue'); + + $this->expectException(TransportException::class); + $this->expectExceptionMessage('SQS visibility_timeout (1s) cannot be smaller than the keepalive interval (2s).'); + $connection->keepalive('123', 2); + } + private function getMockedQueueUrlResponse(): MockResponse { if ($this->isAsyncAwsSqsVersion2Installed()) { diff --git a/Transport/AmazonSqsReceiver.php b/Transport/AmazonSqsReceiver.php index 5179186..8a86615 100644 --- a/Transport/AmazonSqsReceiver.php +++ b/Transport/AmazonSqsReceiver.php @@ -16,15 +16,15 @@ use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; -use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** * @author Jérémy Derussé */ -class AmazonSqsReceiver implements ReceiverInterface, MessageCountAwareInterface +class AmazonSqsReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface { private SerializerInterface $serializer; @@ -78,6 +78,15 @@ public function reject(Envelope $envelope): void } } + public function keepalive(Envelope $envelope, ?int $seconds = null): void + { + try { + $this->connection->keepalive($this->findSqsReceivedStamp($envelope)->getId(), $seconds); + } catch (HttpException $e) { + throw new TransportException($e->getMessage(), 0, $e); + } + } + public function getMessageCount(): int { try { diff --git a/Transport/AmazonSqsTransport.php b/Transport/AmazonSqsTransport.php index ebe64ab..d98efef 100644 --- a/Transport/AmazonSqsTransport.php +++ b/Transport/AmazonSqsTransport.php @@ -14,6 +14,7 @@ use AsyncAws\Core\Exception\Http\HttpException; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; @@ -26,7 +27,7 @@ /** * @author Jérémy Derussé */ -class AmazonSqsTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface +class AmazonSqsTransport implements TransportInterface, KeepaliveReceiverInterface, SetupableTransportInterface, MessageCountAwareInterface, ResetInterface { private SerializerInterface $serializer; @@ -54,6 +55,14 @@ public function reject(Envelope $envelope): void $this->getReceiver()->reject($envelope); } + public function keepalive(Envelope $envelope, ?int $seconds = null): void + { + $receiver = $this->getReceiver(); + if ($receiver instanceof KeepaliveReceiverInterface) { + $receiver->keepalive($envelope, $seconds); + } + } + public function getMessageCount(): int { return $this->getReceiver()->getMessageCount(); diff --git a/Transport/Connection.php b/Transport/Connection.php index 16dff20..40a6e06 100644 --- a/Transport/Connection.php +++ b/Transport/Connection.php @@ -304,6 +304,23 @@ public function delete(string $id): void ]); } + /** + * @param int|null $seconds the minimum duration the message should be kept alive + */ + public function keepalive(string $id, ?int $seconds = null): void + { + $visibilityTimeout = $this->configuration['visibility_timeout']; + if (null !== $visibilityTimeout && null !== $seconds && $visibilityTimeout < $seconds) { + throw new TransportException(\sprintf('SQS visibility_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $visibilityTimeout, $seconds)); + } + + $this->client->changeMessageVisibility([ + 'QueueUrl' => $this->getQueueUrl(), + 'ReceiptHandle' => $id, + 'VisibilityTimeout' => $this->configuration['visibility_timeout'], + ]); + } + public function getMessageCount(): int { $response = $this->client->getQueueAttributes([ diff --git a/composer.json b/composer.json index 18b24c6..341026c 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ "php": ">=8.2", "async-aws/core": "^1.7", "async-aws/sqs": "^1.0|^2.0", - "symfony/messenger": "^6.4|^7.0", + "symfony/messenger": "^7.2", "symfony/service-contracts": "^2.5|^3", "psr/log": "^1|^2|^3" },