Commit dab4b89

up
Signed-off-by: Rui Qiao <[email protected]>
Ubuntu authored and ruisearch42 committed Oct 22, 2024
1 parent 409df5e commit dab4b89
Showing 1 changed file with 142 additions and 68 deletions.
210 changes: 142 additions & 68 deletions python/ray/dag/tests/experimental/test_torch_tensor_dag.py
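
For context, the TestNcclGroup helper in this diff implements Ray's GPUCommunicator interface, so a test can substitute it for the default NCCL transport of a compiled DAG. The sketch below shows how such a custom communicator is typically wired in; it is a minimal illustration that assumes Ray's experimental TorchTensorType(transport=...) type hint and hypothetical sender/receiver actors, and only the TestNcclGroup construction is taken from the diff itself.

# Hedged sketch; `sender`, `receiver`, and their methods are illustrative assumptions.
from cupy.cuda import nccl
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

comm_id = nccl.get_unique_id()
nccl_group = TestNcclGroup(2, comm_id, [sender, receiver])
with InputNode() as inp:
    dag = sender.send.bind(inp)
    # Route the tensor through the custom communicator instead of the default NCCL group.
    dag = dag.with_type_hint(TorchTensorType(transport=nccl_group))
    dag = receiver.recv.bind(dag)
compiled_dag = dag.experimental_compile()
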
@@ -103,74 +103,6 @@ def forward(self, inp):
return torch.randn(10, 10)


class TestNcclGroup(GPUCommunicator):
"""
A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
"""

def __init__(self, world_size, comm_id, actor_handles):
self._world_size = world_size
self._comm_id = comm_id
self._actor_handles = actor_handles
self._inner = None

def initialize(self, rank: int) -> None:
self._inner = _NcclGroup(
self._world_size,
self._comm_id,
rank,
self._actor_handles,
torch.cuda.current_stream().cuda_stream,
)

def get_rank(self, actor: ray.actor.ActorHandle) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
actor_ids = [a._ray_actor_id for a in self._actor_handles]
try:
rank = actor_ids.index(actor._ray_actor_id)
except ValueError:
raise ValueError("Actor is not in the NCCL group.")
return rank

def get_world_size(self) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
return self._world_size

def get_self_rank(self) -> Optional[int]:
if self._inner is None:
return None
return self._inner.get_self_rank()

def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
return self._inner.send(value, peer_rank)

def recv(
self,
shape: Tuple[int],
dtype: "torch.dtype",
peer_rank: int,
allocator: Optional[TorchTensorAllocator] = None,
) -> "torch.Tensor":
return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)

def allreduce(
self,
send_buf: "torch.Tensor",
recv_buf: "torch.Tensor",
op: ReduceOp = ReduceOp.SUM,
) -> None:
self._inner.allreduce(send_buf, recv_buf, op)
recv_buf += 1

def destroy(self) -> None:
return self._inner.destroy()


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_p2p(ray_start_regular):
if USE_GPU:
@@ -392,6 +324,77 @@ def test_torch_tensor_custom_comm(ray_start_regular):

from cupy.cuda import nccl

class TestNcclGroup(GPUCommunicator):
"""
A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
"""

def __init__(self, world_size, comm_id, actor_handles):
self._world_size = world_size
self._comm_id = comm_id
self._actor_handles = actor_handles
self._inner = None

def initialize(self, rank: int) -> None:
print(f"initializing rank {rank}")
try:
self._inner = _NcclGroup(
self._world_size,
self._comm_id,
rank,
self._actor_handles,
torch.cuda.current_stream().cuda_stream,
)
except Exception as e:
print(f"Got {e}")

def get_rank(self, actor: ray.actor.ActorHandle) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
actor_ids = [a._ray_actor_id for a in self._actor_handles]
try:
rank = actor_ids.index(actor._ray_actor_id)
except ValueError:
raise ValueError("Actor is not in the NCCL group.")
return rank

def get_world_size(self) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
return self._world_size

def get_self_rank(self) -> Optional[int]:
if self._inner is None:
return None
return self._inner.get_self_rank()

def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
return self._inner.send(value, peer_rank)

def recv(
self,
shape: Tuple[int],
dtype: "torch.dtype",
peer_rank: int,
allocator: Optional[TorchTensorAllocator] = None,
) -> "torch.Tensor":
return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)

def allreduce(
self,
send_buf: "torch.Tensor",
recv_buf: "torch.Tensor",
op: ReduceOp = ReduceOp.SUM,
) -> None:
self._inner.allreduce(send_buf, recv_buf, op)
recv_buf += 1

def destroy(self) -> None:
return self._inner.destroy()

comm_id = nccl.get_unique_id()
nccl_group = TestNcclGroup(2, comm_id, [sender, receiver])
with InputNode() as inp:
@@ -1059,6 +1062,77 @@ def test_torch_tensor_nccl_all_reduce_custom_comm(ray_start_regular):
num_workers = 2
workers = [actor_cls.remote() for _ in range(num_workers)]

class TestNcclGroup(GPUCommunicator):
"""
A custom NCCL group for testing. This is a simple wrapper around `_NcclGroup`.
"""

def __init__(self, world_size, comm_id, actor_handles):
self._world_size = world_size
self._comm_id = comm_id
self._actor_handles = actor_handles
self._inner = None

def initialize(self, rank: int) -> None:
print(f"initializing rank {rank}")
try:
self._inner = _NcclGroup(
self._world_size,
self._comm_id,
rank,
self._actor_handles,
torch.cuda.current_stream().cuda_stream,
)
except Exception as e:
print(f"Got {e}")

def get_rank(self, actor: ray.actor.ActorHandle) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
actor_ids = [a._ray_actor_id for a in self._actor_handles]
try:
rank = actor_ids.index(actor._ray_actor_id)
except ValueError:
raise ValueError("Actor is not in the NCCL group.")
return rank

def get_world_size(self) -> int:
# Implement this without forwarding to `_inner` to allow the method
# to be called before initialization.
return self._world_size

def get_self_rank(self) -> Optional[int]:
if self._inner is None:
return None
return self._inner.get_self_rank()

def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
return self._inner.send(value, peer_rank)

def recv(
self,
shape: Tuple[int],
dtype: "torch.dtype",
peer_rank: int,
allocator: Optional[TorchTensorAllocator] = None,
) -> "torch.Tensor":
return self._inner.recv(shape, dtype, peer_rank, allocator=allocator)

def allreduce(
self,
send_buf: "torch.Tensor",
recv_buf: "torch.Tensor",
op: ReduceOp = ReduceOp.SUM,
) -> None:
self._inner.allreduce(send_buf, recv_buf, op)
recv_buf += 1

def destroy(self) -> None:
return self._inner.destroy()

from cupy.cuda import nccl

comm_id = nccl.get_unique_id()
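
Note that the custom allreduce in the helper above adds 1 to recv_buf after delegating to the inner NCCL group, which gives a test a way to check that the custom communicator, rather than the default one, actually ran. A hedged sketch of such a check follows; the tensor values, world size, and the `received` variable are assumptions, not taken from this diff.

import torch

# Hypothetical expectation: with two workers each contributing torch.ones(10),
# the NCCL sum is a tensor of 2s, and the custom allreduce's extra "+1" makes it 3s.
world_size = 2
expected = torch.ones(10) * world_size + 1
# assert torch.equal(received, expected)  # `received` would come from executing the compiled DAG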
