diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
index 6f6f98db628a4..70e9a14296b9e 100644
--- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
+++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -103,74 +103,6 @@ def forward(self, inp):
         return torch.randn(10, 10)
 
 
-class TestNcclGroup(GPUCommunicator):
-    """
-    A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
-    """
-
-    def __init__(self, world_size, comm_id, actor_handles):
-        self._world_size = world_size
-        self._comm_id = comm_id
-        self._actor_handles = actor_handles
-        self._inner = None
-
-    def initialize(self, rank: int) -> None:
-        self._inner = _NcclGroup(
-            self._world_size,
-            self._comm_id,
-            rank,
-            self._actor_handles,
-            torch.cuda.current_stream().cuda_stream,
-        )
-
-    def get_rank(self, actor: ray.actor.ActorHandle) -> int:
-        # Implement this without forwarding to `_inner` to allow the method
-        # to be called before initialization.
-        actor_ids = [a._ray_actor_id for a in self._actor_handles]
-        try:
-            rank = actor_ids.index(actor._ray_actor_id)
-        except ValueError:
-            raise ValueError("Actor is not in the NCCL group.")
-        return rank
-
-    def get_world_size(self) -> int:
-        # Implement this without forwarding to `_inner` to allow the method
-        # to be called before initialization.
-        return self._world_size
-
-    def get_self_rank(self) -> Optional[int]:
-        if self._inner is None:
-            return None
-        return self._inner.get_self_rank()
-
-    def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
-        return self._actor_handles
-
-    def send(self, value: "torch.Tensor", peer_rank: int) -> None:
-        return self._inner.send(value, peer_rank)
-
-    def recv(
-        self,
-        shape: Tuple[int],
-        dtype: "torch.dtype",
-        peer_rank: int,
-        allocator: Optional[TorchTensorAllocator] = None,
-    ) -> "torch.Tensor":
-        return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)
-
-    def allreduce(
-        self,
-        send_buf: "torch.Tensor",
-        recv_buf: "torch.Tensor",
-        op: ReduceOp = ReduceOp.SUM,
-    ) -> None:
-        self._inner.allreduce(send_buf, recv_buf, op)
-        recv_buf += 1
-
-    def destroy(self) -> None:
-        return self._inner.destroy()
-
-
 @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
 def test_torch_tensor_p2p(ray_start_regular):
     if USE_GPU:
@@ -392,6 +324,77 @@ def test_torch_tensor_custom_comm(ray_start_regular):
 
     from cupy.cuda import nccl
 
+    class TestNcclGroup(GPUCommunicator):
+        """
+        A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
+        """
+
+        def __init__(self, world_size, comm_id, actor_handles):
+            self._world_size = world_size
+            self._comm_id = comm_id
+            self._actor_handles = actor_handles
+            self._inner = None
+
+        def initialize(self, rank: int) -> None:
+            print(f"initializing rank {rank}")
+            try:
+                self._inner = _NcclGroup(
+                    self._world_size,
+                    self._comm_id,
+                    rank,
+                    self._actor_handles,
+                    torch.cuda.current_stream().cuda_stream,
+                )
+            except Exception as e:
+                print(f"Got {e}")
+
+        def get_rank(self, actor: ray.actor.ActorHandle) -> int:
+            # Implement this without forwarding to `_inner` to allow the method
+            # to be called before initialization.
+            actor_ids = [a._ray_actor_id for a in self._actor_handles]
+            try:
+                rank = actor_ids.index(actor._ray_actor_id)
+            except ValueError:
+                raise ValueError("Actor is not in the NCCL group.")
+            return rank
+
+        def get_world_size(self) -> int:
+            # Implement this without forwarding to `_inner` to allow the method
+            # to be called before initialization.
+            return self._world_size
+
+        def get_self_rank(self) -> Optional[int]:
+            if self._inner is None:
+                return None
+            return self._inner.get_self_rank()
+
+        def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
+            return self._actor_handles
+
+        def send(self, value: "torch.Tensor", peer_rank: int) -> None:
+            return self._inner.send(value, peer_rank)
+
+        def recv(
+            self,
+            shape: Tuple[int],
+            dtype: "torch.dtype",
+            peer_rank: int,
+            allocator: Optional[TorchTensorAllocator] = None,
+        ) -> "torch.Tensor":
+            return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)
+
+        def allreduce(
+            self,
+            send_buf: "torch.Tensor",
+            recv_buf: "torch.Tensor",
+            op: ReduceOp = ReduceOp.SUM,
+        ) -> None:
+            self._inner.allreduce(send_buf, recv_buf, op)
+            recv_buf += 1
+
+        def destroy(self) -> None:
+            return self._inner.destroy()
+
     comm_id = nccl.get_unique_id()
     nccl_group = TestNcclGroup(2, comm_id, [sender, receiver])
     with InputNode() as inp:
@@ -1059,6 +1062,77 @@ def test_torch_tensor_nccl_all_reduce_custom_comm(ray_start_regular):
     num_workers = 2
     workers = [actor_cls.remote() for _ in range(num_workers)]
 
+    class TestNcclGroup(GPUCommunicator):
+        """
+        A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
+        """
+
+        def __init__(self, world_size, comm_id, actor_handles):
+            self._world_size = world_size
+            self._comm_id = comm_id
+            self._actor_handles = actor_handles
+            self._inner = None
+
+        def initialize(self, rank: int) -> None:
+            print(f"initializing rank {rank}")
+            try:
+                self._inner = _NcclGroup(
+                    self._world_size,
+                    self._comm_id,
+                    rank,
+                    self._actor_handles,
+                    torch.cuda.current_stream().cuda_stream,
+                )
+            except Exception as e:
+                print(f"Got {e}")
+
+        def get_rank(self, actor: ray.actor.ActorHandle) -> int:
+            # Implement this without forwarding to `_inner` to allow the method
+            # to be called before initialization.
+            actor_ids = [a._ray_actor_id for a in self._actor_handles]
+            try:
+                rank = actor_ids.index(actor._ray_actor_id)
+            except ValueError:
+                raise ValueError("Actor is not in the NCCL group.")
+            return rank
+
+        def get_world_size(self) -> int:
+            # Implement this without forwarding to `_inner` to allow the method
+            # to be called before initialization.
+            return self._world_size
+
+        def get_self_rank(self) -> Optional[int]:
+            if self._inner is None:
+                return None
+            return self._inner.get_self_rank()
+
+        def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
+            return self._actor_handles
+
+        def send(self, value: "torch.Tensor", peer_rank: int) -> None:
+            return self._inner.send(value, peer_rank)
+
+        def recv(
+            self,
+            shape: Tuple[int],
+            dtype: "torch.dtype",
+            peer_rank: int,
+            allocator: Optional[TorchTensorAllocator] = None,
+        ) -> "torch.Tensor":
+            return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)
+
+        def allreduce(
+            self,
+            send_buf: "torch.Tensor",
+            recv_buf: "torch.Tensor",
+            op: ReduceOp = ReduceOp.SUM,
+        ) -> None:
+            self._inner.allreduce(send_buf, recv_buf, op)
+            recv_buf += 1
+
+        def destroy(self) -> None:
+            return self._inner.destroy()
+
     from cupy.cuda import nccl
     comm_id = nccl.get_unique_id()