Skip to content

Commit

Permalink
Fix serve script crashes due to process limiter (#11264)
Browse files Browse the repository at this point in the history
  • Loading branch information
desertaxle authored Nov 29, 2023
1 parent cf84f6c commit 6e43187
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,20 @@ def _acquire_limit_slot(self, flow_run_id: str) -> bool:
try:
if self._limiter:
self._limiter.acquire_on_behalf_of_nowait(flow_run_id)
self._logger.debug("Limit slot acquired for flow run '%s'", flow_run_id)
return True
except RuntimeError as exc:
if (
"this borrower is already holding one of this CapacityLimiter's tokens"
in str(exc)
):
self._logger.warning(
f"Duplicate submission of flow run '{flow_run_id}' detected. Runner"
" will not re-submit flow run."
)
return False
else:
raise
except anyio.WouldBlock:
self._logger.info(
f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs"
Expand All @@ -813,6 +826,7 @@ def _release_limit_slot(self, flow_run_id: str) -> None:
"""
if self._limiter:
self._limiter.release_on_behalf_of(flow_run_id)
self._logger.debug("Limit slot released for flow run '%s'", flow_run_id)

async def _submit_scheduled_flow_runs(
self, flow_run_response: List["FlowRun"]
Expand Down
31 changes: 31 additions & 0 deletions tests/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,37 @@ async def test_runner_caches_adhoc_pulls(self, prefect_client):
# Should be 3 because the ad hoc pull should have been cached
assert runner._storage_objs[0]._pull_code_spy.call_count == 3

@pytest.mark.usefixtures("use_hosted_api_server")
async def test_runner_does_not_raise_on_duplicate_submission(self, prefect_client):
"""
Regression test for https://github.com/PrefectHQ/prefect/issues/11093
The runner has a race condition where it can try to borrow a limit slot
that it already has. This test ensures that the runner does not raise
an exception in this case.
"""
async with Runner(pause_on_shutdown=False) as runner:
deployment = RunnerDeployment.from_flow(
flow=tired_flow,
name=__file__,
)

deployment_id = await runner.add_deployment(deployment)

flow_run = await prefect_client.create_flow_run_from_deployment(
deployment_id=deployment_id
)
# acquire the limit slot and then try to borrow it again
# during submission to simulate race condition
runner._acquire_limit_slot(flow_run.id)
await runner._get_and_submit_flow_runs()

# shut down cleanly
runner.started = False
runner.stopping = True
runner._cancelling_flow_run_ids.add(flow_run.id)
await runner._cancel_run(flow_run)


class TestRunnerDeployment:
@pytest.fixture
Expand Down

0 comments on commit 6e43187

Please sign in to comment.