Skip to content

Commit

Permalink
Merge branch 'main' into v2.28.0
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-janidlo authored Oct 2, 2024
2 parents 8b652eb + 480538d commit c0f5725
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Changed
^^^^^^^

- The Executor now implements a bursty rate-limit in the background submission
thread. The Executor is designed to coalesce up to ``.batch_size`` of tasks
and submit them in a single API call. But if tasks are supplied to the
Executor at just the right frequency, it will send much smaller batches more
frequently, which places unnecessary load on the API. This change allows
"bursts" of up to 4 API calls in a 16s window, after which it backs off to
submitting at most once every 4 seconds. Notes:

- ``.batch_size`` currently defaults to 128 but is user-settable

- If the Executor is able to completely fill the batch of tasks sent to the
API, that call is not counted toward the burst limit
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ def cmd_start_endpoint(

log.debug("Convey credentials; redirect stdout, stderr (to '%s')", ep_log)
log_fd_flags = os.O_CREAT | os.O_WRONLY | os.O_APPEND | os.O_SYNC
log_fd = os.open(ep_log, log_fd_flags, mode=0o200)
log_fd = os.open(ep_log, log_fd_flags, mode=0o600)
with os.fdopen(log_fd, "w") as log_f:
if os.dup2(log_f.fileno(), 1) != 1:
raise OSError(f"Unable to redirect stdout to {ep_log}")
Expand Down
1 change: 1 addition & 0 deletions compute_endpoint/tests/unit/test_endpointmanager_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -1972,6 +1972,7 @@ def test_redirect_stdstreams_to_user_log(

a, k = next((a, k) for a, k in mock_os.open.call_args_list if a[0] == ep_log)
assert a[1] == exp_flags, "Expect replacement stdout/stderr: append, wronly, sync"
assert k["mode"] == 0o600, "Expect default to writable *and* readable"


@pytest.mark.parametrize("debug", (True, False))
Expand Down
49 changes: 47 additions & 2 deletions compute_sdk/globus_compute_sdk/sdk/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class InvalidStateError(Exception):
from pika.frame import Method
from pika.spec import Basic, BasicProperties


_REGISTERED_EXECUTORS: dict[int, Executor] = {}
_RESULT_WATCHERS: dict[uuid.UUID, _ResultWatcher] = {}

Expand Down Expand Up @@ -136,6 +135,8 @@ def __init__(
label: str = "",
batch_size: int = 128,
amqp_port: int | None = None,
api_burst_limit: int = 4,
api_burst_window_s: int = 16,
**kwargs,
):
"""
Expand All @@ -156,6 +157,10 @@ def __init__(
sending upstream [min: 1, default: 128]
:param amqp_port: Port to use when connecting to results queue. Note that the
Compute web services only support 5671, 5672, and 443.
:param api_burst_limit: Number of "free" API calls to allow before engaging
client-side (i.e., this executor) rate-limiting. See ``api_burst_window_s``
:param api_burst_window_s: Window of time (in seconds) in which to count API
calls for rate-limiting.
"""
deprecated_kwargs = {""}
for key in kwargs:
Expand Down Expand Up @@ -192,6 +197,9 @@ def __init__(
self.label = label
self.batch_size = max(1, batch_size)

self.api_burst_limit = min(8, max(1, api_burst_limit))
self.api_burst_window_s = min(32, max(1, api_burst_window_s))

self.task_count_submitted = 0
self._task_counter: int = 0
self._tasks_to_send: queue.Queue[
Expand Down Expand Up @@ -868,6 +876,8 @@ def __hash__(self):
t.List[_TaskSubmissionInfo],
]

api_burst_ts: list[float] = []
api_burst_fill: list[float] = []
try:
fut: ComputeFuture | None = ComputeFuture() # just start the loop; please
while fut is not None:
Expand Down Expand Up @@ -899,18 +909,53 @@ def __hash__(self):
if not tasks:
continue # fut and task are None; "single point of exit"

api_rate_steady = self.api_burst_window_s / self.api_burst_limit
for submit_group, task_list in tasks.items():
fut_list = futs[submit_group]
num_tasks = len(task_list)

tg_uuid, ep_uuid, res_spec, uep_config = submit_group
log.info(
f"Submitting tasks for Task Group {tg_uuid} to"
f" Endpoint {ep_uuid}: {len(task_list):,}"
f" Endpoint {ep_uuid}: {num_tasks:,}"
)

if api_burst_ts:
now = time.monotonic()
then = now - self.api_burst_window_s
api_burst_ts = [i for i in api_burst_ts if i > then]
api_burst_fill = api_burst_fill[-len(api_burst_ts) :]
if len(api_burst_ts) >= self.api_burst_limit:
delay = api_rate_steady - (now - api_burst_ts[-1])
delay = max(delay, 0) + random.random()
_burst_rel = [f"{now - s:.2f}" for s in api_burst_ts]
_burst_fill = [f"{p:.1f}%" for p in api_burst_fill]

log.warning(
"%r (tid:%s): API rate-limit delay of %.2fs"
"\n Consider submitting more tasks at once."
"\n batch_size = %d"
"\n api_burst_limit = %s"
"\n api_burst_window_s = %s (seconds)"
"\n recent sends: %s"
"\n recent batch fill percent: %s",
self,
_tid,
delay,
self.batch_size,
self.api_burst_limit,
self.api_burst_window_s,
", ".join(_burst_rel),
", ".join(_burst_fill),
)
time.sleep(delay)
self._submit_tasks(
tg_uuid, ep_uuid, res_spec, uep_config, fut_list, task_list
)
if num_tasks < self.api_burst_limit:
api_burst_ts.append(time.monotonic())
fill_percent = 100 * num_tasks / self.batch_size
api_burst_fill.append(fill_percent)

to_watch = [f for f in fut_list if f.task_id and not f.done()]
if not to_watch:
Expand Down
44 changes: 41 additions & 3 deletions compute_sdk/tests/unit/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -938,10 +938,12 @@ def test_task_submitter_respects_batch_size(gc_executor, batch_size: int):

gce.endpoint_id = uuid.uuid4()
gce.batch_size = batch_size
for _ in range(num_batches * batch_size):
gce.submit(noop)
with mock.patch(f"{_MOCK_BASE}time.sleep"):
for _ in range(num_batches * batch_size):
gce.submit(noop)

try_assert(lambda: gcc.batch_run.call_count >= num_batches)

try_assert(lambda: gcc.batch_run.call_count >= num_batches)
for args, _kwargs in gcc.batch_run.call_args_list:
*_, batch = args
assert 0 < batch.add.call_count <= batch_size
Expand Down Expand Up @@ -1016,6 +1018,42 @@ def _mock_max(*a, **k):
assert found_tg_uuid == expected


@pytest.mark.parametrize("burst_limit", (2, 3, 4))
@pytest.mark.parametrize("burst_window", (2, 3, 4))
def test_task_submitter_api_rate_limit(
    gc_executor, mock_log, burst_limit, burst_window
):
    """Submitting more than ``api_burst_limit`` under-filled batches engages
    client-side rate limiting and emits a warning describing the delay."""
    gcc, gce = gc_executor
    gce.endpoint_id = uuid.uuid4()
    gce._submit_tasks = mock.Mock()

    # Pre-register the function so .submit() does not reach out to the service
    gce._function_registry[gce._fn_cache_key(noop)] = str(uuid.uuid4())
    gce.api_burst_limit = burst_limit
    gce.api_burst_window_s = burst_window
    gce.batch_size = random.randint(2, 10)

    num_rate_limited = random.randint(1, 10)
    num_api_submits = burst_limit + num_rate_limited
    with mock.patch(f"{_MOCK_BASE}time.sleep"):
        # A distinct user-endpoint config per task forces one API call each,
        # so every batch is under-filled and counts toward the burst limit
        for conf_i in range(num_api_submits):
            gce.user_endpoint_config = {"something": conf_i}
            gce.submit(noop)

        try_assert(lambda: gce._submit_tasks.call_count == num_api_submits)

    fill_percents = [100 / gce.batch_size for _ in range(1, num_api_submits)]
    expected_fill_text = ", ".join(f"{p:.1f}%" for p in fill_percents)
    rate_limit_logs = [
        (a, k)
        for a, k in mock_log.warning.call_args_list
        if "api_burst" in a[0]
    ]
    assert len(rate_limit_logs) == num_rate_limited, "Expect log when rate limiting"

    a, k = rate_limit_logs[-1]
    assert "batch_size" in a[0], "Expect current value reported"
    assert "API rate-limit" in a[0], "Expect basic explanation of why delayed"
    assert "recent batch fill percent: %s" in a[0]
    assert expected_fill_text == a[-1], "Expect to share batch utilization %"


def test_task_submit_handles_multiple_user_endpoint_configs(
mocker: MockerFixture, gc_executor
):
Expand Down

0 comments on commit c0f5725

Please sign in to comment.