From 9106d5ad831537196eb95383c116ceaf299df836 Mon Sep 17 00:00:00 2001 From: Matt Vollrath Date: Tue, 12 May 2020 16:00:35 -0400 Subject: [PATCH 1/4] Don't block Subscription.unregister() Fixes #425 --- .../src/rosbridge_library/capabilities/subscribe.py | 2 +- .../rosbridge_library/internal/subscription_modifiers.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py index 6d5cc8c92..eefdb889a 100644 --- a/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py +++ b/rosbridge_library/src/rosbridge_library/capabilities/subscribe.py @@ -87,7 +87,7 @@ def unregister(self): """ Unsubscribes this subscription and cleans up resources """ manager.unsubscribe(self.client_id, self.topic) with self.handler_lock: - self.handler.finish() + self.handler.finish(block=False) self.clients.clear() def subscribe(self, sid=None, msg_type=None, throttle_rate=0, diff --git a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py index 00172c864..666d7f76f 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py @@ -80,7 +80,7 @@ def transition(self): else: return QueueMessageHandler(self) - def finish(self): + def finish(self, block=True): pass @@ -98,7 +98,7 @@ def transition(self): else: return QueueMessageHandler(self) - def finish(self): + def finish(self, block=True): pass @@ -136,14 +136,15 @@ def transition(self): self.c.notify() return self - def finish(self): + def finish(self, block=True): """ If throttle was set to 0, this pushes all buffered messages """ # Notify the thread to finish with self.c: self.alive = False self.c.notify() - self.join() + if block: + self.join() def run(self): while self.alive: From 96186aea7f3706207903c11ac656d9104759c7db Mon Sep 17 00:00:00 2001 From: Matt Vollrath Date: Tue, 12 May 2020 16:01:02 -0400 Subject: [PATCH 2/4] Don't add messages to finished queue handler --- .../src/rosbridge_library/internal/subscription_modifiers.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py index 666d7f76f..77052ceb5 100644 --- a/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py +++ b/rosbridge_library/src/rosbridge_library/internal/subscription_modifiers.py @@ -115,6 +115,9 @@ def __init__(self, previous_handler): def handle_message(self, msg): with self.c: + if not self.alive: + return + should_notify = len(self.queue) == 0 self.queue.append(msg) if should_notify: From 51715323667dac894db5535763e2e9456541bc05 Mon Sep 17 00:00:00 2001 From: Matt Vollrath Date: Tue, 12 May 2020 16:34:09 -0400 Subject: [PATCH 3/4] Decouple incoming WS handling from server thread Fixes #425 --- .../rosbridge_server/autobahn_websocket.py | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py index d31242ab5..96e89373c 100755 --- a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py +++ b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py @@ -39,6 +39,7 @@ import threading import traceback from functools import wraps +from collections import deque from autobahn.twisted.websocket import WebSocketServerProtocol from twisted.internet import interfaces, reactor @@ -66,6 +67,48 @@ def wrapper(*args, **kwargs): return wrapper +class IncomingQueue(threading.Thread): + """Decouples incoming messages from the Autobahn thread. + + This mitigates cases where outgoing messages are blocked by incoming, + and vice versa. + """ + def __init__(self, protocol): + threading.Thread.__init__(self) + self.daemon = True + self.queue = deque() + self.protocol = protocol + + self.cond = threading.Condition() + self._finished = False + + def finish(self): + """Clear the queue and do not accept further messages.""" + with self.cond: + self._finished = True + while len(self.queue) > 0: + self.queue.popleft() + self.cond.notify() + + def push(self, msg): + with self.cond: + self.queue.append(msg) + self.cond.notify() + + def run(self): + while True: + with self.cond: + if len(self.queue) == 0: + self.cond.wait() + + if self._finished: + break + + msg = self.queue.popleft() + + self.protocol.incoming(msg) + + @implementer(interfaces.IPushProducer) class OutgoingValve: """Allows the Autobahn transport to pause outgoing messages from rosbridge. @@ -131,6 +174,8 @@ def onOpen(self): } try: self.protocol = RosbridgeProtocol(cls.client_id_seed, parameters=parameters) + self.incoming_queue = IncomingQueue(self.protocol) + self.incoming_queue.start() producer = OutgoingValve(self) self.transport.registerProducer(producer, True) producer.resumeProducing() @@ -177,10 +222,10 @@ def onMessage(self, message, binary): self.sendClose() except: # proper error will be handled in the protocol class - self.protocol.incoming(message) + self.incoming_queue.push(message) else: # no authentication required - self.protocol.incoming(message) + self.incoming_queue.push(message) def outgoing(self, message): if type(message) == bson.BSON: @@ -201,6 +246,7 @@ def onClose(self, was_clean, code, reason): cls = self.__class__ cls.clients_connected -= 1 self.protocol.finish() + self.incoming_queue.finish() if cls.client_manager: cls.client_manager.remove_client(self.client_id, self.peer) rospy.loginfo("Client disconnected. %d clients total.", cls.clients_connected) From 36da1887bdad00e42dc4bfade3ea02590f35fe4a Mon Sep 17 00:00:00 2001 From: Matt Vollrath Date: Wed, 13 May 2020 00:01:28 -0400 Subject: [PATCH 4/4] IncomingQueue: don't wait() if _finished --- rosbridge_server/src/rosbridge_server/autobahn_websocket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py index 96e89373c..9af0f29ae 100755 --- a/rosbridge_server/src/rosbridge_server/autobahn_websocket.py +++ b/rosbridge_server/src/rosbridge_server/autobahn_websocket.py @@ -98,7 +98,7 @@ def push(self, msg): def run(self): while True: with self.cond: - if len(self.queue) == 0: + if len(self.queue) == 0 and not self._finished: self.cond.wait() if self._finished: