Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving Hydra+DDP support #11617

Merged
merged 40 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
64e66a6
adds hydra test
jgbos Jan 25, 2022
ff7d739
update tests
jgbos Jan 25, 2022
77a0876
create method for hydra
jgbos Jan 27, 2022
9d02c4c
minor test updates
jgbos Jan 27, 2022
6361369
adds teardown code to support hydra multirun
jgbos Jan 28, 2022
dea8ea5
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 28, 2022
20499b0
fixes hydra available import checks
jgbos Jan 28, 2022
bb93d27
moved hydra specific code to utilities
jgbos Jan 31, 2022
fee8d61
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 31, 2022
2c1e839
fix mypy and flake8 errors
jgbos Jan 31, 2022
4ec0a45
fixes error with old version of hydra
jgbos Jan 31, 2022
0eb0627
Update pytorch_lightning/utilities/hydra.py
jgbos Feb 9, 2022
6530c6a
addresses comments and simplifies tests
jgbos Feb 9, 2022
7f340ed
rebase with launcher
rohitgr7 Feb 22, 2022
12c1122
add hydra launcher
rohitgr7 Feb 22, 2022
bdce069
move launcher
rohitgr7 Feb 28, 2022
bf0f0e8
Merge branch 'master' into fix-hydra-multirun
Borda Jun 21, 2022
011db09
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 21, 2022
d763922
prepare for merge
jgbos Aug 12, 2022
c6d4266
Merge remote-tracking branch 'origin' into fix-hydra-multirun
jgbos Aug 12, 2022
5bcfad1
working post merge
jgbos Aug 12, 2022
387a7cf
fixes test import
jgbos Aug 15, 2022
e8d625b
add support for hydra `rerun`
jgbos Aug 16, 2022
0f12e9a
refactor subprocess cmd and add test
jgbos Aug 27, 2022
f67856f
Update tests/tests_pytorch/strategies/test_ddp_hydra_support.py
jgbos Aug 27, 2022
e2bdb76
Update tests/tests_pytorch/strategies/test_ddp_hydra_support.py
jgbos Aug 27, 2022
ec19a90
resolve comments
jgbos Aug 27, 2022
6358621
uses hydra test_utils and checks standalone
jgbos Sep 6, 2022
5bb0b61
Merge branch 'master' into fix-hydra-multirun
carmocca Sep 12, 2022
3f2ab47
fix tests
rohitgr7 Sep 19, 2022
a2a8481
Merge branch 'master' into fix-hydra-multirun
rohitgr7 Sep 19, 2022
3120612
Revert "fix tests"
carmocca Sep 22, 2022
17e82cd
Move file
carmocca Sep 22, 2022
e6029d3
Merge branch 'master' into fix-hydra-multirun
carmocca Sep 22, 2022
ed78629
Minor test signature changes
carmocca Sep 22, 2022
5165f36
Tests require hydra>=1.0.7
carmocca Sep 22, 2022
e3ce4a7
oopsie
carmocca Sep 22, 2022
a827b2b
Update tests/tests_pytorch/strategies/launchers/test_subprocess_scrip…
carmocca Sep 22, 2022
4b6e46d
fixes failing test
jgbos Sep 22, 2022
ba32666
Merge branch 'fix-hydra-multirun' of github.com:jgbos/pytorch-lightni…
jgbos Sep 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 53 additions & 37 deletions src/pytorch_lightning/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import subprocess
import sys
from time import sleep
from typing import Any, Callable, Optional
from typing import Any, Callable, Optional, Sequence

import __main__
import numpy as np
Expand All @@ -25,7 +25,7 @@
from pytorch_lightning.strategies.launchers.base import _Launcher
from pytorch_lightning.utilities.imports import _RequirementAvailable

_HYDRA_AVAILABLE = _RequirementAvailable("hydra")
_HYDRA_AVAILABLE = _RequirementAvailable("hydra-core")


