diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 0088ed6500e..3b5967fbc7e 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -145,6 +145,8 @@ class TCP(Comm): An established communication based on an underlying Tornado IOStream. """ + max_shard_size = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard")) + def __init__(self, stream, local_addr, peer_addr, deserialize=True): self._closed = False Comm.__init__(self) @@ -248,6 +250,7 @@ async def write(self, msg, serializers=None, on_error="message"): "recipient": self.remote_info, **self.handshake_options, }, + frame_split_size=self.max_shard_size, ) frames_nbytes = [nbytes(f) for f in frames] frames_nbytes_total = sum(frames_nbytes) @@ -335,6 +338,8 @@ class TLS(TCP): A TLS-specific version of TCP. """ + max_shard_size = min(C_INT_MAX, TCP.max_shard_size) + def _read_extra(self): TCP._read_extra(self) sock = self.stream.socket