Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[App] Enable running with spawn context #15923

Merged
merged 19 commits into from
Dec 7, 2022
2 changes: 2 additions & 0 deletions src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

- Added the property `ready` of the LightningFlow to inform when the `Open App` should be visible ([#15921](https://github.com/Lightning-AI/lightning/pull/15921))

- Added private work attributed `_start_method` to customize how to start the works ([#15923](https://github.com/Lightning-AI/lightning/pull/15923))


### Changed

Expand Down
3 changes: 2 additions & 1 deletion src/lightning_app/core/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ class MultiProcessQueue(BaseQueue):
def __init__(self, name: str, default_timeout: float):
self.name = name
self.default_timeout = default_timeout
self.queue = multiprocessing.Queue()
context = multiprocessing.get_context("spawn")
self.queue = context.Queue()

def put(self, item):
self.queue.put(item)
Expand Down
1 change: 1 addition & 0 deletions src/lightning_app/core/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class LightningWork:
)

_run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor
_start_method = "fork"

def __init__(
self,
Expand Down
5 changes: 4 additions & 1 deletion src/lightning_app/runners/backends/mp_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ def start(self):
flow_to_work_delta_queue=self.app.flow_to_work_delta_queues[self.work.name],
run_executor_cls=self.work._run_executor_cls,
)
self._process = multiprocessing.Process(target=self._work_runner)

start_method = self.work._start_method
context = multiprocessing.get_context(start_method)
self._process = context.Process(target=self._work_runner)
self._process.start()

def kill(self):
Expand Down