class _SubprocessScriptLauncher(_Launcher):
Expand Down Expand Up @@ -101,32 +101,6 @@ def _call_children_scripts(self) -> None:
# allow the user to pass the node rank
os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())

# Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c`
# See https://docs.python.org/3/reference/import.html#main-spec
if __main__.__spec__ is None: # pragma: no-cover
# Script called as `python a/b/c.py`
if _HYDRA_AVAILABLE:
# when user is using hydra find the absolute path
from hydra.utils import to_absolute_path

to_abs_path = to_absolute_path
else:
to_abs_path = os.path.abspath

# pull out the commands used to run the script and resolve the absolute file path
command = sys.argv
try:
full_path = to_abs_path(command[0])
except Exception:
full_path = os.path.abspath(command[0])

command[0] = full_path
# use the same python interpreter and actually running
command = [sys.executable] + command
else: # Script called as `python -m a.b.c`
command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]

os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"

for local_rank in range(1, self.num_processes):
Expand All @@ -137,18 +111,18 @@ def _call_children_scripts(self) -> None:
if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy:
del env_copy["PL_GLOBAL_SEED"]

# start process
# if hydra is available and initialized, make sure to set the cwd correctly
cwd: Optional[str] = None
hydra_in_use = False
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig
from hydra.utils import get_original_cwd

if HydraConfig.initialized():
cwd = get_original_cwd()
os_cwd = f'"{os.getcwd()}"'
command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"]
subprocess.Popen(command, env=env_copy, cwd=cwd)
hydra_in_use = HydraConfig.initialized()

if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank)
else:
command = _basic_subprocess_cmd(local_rank)

subprocess.Popen(command, env=env_copy)

# starting all processes at once can cause issues
# with dataloaders delay between 1-10 seconds
Expand All @@ -162,3 +136,45 @@ def _check_can_spawn_children(self) -> None:
" Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
" 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
)


def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]:
if __main__.__spec__ is None: # pragma: no-cover
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
else:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]


def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
from hydra.core.hydra_config import HydraConfig
from hydra.utils import to_absolute_path

# when user is using hydra find the absolute path
if __main__.__spec__ is None: # pragma: no-cover
command = [sys.executable, to_absolute_path(sys.argv[0])]
else:
command = [sys.executable, "-m", __main__.__spec__.name]

# extract the hydra configu
hydra_cfg = HydraConfig.get()

# the location of the hydra configuration files saved for the current job
hydra_output = hydra_cfg.runtime.output_dir
if hydra_cfg.output_subdir is not None:
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)

# check if experimental re-run capability exists
# otherwise use existing config.yaml which may have issues
pickled_config = os.path.join(hydra_output, "config.pickle")
if os.path.exists(pickled_config):
command += ["--experimental-rerun", pickled_config]

else:
command += ["-cp", hydra_output, "-cn", "config.yaml"]
command += [
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
]

print("*********************", command)
jgbos marked this conversation as resolved.
Show resolved Hide resolved
return command
192 changes: 192 additions & 0 deletions tests/tests_pytorch/strategies/test_ddp_hydra_support.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import logging
carmocca marked this conversation as resolved.
Show resolved Hide resolved
import os
import subprocess
import sys
from pathlib import Path

import pytest

from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE
from tests_pytorch.helpers.runif import RunIf

if _HYDRA_AVAILABLE:
from omegaconf import OmegaConf


# fixture to run hydra jobs in a clean temporary directory
# Hydra creates its own output directories and logs
@pytest.fixture
def cleandir(tmp_path):
"""Run function in a temporary directory."""
old_dir = os.getcwd() # get current working directory (cwd)
os.chdir(tmp_path) # change cwd to the temp-directory
yield tmp_path # yields control to the test to be run
os.chdir(old_dir)
logging.shutdown()
carmocca marked this conversation as resolved.
Show resolved Hide resolved


