From 667e68e947d4e6b6ef253cfc06891fd703e0c592 Mon Sep 17 00:00:00 2001 From: Sebastien Jourdain Date: Fri, 13 Aug 2021 14:04:42 -0600 Subject: [PATCH] fix(python): prevent raise condition when sending attachements aiohttp does not support concurrent ws.send_bytes() so guarding that code with a lock. https://github.com/aio-libs/aiohttp/issues/2934 --- .../src/wslink/backends/aiohttp/__init__.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/python/src/wslink/backends/aiohttp/__init__.py b/python/src/wslink/backends/aiohttp/__init__.py index 09130f6..69a85c2 100644 --- a/python/src/wslink/backends/aiohttp/__init__.py +++ b/python/src/wslink/backends/aiohttp/__init__.py @@ -160,6 +160,7 @@ def __init__(self, protocol=None, web_app=None): self.attachmentsReceived = {} self.attachmentsRecvQueue = [] self.connections = {} + self.attachment_atomic = asyncio.Lock() # Build the rpc method dictionary, assuming we were given a serverprotocol if self.getServerProtocol(): @@ -426,20 +427,28 @@ async def sendWrappedMessage(self, rpcid, content, method="", client_id=None): found_keys.append(key) # increment for key pub.publishManager.registerAttachment(key) - # send header - header = { - "wslink": "1.0", - "method": "wslink.binary.attachment", - "args": [key], - } - json_header = json.dumps(header, ensure_ascii=False) + + for key in found_keys: + # send header + header = { + "wslink": "1.0", + "method": "wslink.binary.attachment", + "args": [key], + } + json_header = json.dumps(header, ensure_ascii=False) + + # aiohttp can not handle pending ws.send_bytes() + # tried with semaphore but got exception with >1 + # https://github.com/aio-libs/aiohttp/issues/2934 + async with self.attachment_atomic: for ws in websockets: + # Send binary header await ws.send_str(json_header) # Send binary message await ws.send_bytes(attachments[key]) - # decrement for key - pub.publishManager.unregisterAttachment(key) + # decrement for key + pub.publishManager.unregisterAttachment(key) pub.publishManager.freeAttachments(keys=found_keys)