Move accelerator-specific parsing functions with their accelerators #14753

Merged: 20 commits, Sep 18, 2022
Changes from 7 commits
28 changes: 25 additions & 3 deletions src/lightning_lite/accelerators/cpu.py
@@ -16,7 +16,6 @@
import torch

from lightning_lite.accelerators.accelerator import Accelerator
from lightning_lite.utilities import device_parser


class CPUAccelerator(Accelerator):
@@ -37,13 +36,13 @@ def teardown(self) -> None:
@staticmethod
def parse_devices(devices: Union[int, str, List[int]]) -> int:
"""Accelerator device parsing logic."""
devices = device_parser.parse_cpu_cores(devices)
devices = parse_cpu_cores(devices)
return devices

@staticmethod
def get_parallel_devices(devices: Union[int, str, List[int]]) -> List[torch.device]:
"""Gets parallel devices for the Accelerator."""
devices = device_parser.parse_cpu_cores(devices)
devices = parse_cpu_cores(devices)
return [torch.device("cpu")] * devices

@staticmethod
@@ -63,3 +62,26 @@ def register_accelerators(cls, accelerator_registry: Dict) -> None:
cls,
description=cls.__class__.__name__,
)


def parse_cpu_cores(cpu_cores: Union[int, str, List[int]]) -> int:
"""Parses the cpu_cores given in the format as accepted by the ``devices`` argument in the
:class:`~pytorch_lightning.trainer.Trainer`.

Args:
cpu_cores: An int > 0.

Returns:
An int representing the number of processes

Raises:
TypeError:
If cpu_cores is not an int > 0
"""
if isinstance(cpu_cores, str) and cpu_cores.strip().isdigit():
cpu_cores = int(cpu_cores)

if not isinstance(cpu_cores, int) or cpu_cores <= 0:
raise TypeError("`devices` selected with `CPUAccelerator` should be an int > 0.")

return cpu_cores
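Not part of the diff — a minimal usage sketch of the relocated CPU parser, assuming the module path shown above (`lightning_lite.accelerators.cpu`):

```python
import torch

from lightning_lite.accelerators.cpu import CPUAccelerator, parse_cpu_cores

# Digit strings are coerced to int; anything that is not a positive int raises TypeError.
assert parse_cpu_cores("4") == 4
assert parse_cpu_cores(4) == 4

# The accelerator expands the core count into that many logical CPU devices.
assert CPUAccelerator.get_parallel_devices(2) == [torch.device("cpu"), torch.device("cpu")]
```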
43 changes: 39 additions & 4 deletions src/lightning_lite/accelerators/cuda.py
@@ -11,12 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from typing import Dict, List, Optional, Union

import torch

from lightning_lite.accelerators.accelerator import Accelerator
from lightning_lite.utilities import device_parser
from lightning_lite.strategies.launchers.multiprocessing import _is_forking_disabled


class CUDAAccelerator(Accelerator):
@@ -39,7 +40,9 @@ def teardown(self) -> None:
@staticmethod
def parse_devices(devices: Union[int, str, List[int]]) -> Optional[List[int]]:
"""Accelerator device parsing logic."""
return device_parser.parse_gpu_ids(devices, include_cuda=True)
from lightning_lite.utilities.device_parser import parse_gpu_ids

return parse_gpu_ids(devices, include_cuda=True)

@staticmethod
def get_parallel_devices(devices: List[int]) -> List[torch.device]:
@@ -49,11 +52,11 @@ def get_parallel_devices(devices: List[int]) -> List[torch.device]:
@staticmethod
def auto_device_count() -> int:
"""Get the devices when set to auto."""
return device_parser.num_cuda_devices()
return num_cuda_devices()

@staticmethod
def is_available() -> bool:
return device_parser.num_cuda_devices() > 0
return num_cuda_devices() > 0

@classmethod
def register_accelerators(cls, accelerator_registry: Dict) -> None:
@@ -62,3 +65,35 @@ def register_accelerators(cls, accelerator_registry: Dict) -> None:
cls,
description=cls.__class__.__name__,
)


def _get_all_available_cuda_gpus() -> List[int]:
"""
Returns:
A list of all available CUDA GPUs
"""
return list(range(num_cuda_devices()))


def num_cuda_devices() -> int:
"""Returns the number of GPUs available.

Unlike :func:`torch.cuda.device_count`, this function does its best not to create a CUDA context for fork support,
if the platform allows it.
"""
if "fork" not in torch.multiprocessing.get_all_start_methods() or _is_forking_disabled():
return torch.cuda.device_count()
with multiprocessing.get_context("fork").Pool(1) as pool:
return pool.apply(torch.cuda.device_count)


def is_cuda_available() -> bool:
"""Returns a bool indicating if CUDA is currently available.

Unlike :func:`torch.cuda.is_available`, this function does its best not to create a CUDA context for fork support,
if the platform allows it.
"""
if "fork" not in torch.multiprocessing.get_all_start_methods() or _is_forking_disabled():
return torch.cuda.is_available()
with multiprocessing.get_context("fork").Pool(1) as pool:
return pool.apply(torch.cuda.is_available)
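Not part of the diff — a self-contained sketch of the fork-based query pattern that `num_cuda_devices` and `is_cuda_available` rely on above, so the calling process never initializes a CUDA context. The helper name `_query_in_subprocess` is illustrative only:

```python
import multiprocessing
from typing import Callable

import torch


def _query_in_subprocess(fn: Callable[[], int]) -> int:
    """Run ``fn`` in a short-lived forked worker so the parent keeps CUDA uninitialized."""
    if "fork" not in torch.multiprocessing.get_all_start_methods():
        # No fork on this platform (e.g. Windows): query directly.
        return fn()
    with multiprocessing.get_context("fork").Pool(1) as pool:
        return pool.apply(fn)


if __name__ == "__main__":
    print("CUDA devices:", _query_in_subprocess(torch.cuda.device_count))
    print("CUDA available:", _query_in_subprocess(torch.cuda.is_available))
```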
14 changes: 11 additions & 3 deletions src/lightning_lite/accelerators/mps.py
@@ -18,7 +18,6 @@
import torch

from lightning_lite.accelerators.accelerator import Accelerator
from lightning_lite.utilities import device_parser
from lightning_lite.utilities.imports import _TORCH_GREATER_EQUAL_1_12


@@ -40,15 +39,16 @@ def teardown(self) -> None:
@staticmethod
def parse_devices(devices: Union[int, str, List[int]]) -> Optional[List[int]]:
"""Accelerator device parsing logic."""
parsed_devices = device_parser.parse_gpu_ids(devices, include_mps=True)
from lightning_lite.utilities.device_parser import parse_gpu_ids

parsed_devices = parse_gpu_ids(devices, include_mps=True)
return parsed_devices

@staticmethod
def get_parallel_devices(devices: Union[int, str, List[int]]) -> List[torch.device]:
"""Gets parallel devices for the Accelerator."""
parsed_devices = MPSAccelerator.parse_devices(devices)
assert parsed_devices is not None

return [torch.device("mps", i) for i in range(len(parsed_devices))]

@staticmethod
@@ -72,3 +72,11 @@ def register_accelerators(cls, accelerator_registry: Dict) -> None:
cls,
description=cls.__class__.__name__,
)


def _get_all_available_mps_gpus() -> List[int]:
"""
Returns:
A list of all available MPS GPUs
"""
return [0] if MPSAccelerator.is_available() else []
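Not part of the diff — a short sketch of the MPS helpers above; PyTorch exposes at most one MPS device, so the index list is either `[0]` or empty:

```python
import torch

from lightning_lite.accelerators.mps import MPSAccelerator, _get_all_available_mps_gpus

if MPSAccelerator.is_available():  # requires torch >= 1.12 on an Apple-silicon Mac
    assert _get_all_available_mps_gpus() == [0]
    assert MPSAccelerator.get_parallel_devices(1) == [torch.device("mps", 0)]
else:
    assert _get_all_available_mps_gpus() == []
```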
57 changes: 54 additions & 3 deletions src/lightning_lite/accelerators/tpu.py
@@ -11,12 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union

import torch

from lightning_lite.accelerators.accelerator import Accelerator
from lightning_lite.utilities import device_parser
from lightning_lite.utilities.device_parser import _check_data_type
from lightning_lite.utilities.imports import _TPU_AVAILABLE


@@ -32,7 +32,7 @@ def teardown(self) -> None:
@staticmethod
def parse_devices(devices: Union[int, str, List[int]]) -> Optional[Union[int, List[int]]]:
"""Accelerator device parsing logic."""
return device_parser.parse_tpu_cores(devices)
return parse_tpu_cores(devices)

@staticmethod
def get_parallel_devices(devices: Union[int, List[int]]) -> List[int]:
@@ -57,3 +57,54 @@ def register_accelerators(cls, accelerator_registry: Dict) -> None:
cls,
description=cls.__class__.__name__,
)


def parse_tpu_cores(tpu_cores: Optional[Union[int, str, List[int]]]) -> Optional[Union[int, List[int]]]:
"""
Parses the tpu_cores given in the format as accepted by the
:class:`~pytorch_lightning.trainer.Trainer`.

Args:
tpu_cores: An int of 1 or string '1' indicates that 1 core with multi-processing should be used
An int 8 or string '8' indicates that all 8 cores with multi-processing should be used
A list of ints or a string containing a list of comma-separated integers
indicates the specific TPU core to use.

Returns:
A list of tpu_cores to be used or ``None`` if no TPU cores were requested

Raises:
TypeError:
If TPU cores aren't 1, 8 or [<1-8>]
"""
_check_data_type(tpu_cores)

if isinstance(tpu_cores, str):
tpu_cores = _parse_tpu_cores_str(tpu_cores.strip())

if not _tpu_cores_valid(tpu_cores):
raise TypeError("`tpu_cores` can only be 1, 8 or [<1-8>]")

return tpu_cores


def _tpu_cores_valid(tpu_cores: Any) -> bool:
# allow 1 or 8 cores
if tpu_cores in (1, 8, None):
return True

# allow picking 1 of 8 indexes
if isinstance(tpu_cores, (list, tuple, set)):
has_1_tpu_idx = len(tpu_cores) == 1
is_valid_tpu_idx = 1 <= list(tpu_cores)[0] <= 8

is_valid_tpu_core_choice = has_1_tpu_idx and is_valid_tpu_idx
return is_valid_tpu_core_choice

return False


def _parse_tpu_cores_str(tpu_cores: str) -> Union[int, List[int]]:
if tpu_cores in ("1", "8"):
return int(tpu_cores)
return [int(x.strip()) for x in tpu_cores.split(",") if len(x) > 0]
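Not part of the diff — a sketch of the input shapes `parse_tpu_cores` accepts, following the docstring above (pure parsing, no TPU required to run):

```python
from lightning_lite.accelerators.tpu import parse_tpu_cores

assert parse_tpu_cores(8) == 8        # all 8 cores with multi-processing
assert parse_tpu_cores("1") == 1      # digit strings "1" / "8" behave like ints
assert parse_tpu_cores("5") == [5]    # other digit strings select a single core index
assert parse_tpu_cores([3]) == [3]    # a one-element list picks a specific core
assert parse_tpu_cores(None) is None  # no TPU requested

try:
    parse_tpu_cores(4)                # only 1, 8, or a single index in [1, 8] is valid
except TypeError:
    pass
```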
5 changes: 3 additions & 2 deletions src/lightning_lite/connector.py
@@ -53,7 +53,8 @@
XLAStrategy,
)
from lightning_lite.strategies.ddp_spawn import _DDP_FORK_ALIASES
from lightning_lite.utilities import _StrategyType, device_parser, rank_zero_deprecation, rank_zero_info, rank_zero_warn
from lightning_lite.utilities import _StrategyType, rank_zero_deprecation, rank_zero_info, rank_zero_warn
from lightning_lite.utilities.device_parser import determine_root_gpu_device
from lightning_lite.utilities.imports import _HPU_AVAILABLE, _IPU_AVAILABLE, _IS_INTERACTIVE, _TPU_AVAILABLE

_PLUGIN = Union[Strategy, Precision, ClusterEnvironment, CheckpointIO]
@@ -438,7 +439,7 @@ def _choose_strategy(self) -> Union[Strategy, str]:
if isinstance(self._accelerator_flag, (CUDAAccelerator, MPSAccelerator)) or (
isinstance(self._accelerator_flag, str) and self._accelerator_flag in ("cuda", "gpu", "mps")
):
device = device_parser.determine_root_gpu_device(self._parallel_devices)
device = determine_root_gpu_device(self._parallel_devices)
else:
device = "cpu"
# TODO: lazy initialized device, then here could be self._strategy_flag = "single_device"