Skip to content

Commit

Permalink
Update send_call to send_call_to_supervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
zanieb committed Feb 25, 2023
1 parent 4f4dc6b commit ae97f1c
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/prefect/_internal/concurrency/from_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def call_soon_in_runtime_thread(
):
submit_fn = runtime.submit_to_loop
else:
submit_fn = current_supervisor.send_call
submit_fn = current_supervisor.send_call_to_supervisor

supervisor = AsyncSupervisor(submit_fn=submit_fn)
supervisor.submit(__fn, *args, **kwargs)
Expand Down Expand Up @@ -66,5 +66,5 @@ def call_soon_in_supervising_thread(
if current_future is None:
raise RuntimeError("No supervisor found.")

future = current_future.send_call(__fn, *args, **kwargs)
future = current_future.send_call_to_supervisor(__fn, *args, **kwargs)
return asyncio.wrap_future(future)
4 changes: 2 additions & 2 deletions src/prefect/_internal/concurrency/from_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def call_soon_in_runtime_thread(
):
submit_fn = runtime.submit_to_loop
else:
submit_fn = current_supervisor.send_call
submit_fn = current_supervisor.send_call_to_supervisor

supervisor = SyncSupervisor(submit_fn=submit_fn)
supervisor.submit(__fn, *args, **kwargs)
Expand Down Expand Up @@ -63,5 +63,5 @@ def call_soon_in_supervising_thread(
if current_supervisor is None:
raise RuntimeError("No supervisor found.")

future = current_supervisor.send_call(__fn, *args, **kwargs)
future = current_supervisor.send_call_to_supervisor(__fn, *args, **kwargs)
return future
6 changes: 4 additions & 2 deletions src/prefect/_internal/concurrency/supervisors.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,11 @@ def submit(self, __fn, *args, **kwargs) -> concurrent.futures.Future:

return future

def send_call(self, __fn, *args, **kwargs) -> concurrent.futures.Future:
def send_call_to_supervisor(
self, __fn, *args, **kwargs
) -> concurrent.futures.Future:
"""
Send a call to the supervisor from a worker.
Send a call to the supervisor thread from a worker thread.
"""
work_item = WorkItem.from_call(__fn, *args, **kwargs)
self._put_work_item(work_item)
Expand Down
4 changes: 2 additions & 2 deletions tests/_internal/concurrency/test_supervisors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_sync_supervisor_timeout_in_main_thread():

def on_worker_thread():
# Send sleep to the main thread
future = supervisor.send_call(time.sleep, 2)
future = supervisor.send_call_to_supervisor(time.sleep, 2)
return future

supervisor.submit(on_worker_thread)
Expand Down Expand Up @@ -108,7 +108,7 @@ async def test_async_supervisor_timeout_in_main_thread():

def on_worker_thread():
# Send sleep to the main thread
future = supervisor.send_call(asyncio.sleep, 1)
future = supervisor.send_call_to_supervisor(asyncio.sleep, 1)
return future

supervisor.submit(on_worker_thread)
Expand Down

0 comments on commit ae97f1c

Please sign in to comment.