Skip to content

Commit

Permalink
Drain JMS acks queue when session is closed (#2695)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceksokol authored Jun 28, 2021
1 parent 296733b commit 9a27819
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ private[jms] final class JmsAckSession(override val connection: jms.Connection,
override def abortSession(): Unit = stopMessageListenerAndCloseSession()

private def stopMessageListenerAndCloseSession(): Unit = {
ackQueue.put(Left(SessionClosed))
session.close()
try {
drainAcks()
} finally {
ackQueue.put(Left(SessionClosed))
session.close()
}
}

def ackBackpressure() = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ class JmsAckConnectorsSpec extends JmsSpec {
killSwitch2.shutdown()

resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)

jmsSource.takeWithin(1.second).runWith(Sink.seq).futureValue shouldBe empty
}

"ensure no message loss when aborting a stream" in withServer() { server =>
Expand Down Expand Up @@ -367,6 +369,8 @@ class JmsAckConnectorsSpec extends JmsSpec {
s.toInt
}
resultList.toSet should contain theSameElementsAs numsIn.map(_.toString)

jmsSource.takeWithin(1.second).runWith(Sink.seq).futureValue shouldBe empty
}

"shutdown when waiting to acknowledge messages" in withServer() { server =>
Expand Down

0 comments on commit 9a27819

Please sign in to comment.