diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 2c09ec71035..554901f9838 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -267,25 +267,24 @@ async def write(self, msg, serializers=None, on_error="message"): try: # trick to enque all frames for writing beforehand for each_frame_nbytes, each_frame in zip(frames_nbytes, frames): - if each_frame_nbytes: - each_frame = memoryview(each_frame) + each_frame = memoryview(each_frame) - # Make sure that `len(data) == data.nbytes` - # See - each_frame = each_frame.cast("B") + # Make sure that `len(data) == data.nbytes` + # See + each_frame = each_frame.cast("B") - # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) - for i, j in sliding_window( - 2, range(0, each_frame_nbytes + C_INT_MAX, C_INT_MAX) - ): - chunk = each_frame[i:j] - chunk_nbytes = chunk.nbytes + # Workaround for OpenSSL 1.0.2 (can drop with OpenSSL 1.1.1) + for i, j in sliding_window( + 2, range(0, each_frame_nbytes + C_INT_MAX, C_INT_MAX) + ): + chunk = each_frame[i:j] + chunk_nbytes = chunk.nbytes - if stream._write_buffer is None: - raise StreamClosedError() + if stream._write_buffer is None: + raise StreamClosedError() - stream._write_buffer.append(chunk) - stream._total_write_index += chunk_nbytes + stream._write_buffer.append(chunk) + stream._total_write_index += chunk_nbytes # start writing frames stream.write(b"")