From dbba5774d6a3742515fc1b6b701a4b4e63c17e89 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Fri, 23 Jul 2021 19:36:20 -0700 Subject: [PATCH] Read smaller frames to workaround OpenSSL bug As older versions of OpenSSL (in particular 1.0.2) have limitations on the size of buffers they can work with, take small views into our larger buffer and read those in instead. This should keep the buffer sizes more manageable for OpenSSL. --- distributed/comm/tcp.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index de6cee016d8..0ca918c1774 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -1,3 +1,4 @@ +import ctypes import errno import functools import logging @@ -19,6 +20,8 @@ from tornado.tcpclient import TCPClient from tornado.tcpserver import TCPServer +from tlz import sliding_window + import dask from dask.utils import parse_timedelta @@ -35,6 +38,7 @@ MAX_BUFFER_SIZE = MEMORY_LIMIT / 2 +C_MAX_INT = 256 ** ctypes.sizeof(ctypes.c_int) // 2 - 1 def set_tcp_timeout(comm): @@ -193,9 +197,13 @@ async def read(self, deserializers=None): frames_nbytes = await stream.read_bytes(fmt_size) (frames_nbytes,) = struct.unpack(fmt, frames_nbytes) - frames = bytearray(frames_nbytes) - n = await stream.read_into(frames) - assert n == frames_nbytes, (n, frames_nbytes) + frames = memoryview(bytearray(frames_nbytes)) + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) + for i, j in sliding_window(range(0, frame_nbytes + C_MAX_INT, C_MAX_INT)): + chunk = frames[i:j] + chunk_nbytes = len(chunk) + n = await stream.read_into(chunk) + assert n == chunk_nbytes, (n, chunk_nbytes) except StreamClosedError as e: self.stream = None self._closed = True