From a499d43fbaf42f7214db1f63dba79a3e6f85cea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Turos?= Date: Fri, 20 Oct 2023 08:34:51 +0200 Subject: [PATCH] AMQP: Fix connection retrieval #3023 (#3025) * Fix for #3023 Offer new cached connection when previous one has been closed and released. moved local variable into method (enable Scala 3 tail recursion). updated validation in conn provider test. --- .../alpakka/amqp/AmqpConnectionProvider.scala | 3 ++- .../AmqpConnectionProvidersSpec.scala | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectionProvider.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectionProvider.scala index b35363de54..a0d8de1a67 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectionProvider.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectionProvider.scala @@ -367,13 +367,14 @@ final class AmqpCachedConnectionProvider private (val provider: AmqpConnectionPr def withAutomaticRelease(automaticRelease: Boolean): AmqpCachedConnectionProvider = copy(automaticRelease = automaticRelease) - private lazy val connection = provider.get + private def getConnection = provider.get @tailrec override def get: Connection = state.get match { case Empty => if (state.compareAndSet(Empty, Connecting)) { try { + val connection = getConnection if (!state.compareAndSet(Connecting, Connected(connection, 1))) throw new ConcurrentModificationException( "Unexpected concurrent modification while creating the connection." diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectionProvidersSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectionProvidersSpec.scala index 00a042de60..fd50b843c5 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectionProvidersSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectionProvidersSpec.scala @@ -196,4 +196,23 @@ class AmqpConnectionProvidersSpec extends AmqpSpec { catch { case e: Throwable => e shouldBe an[ConnectException] } } } + + "The AMQP Reusable Connection Provider" should { + "open new connection when previous one is forced to close and released" in { + val connectionFactory = new ConnectionFactory() + val connectionProvider = AmqpConnectionFactoryConnectionProvider(connectionFactory) + .withHostAndPort("localhost", 5672) + val reusableConnectionProvider = AmqpCachedConnectionProvider(connectionProvider) + val originalConnection = reusableConnectionProvider.get + + originalConnection.isOpen shouldBe true + originalConnection.abort(1, "Forced close") + reusableConnectionProvider.release(originalConnection) + + val newConnection = reusableConnectionProvider.get + newConnection should not be (originalConnection) + newConnection.isOpen shouldBe true + originalConnection.isOpen should not be true + } + } }