Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout to DeepSpeedStrategy #20474

Merged
merged 10 commits into from
Dec 10, 2024
3 changes: 1 addition & 2 deletions examples/fabric/tensor_parallel/train.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import lightning as L
import torch
import torch.nn.functional as F
from data import RandomTokenDataset
from lightning.fabric.strategies import ModelParallelStrategy
from model import ModelArgs, Transformer
from parallelism import parallelize
from torch.distributed.tensor.parallel import loss_parallel
from torch.utils.data import DataLoader

from data import RandomTokenDataset


def train():
strategy = ModelParallelStrategy(
Expand Down
3 changes: 1 addition & 2 deletions examples/pytorch/tensor_parallel/train.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import lightning as L
import torch
import torch.nn.functional as F
from data import RandomTokenDataset
from lightning.pytorch.strategies import ModelParallelStrategy
from model import ModelArgs, Transformer
from parallelism import parallelize
from torch.distributed.tensor.parallel import loss_parallel
from torch.utils.data import DataLoader

from data import RandomTokenDataset


class Llama3(L.LightningModule):
def __init__(self):
Expand Down
8 changes: 7 additions & 1 deletion src/lightning/fabric/strategies/deepspeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import platform
from collections.abc import Mapping
from contextlib import AbstractContextManager, ExitStack
from datetime import timedelta
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
Expand All @@ -29,6 +30,7 @@
from typing_extensions import override

from lightning.fabric.accelerators import Accelerator, CUDAAccelerator
from lightning.fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning.fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning.fabric.plugins.precision import Precision
from lightning.fabric.strategies.ddp import DDPStrategy
Expand Down Expand Up @@ -97,6 +99,7 @@ def __init__(
load_full_weights: bool = False,
precision: Optional[Precision] = None,
process_group_backend: Optional[str] = None,
timeout: Optional[timedelta] = default_pg_timeout,
) -> None:
"""Provides capabilities to run training using the DeepSpeed library, with training optimizations for large
billion parameter models. `For more information: https://pytorch-
Expand Down Expand Up @@ -241,6 +244,7 @@ def __init__(
process_group_backend=process_group_backend,
)
self._backward_sync_control = None # DeepSpeed handles gradient accumulation internally
self._timeout: Optional[timedelta] = timeout

self.config = self._load_config(config)
if self.config is None:
Expand Down Expand Up @@ -648,7 +652,9 @@ def _init_deepspeed_distributed(self) -> None:
f"MEMBER: {self.global_rank + 1}/{self.world_size}"
)
self._process_group_backend = self._get_process_group_backend()
deepspeed.init_distributed(self._process_group_backend, distributed_port=self.cluster_environment.main_port)
deepspeed.init_distributed(
self._process_group_backend, distributed_port=self.cluster_environment.main_port, timeout=self._timeout
)

def _set_node_environment_variables(self) -> None:
assert self.cluster_environment is not None
Expand Down
8 changes: 7 additions & 1 deletion src/lightning/pytorch/strategies/deepspeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections import OrderedDict
from collections.abc import Generator, Mapping
from contextlib import contextmanager
from datetime import timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, Union

Expand All @@ -30,6 +31,7 @@

import lightning.pytorch as pl
from lightning.fabric.plugins import ClusterEnvironment
from lightning.fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning.fabric.strategies import _StrategyRegistry
from lightning.fabric.strategies.deepspeed import (
_DEEPSPEED_AVAILABLE,
Expand Down Expand Up @@ -119,6 +121,7 @@ def __init__(
load_full_weights: bool = False,
precision_plugin: Optional[Precision] = None,
process_group_backend: Optional[str] = None,
timeout: Optional[timedelta] = default_pg_timeout,
) -> None:
"""Provides capabilities to run training using the DeepSpeed library, with training optimizations for large
billion parameter models. `For more information: https://pytorch-
Expand Down Expand Up @@ -264,6 +267,7 @@ def __init__(
precision_plugin=precision_plugin,
process_group_backend=process_group_backend,
)
self._timeout: Optional[timedelta] = timeout

self.config = self._load_config(config)
if self.config is None:
Expand Down Expand Up @@ -364,7 +368,9 @@ def _init_deepspeed_distributed(self) -> None:
f"MEMBER: {self.global_rank + 1}/{self.world_size}"
)
self._process_group_backend = self._get_process_group_backend()
deepspeed.init_distributed(self._process_group_backend, distributed_port=self.cluster_environment.main_port)
deepspeed.init_distributed(
self._process_group_backend, distributed_port=self.cluster_environment.main_port, timeout=self._timeout
)

def _set_node_environment_variables(self) -> None:
assert self.cluster_environment is not None
Expand Down
Loading