From dfe0e0ca5a0cf00fb4c42261db0f5004769b1713 Mon Sep 17 00:00:00 2001 From: Chris Burr Date: Mon, 28 Oct 2024 11:21:04 +0100 Subject: [PATCH] sweep: #7854 fix (StompMQConnector): add a timeout for the StompConnector --- src/DIRAC/Resources/MessageQueue/StompMQConnector.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py index 316a2952f16..4f00f7bc18f 100644 --- a/src/DIRAC/Resources/MessageQueue/StompMQConnector.py +++ b/src/DIRAC/Resources/MessageQueue/StompMQConnector.py @@ -36,6 +36,10 @@ class StompMQConnector(MQConnector): RECONNECT_SLEEP_JITTER = 0.1 # Random factor to add. 0.1 means a random number from 0 to 10% of the current time. RECONNECT_ATTEMPTS_MAX = 1e4 # Maximum attempts to reconnect. + OUTGOING_HEARTBEAT_MS = 15_000 + INCOMING_HEARTBEAT_MS = 15_000 + STOMP_TIMEOUT = 60 + PORT = 61613 def __init__(self, parameters=None): @@ -69,6 +73,11 @@ def setupConnection(self, parameters=None): reconnectSleepJitter = self.parameters.get("ReconnectSleepJitter", StompMQConnector.RECONNECT_SLEEP_JITTER) reconnectAttemptsMax = self.parameters.get("ReconnectAttemptsMax", StompMQConnector.RECONNECT_ATTEMPTS_MAX) + outgoingHeartbeatMs = self.parameters.get("OutgoingHeartbeatMs", StompMQConnector.OUTGOING_HEARTBEAT_MS) + incomingHeartbeatMs = self.parameters.get("IncomingHeartbeatMs", StompMQConnector.INCOMING_HEARTBEAT_MS) + + stompTimeout = self.parameters.get("Timeout", StompMQConnector.STOMP_TIMEOUT) + host = self.parameters.get("Host") port = self.parameters.get("Port", StompMQConnector.PORT) vhost = self.parameters.get("VHost") @@ -80,6 +89,8 @@ def setupConnection(self, parameters=None): connectionArgs = { "vhost": vhost, "keepalive": True, + "timeout": stompTimeout, + "heartbeats": (outgoingHeartbeatMs, incomingHeartbeatMs), "reconnect_sleep_initial": reconnectSleepInitial, "reconnect_sleep_increase": reconnectSleepIncrease, "reconnect_sleep_max": reconnectSleepMax,