From 8bf63b3aebad7b6d4061cd61d7e5cd12ca2a12c7 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 29 Jul 2021 13:08:54 -0500 Subject: [PATCH 1/2] Cap maximum shard size at the size of an integer Supercedes https://github.com/dask/distributed/pull/5134 Copying over the summary of that PR Works around the OpenSSL 1.0.2 bug demonstrated in issue ( #4538 ), except unlike PR ( #5115 ) which did this for reading, this does the same thing for writing. The error may be less likely to show up in the write path (as frames may simply be smaller than this limit). Still it seems like a good idea to protect against OverflowErrors from OpenSSL --- distributed/comm/tcp.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 0088ed6500e..3b5967fbc7e 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -145,6 +145,8 @@ class TCP(Comm): An established communication based on an underlying Tornado IOStream. """ + max_shard_size = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard")) + def __init__(self, stream, local_addr, peer_addr, deserialize=True): self._closed = False Comm.__init__(self) @@ -248,6 +250,7 @@ async def write(self, msg, serializers=None, on_error="message"): "recipient": self.remote_info, **self.handshake_options, }, + frame_split_size=self.max_shard_size, ) frames_nbytes = [nbytes(f) for f in frames] frames_nbytes_total = sum(frames_nbytes) @@ -335,6 +338,8 @@ class TLS(TCP): A TLS-specific version of TCP. """ + max_shard_size = min(C_INT_MAX, TCP.max_shard_size) + def _read_extra(self): TCP._read_extra(self) sock = self.stream.socket From f39406001e960b14ad43f3e26af101b0452f15dc Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 29 Jul 2021 14:22:52 -0500 Subject: [PATCH 2/2] Update distributed/comm/tcp.py Co-authored-by: jakirkham --- distributed/comm/tcp.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 3b5967fbc7e..22d7c72461a 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -338,6 +338,7 @@ class TLS(TCP): A TLS-specific version of TCP. """ + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) max_shard_size = min(C_INT_MAX, TCP.max_shard_size) def _read_extra(self):