# function to run a command line argument
def run_process(cmd):
try:
process = subprocess.Popen(
args=cmd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = process.communicate()
if process.returncode != 0:
sys.stderr.write(f"Subprocess error:\n{stderr}\n")
sys.stderr.write(f"Subprocess stdout:\n{stdout}\n")
raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd)
return stdout, stderr
except Exception as e:
cmd = " ".join(cmd)
sys.stderr.write(f"Error executing:\n{cmd}\n")
raise e


# Script to run from command line
script = """
import hydra
import os
import torch

from pytorch_lightning import Trainer

from tests.tests_pytorch.helpers import BoringModel

class BoringModelGPU(BoringModel):
def on_train_start(self) -> None:
# make sure that the model is on GPU when training
assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}")
self.start_cuda_memory = torch.cuda.memory_allocated()

@hydra.main(config_path=None, version_base="1.1")
def task_fn(cfg):
trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True)
model = BoringModelGPU()
trainer.fit(model)
trainer.test(model)

if torch.distributed.is_initialized():
torch.distributed.destroy_process_group()

os.environ.pop("LOCAL_RANK", None)
carmocca marked this conversation as resolved.
Show resolved Hide resolved

if __name__ == "__main__":
task_fn()
"""


@RunIf(min_cuda_gpus=2)
@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available")
@pytest.mark.usefixtures("cleandir")
@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"])
def test_ddp_with_hydra_runjob(subdir):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

# Run CLI
devices = 2
cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"']
if subdir is not None:
cmd += [f"hydra.output_subdir={subdir}"]
run_process(cmd)

# Make sure config.yaml was created for additional
# processes.
logs = list(Path.cwd().glob("**/config.yaml"))
assert len(logs) == devices

# Make sure the parameter was set and used
cfg = OmegaConf.load(logs[0])
assert cfg.devices == devices

# Make sure PL spawned a job that is logged by Hydra
logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == 1


@RunIf(min_cuda_gpus=2)
@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available")
@pytest.mark.usefixtures("cleandir")
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob(num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

# create fake multirun params based on `num_jobs`
fake_param = "+foo="
jgbos marked this conversation as resolved.
Show resolved Hide resolved
devices = 2
for i in range(num_jobs):
fake_param += f"{i}"
if i < num_jobs - 1:
fake_param += ","

# Run CLI
run_process([sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"', fake_param, "--multirun"])

# Make sure config.yaml was created for each job
configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml"))
assert len(configs) == num_jobs * (devices - 1)

# Make sure the parameter was set and used for each job
for i, config in enumerate(configs):
cfg = OmegaConf.load(config)
local_rank = int(config.parent.parent.parts[-1])
assert cfg.devices == devices
assert cfg.foo == local_rank

logs = list(Path.cwd().glob("**/*.log"))
assert len(logs) == num_jobs


yaml_file = """
hydra:
callbacks:
save_job_info:
_target_: hydra.experimental.callbacks.PickleJobInfoCallback
"""


@RunIf(min_cuda_gpus=2)
@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available")
@pytest.mark.usefixtures("cleandir")
@pytest.mark.parametrize("num_jobs", [1, 2])
def test_ddp_with_hydra_multirunjob_rerun(num_jobs):
# Save script locally
with open("temp.py", "w") as fn:
fn.write(script)

with open("config.yaml", "w") as fn:
fn.write(yaml_file)

# create fake multirun params based on `num_jobs`
fake_param = "+foo="
devices = 2
for i in range(num_jobs):
fake_param += f"{i}"
if i < num_jobs - 1:
fake_param += ","
jgbos marked this conversation as resolved.
Show resolved Hide resolved

# Run CLI
run_process(
[
sys.executable,
"temp.py",
"-cp",
".",
"-cn",
"config.yaml",
f"+devices={devices}",
jgbos marked this conversation as resolved.
Show resolved Hide resolved
'+strategy="ddp"',
fake_param,
"--multirun",
]
)

pickles = sorted(Path.cwd().glob("**/.hydra/config.pickle"))
assert len(pickles) == num_jobs