Skip to content

Commit

Permalink
Rename variables
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Sep 27, 2022
1 parent 07b5d13 commit 71df3cd
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 15 deletions.
16 changes: 8 additions & 8 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,8 @@ async def test_clean(c, s, a, b):
@gen_cluster(client=True)
async def test_message_breakup(c, s, a, b):
n = 100_000
a.state.transfer_message_target_bytes = 10 * n
b.state.transfer_message_target_bytes = 10 * n
a.state.transfer_message_bytes_limit = 10 * n
b.state.transfer_message_bytes_limit = 10 * n
xs = [
c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
]
Expand Down Expand Up @@ -809,10 +809,10 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_share_communication(c, s, w1, w2, w3):
x = c.submit(
mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
mul, b"1", int(w3.transfer_message_bytes_limit + 1), workers=w1.address
)
y = c.submit(
mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
mul, b"2", int(w3.transfer_message_bytes_limit + 1), workers=w2.address
)
await wait([x, y])
await c._replicate([x, y], workers=[w1.address, w2.address])
Expand All @@ -826,8 +826,8 @@ async def test_share_communication(c, s, w1, w2, w3):
@pytest.mark.xfail(reason="very high flakiness")
@gen_cluster(client=True)
async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
x = c.submit(mul, b"1", int(b.transfer_message_bytes_limit + 1), workers=a.address)
y = c.submit(mul, b"2", int(b.transfer_message_bytes_limit + 1), workers=a.address)
await wait([x, y])
z = c.submit(add, x, y, workers=b.address)
await wait(z)
Expand Down Expand Up @@ -3001,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_acquire_replicas_large_data(c, s, a):
"""When acquire-replicas is used to acquire multiple sizeable tasks, it respects
transfer_message_target_bytes and acquires them over multiple iterations.
transfer_message_bytes_limit and acquires them over multiple iterations.
"""
size = a.state.transfer_message_target_bytes // 5 - 10_000
size = a.state.transfer_message_bytes_limit // 5 - 10_000

class C:
def __sizeof__(self):
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,13 +1013,13 @@ async def test_deprecated_worker_attributes(s, a, b):
@pytest.mark.parametrize(
"nbytes,n_in_flight",
[
# Note: transfer_message_target_bytes = 50e6 bytes
(int(10e6), 3),
(int(20e6), 2),
(int(30e6), 1),
],
)
def test_aggregate_gather_deps(ws, nbytes, n_in_flight):
ws.transfer_message_bytes_limit = int(50e6)
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(
AcquireReplicasEvent(
Expand Down Expand Up @@ -1066,7 +1066,7 @@ def test_gather_priority(ws):
},
# Substantial nbytes prevents transfer_incoming_count_limit to be
# overridden by transfer_incoming_bytes_throttle_threshold,
# but it's less than transfer_message_target_bytes
# but it's less than transfer_message_bytes_limit
nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)},
stimulus_id="compute1",
),
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ def data(self) -> MutableMapping[str, Any]:
ready = DeprecatedWorkerStateAttribute()
tasks = DeprecatedWorkerStateAttribute()
target_message_size = DeprecatedWorkerStateAttribute(
target="transfer_message_target_bytes"
target="transfer_message_bytes_limit"
)
total_out_connections = DeprecatedWorkerStateAttribute(
target="transfer_incoming_count_limit"
Expand Down
2 changes: 1 addition & 1 deletion distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@ def _select_keys_for_gather(
"""Helper of _ensure_communicating.
Fetch all tasks that are replicated on the target worker within a single
message, up to transfer_message_target_bytes or until we reach the limit
message, up to transfer_message_bytes_limit or until we reach the limit
for the size of incoming data transfers.
"""
to_gather: list[TaskState] = []
Expand Down
7 changes: 4 additions & 3 deletions docs/source/worker-state.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ These :class:`TaskState` objects have their state set to ``fetch``, are put in t
network. For each dependency we select a worker at random that has that data and collect
the dependency from that worker. To improve bandwidth, we opportunistically gather other
dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
of data (:attr:`~WorkerState.transfer_message_target_bytes`) - too little data and
bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50
connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
of data (:attr:`~WorkerState.transfer_message_bytes_limit`, which is acquired from the
configuration key ``distributed.worker.transfer.message-bytes-limit``) - too little data
and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
50 connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
to avoid overly-fragmenting our network bandwidth.

Expand Down

0 comments on commit 71df3cd

Please sign in to comment.