Skip to content

Commit

Permalink
Limit TCP writes with Tornado to 2GB (#6557)
Browse files — browse the repository at this point in the history
Fix OverflowError resulting from trying to send a ridiculously large numpy array to a dask cluster.
  • Loading branch information
hhuuggoo authored Jun 24, 2022
1 parent becb366 commit f129485
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ async def read(self, deserializers=None):
range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
):
chunk = frames[i:j]
chunk_nbytes = len(chunk)
chunk_nbytes = chunk.nbytes
n = await stream.read_into(chunk)
assert n == chunk_nbytes, (n, chunk_nbytes)
except StreamClosedError as e:
Expand Down Expand Up @@ -303,16 +303,25 @@ async def write(self, msg, serializers=None, on_error="message"):
# trick to enqueue all frames for writing beforehand
for each_frame_nbytes, each_frame in zip(frames_nbytes, frames):
if each_frame_nbytes:
if stream._write_buffer is None:
raise StreamClosedError()

if isinstance(each_frame, memoryview):
# Make sure that `len(data) == data.nbytes`
# See <https://github.com/tornadoweb/tornado/pull/2996>
each_frame = ensure_memoryview(each_frame)

stream._write_buffer.append(each_frame)
stream._total_write_index += each_frame_nbytes
# Make sure that `len(data) == data.nbytes`
# See <https://github.com/tornadoweb/tornado/pull/2996>
each_frame = ensure_memoryview(each_frame)
for i, j in sliding_window(
2,
range(
0,
each_frame_nbytes + OPENSSL_MAX_CHUNKSIZE,
OPENSSL_MAX_CHUNKSIZE,
),
):
chunk = each_frame[i:j]
chunk_nbytes = chunk.nbytes

if stream._write_buffer is None:
raise StreamClosedError()

stream._write_buffer.append(chunk)
stream._total_write_index += chunk_nbytes

# start writing frames
stream.write(b"")
Expand Down

0 comments on commit f129485

Please sign in to comment.