diff --git a/src/prefect/runner/runner.py b/src/prefect/runner/runner.py index 26a0ee01aafc..fa6c6e459996 100644 --- a/src/prefect/runner/runner.py +++ b/src/prefect/runner/runner.py @@ -798,7 +798,20 @@ def _acquire_limit_slot(self, flow_run_id: str) -> bool: try: if self._limiter: self._limiter.acquire_on_behalf_of_nowait(flow_run_id) + self._logger.debug("Limit slot acquired for flow run '%s'", flow_run_id) return True + except RuntimeError as exc: + if ( + "this borrower is already holding one of this CapacityLimiter's tokens" + in str(exc) + ): + self._logger.warning( + f"Duplicate submission of flow run '{flow_run_id}' detected. Runner" + " will not re-submit flow run." + ) + return False + else: + raise except anyio.WouldBlock: self._logger.info( f"Flow run limit reached; {self._limiter.borrowed_tokens} flow runs" @@ -813,6 +826,7 @@ def _release_limit_slot(self, flow_run_id: str) -> None: """ if self._limiter: self._limiter.release_on_behalf_of(flow_run_id) + self._logger.debug("Limit slot released for flow run '%s'", flow_run_id) async def _submit_scheduled_flow_runs( self, flow_run_response: List["FlowRun"] diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py index 0df76ff64abe..b77c08444d44 100644 --- a/tests/runner/test_runner.py +++ b/tests/runner/test_runner.py @@ -690,6 +690,37 @@ async def test_runner_caches_adhoc_pulls(self, prefect_client): # Should be 3 because the ad hoc pull should have been cached assert runner._storage_objs[0]._pull_code_spy.call_count == 3 + @pytest.mark.usefixtures("use_hosted_api_server") + async def test_runner_does_not_raise_on_duplicate_submission(self, prefect_client): + """ + Regression test for https://github.com/PrefectHQ/prefect/issues/11093 + + The runner has a race condition where it can try to borrow a limit slot + that it already has. This test ensures that the runner does not raise + an exception in this case. + """ + async with Runner(pause_on_shutdown=False) as runner: + deployment = RunnerDeployment.from_flow( + flow=tired_flow, + name=__file__, + ) + + deployment_id = await runner.add_deployment(deployment) + + flow_run = await prefect_client.create_flow_run_from_deployment( + deployment_id=deployment_id + ) + # acquire the limit slot and then try to borrow it again + # during submission to simulate race condition + runner._acquire_limit_slot(flow_run.id) + await runner._get_and_submit_flow_runs() + + # shut down cleanly + runner.started = False + runner.stopping = True + runner._cancelling_flow_run_ids.add(flow_run.id) + await runner._cancel_run(flow_run) + class TestRunnerDeployment: @pytest.fixture