feature(wgt): enable DI using torch-rpc to support GPU-p2p and RDMA-rpc #562

Open · wants to merge 16 commits into base: main
1 change: 1 addition & 0 deletions .coveragerc
@@ -1,4 +1,5 @@
[run]
concurrency = multiprocessing,thread
Member: why add this?

Collaborator (Author): concurrency = multiprocessing is added so that codecov can count the coverage of subprocesses; the default concurrency setting only tracks threads. There are a few caveats to be aware of when using it, see: https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
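For reference, a minimal sketch (not part of this PR; names are illustrative) of the kind of test this setting affects. Without concurrency = multiprocessing, lines executed only inside the spawned child process would be missing from the coverage report:

# hypothetical test file, e.g. test_subprocess_cov.py
import multiprocessing as mp


def _child(q):
    # This line is only counted when coverage runs with concurrency = multiprocessing.
    q.put(sum(range(10)))


def test_child_process_is_covered():
    q = mp.Queue()
    p = mp.Process(target=_child, args=(q, ))
    p.start()
    p.join()
    assert q.get() == 45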

omit =
ding/utils/slurm_helper.py
ding/utils/file_helper.py
47 changes: 47 additions & 0 deletions .github/workflows/unit_test.yml
@@ -47,6 +47,8 @@ jobs:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
env:
AGENT_TOOLSDIRECTORY: /opt/hostedtoolcache
with:
python-version: ${{ matrix.python-version }}
- name: do_benchmark
@@ -55,3 +57,48 @@ jobs:
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make benchmark

test_multiprocess:
runs-on: self-hosted
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: ["3.9"]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: do_multiprocesstest
timeout-minutes: 40
run: |
python -m pip install box2d-py
python -m pip install .
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make multiprocesstest

test_cuda:
runs-on: self-hosted
if: "!contains(github.event.head_commit.message, 'ci skip')"
strategy:
matrix:
python-version: ["3.9"]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
env:
AGENT_TOOLSDIRECTORY: /opt/hostedtoolcache
with:
python-version: ${{ matrix.python-version }}
- name: do_unittest
timeout-minutes: 40
run: |
python -m pip install torch==1.12.1+cu113 --extra-index-url https://download.pytorch.org/whl/cu113
python -m pip install box2d-py
python -m pip install .
python -m pip install ".[test,k8s]"
./ding/scripts/install-k8s-tools.sh
make cudatest
16 changes: 13 additions & 3 deletions Makefile
@@ -10,13 +10,14 @@ FORMAT_DIR ?= $(if ${RANGE_DIR},${RANGE_DIR},${DING_DIR})
PLATFORM_TEST_DIR ?= $(if ${RANGE_DIR},${RANGE_DIR},${DING_DIR}/entry/tests/test_serial_entry.py ${DING_DIR}/entry/tests/test_serial_entry_onpolicy.py)

# Workers command
WORKERS ?= 2
WORKERS ?= 1
WORKERS_COMMAND := $(if ${WORKERS},-n ${WORKERS} --dist=loadscope,)

# Duration command
DURATIONS ?= 10
DURATIONS_COMMAND := $(if ${DURATIONS},--durations=${DURATIONS},)


docs:
$(MAKE) -C ${DING_DIR}/docs html

@@ -57,15 +58,24 @@ benchmark:
--durations=0 \
-sv -m benchmark

multiprocesstest:
pytest ${TEST_DIR} \
--cov-report=xml \
--cov-report term-missing \
--cov=${COV_DIR} \
${DURATIONS_COMMAND} \
${WORKERS_COMMAND} \
-sv -m multiprocesstest

test: unittest # just for compatibility, can be changed later

cpu_test: unittest algotest benchmark

all_test: unittest algotest cudatest benchmark
all_test: unittest algotest cudatest benchmark multiprocesstest

format:
yapf --in-place --recursive -p --verbose --style .style.yapf ${FORMAT_DIR}
format_test:
bash format.sh ${FORMAT_DIR} --test
flake_check:
flake8 ${FORMAT_DIR}
flake8 ${FORMAT_DIR}
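For reference, a hedged sketch (not part of this diff) of how a test opts into the new multiprocesstest target above; it assumes a pytest marker named multiprocesstest, matching the -m filter in the Makefile, is registered in the project's pytest configuration:

# hypothetical test; the marker name mirrors the -m multiprocesstest filter above
import multiprocessing as mp
import pytest


def _square(x):
    return x * x


@pytest.mark.multiprocesstest
def test_pool_square():
    with mp.Pool(processes=2) as pool:
        assert pool.map(_square, [1, 2, 3]) == [1, 4, 9]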
4 changes: 4 additions & 0 deletions ding/compatibility.py
@@ -7,3 +7,7 @@ def torch_ge_131():

def torch_ge_180():
return int("".join(list(filter(str.isdigit, torch.__version__)))) >= 180


def torch_ge_1121():
return int("".join(list(filter(str.isdigit, torch.__version__)))) >= 1121
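As a side note, concatenating the digits of torch.__version__ can misorder versions (for example, a hypothetical "2.0.0" yields 200, which compares below 1121). A sketch of an alternative check using packaging.version, shown here as an assumption rather than as part of this PR, ignores local suffixes such as +cu113:

# alternative sketch, not part of this PR
from packaging import version
import torch


def torch_ge_1121_alt():
    # Compare only the release tuple, ignoring local version labels like "+cu113".
    return version.parse(torch.__version__).release >= (1, 12, 1)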
132 changes: 128 additions & 4 deletions ding/data/shm_buffer.py
@@ -3,6 +3,10 @@
import ctypes
import numpy as np
import torch
import torch.multiprocessing as mp
from functools import reduce
from ditk import logging
from abc import abstractmethod

_NTYPE_TO_CTYPE = {
np.bool_: ctypes.c_bool,
@@ -18,8 +22,37 @@
np.float64: ctypes.c_double,
}

# np.uint16, np.uint32 and np.uint64 have no direct torch equivalents; see _NOT_SUPPORT_NTYPE below.
_NTYPE_TO_TTYPE = {
np.bool_: torch.bool,
np.uint8: torch.uint8,
# np.uint16: torch.int16,
# np.uint32: torch.int32,
# np.uint64: torch.int64,
    np.int8: torch.int8,
np.int16: torch.int16,
np.int32: torch.int32,
np.int64: torch.int64,
np.float32: torch.float32,
np.float64: torch.float64,
}

_NOT_SUPPORT_NTYPE = {np.uint16: torch.int16, np.uint32: torch.int32, np.uint64: torch.int64}
_CONVERSION_TYPE = {np.uint16: np.int16, np.uint32: np.int32, np.uint64: np.int64}


class ShmBufferBase:

    @abstractmethod
    def fill(self, src_arr: Union[np.ndarray, torch.Tensor]) -> None:
        raise NotImplementedError

    @abstractmethod
    def get(self) -> Union[np.ndarray, torch.Tensor]:
        raise NotImplementedError


class ShmBuffer(ShmBufferBase):
"""
Overview:
Shared memory buffer to store numpy array.
@@ -78,6 +111,92 @@ def get(self) -> np.ndarray:
return data


class ShmBufferCuda(ShmBufferBase):

def __init__(
self,
dtype: Union[torch.dtype, np.dtype],
shape: Tuple[int],
ctype: Optional[type] = None,
copy_on_get: bool = True,
device: Optional[torch.device] = torch.device('cuda:0')
) -> None:
"""
Overview:
            Use torch.multiprocessing to share a torch.Tensor or numpy.ndarray between processes.
Arguments:
- dtype (Union[torch.dtype, np.dtype]): dtype of torch.tensor or numpy.ndarray.
- shape (Tuple[int]): Shape of torch.tensor or numpy.ndarray.
- ctype (type): Origin class type, e.g. np.ndarray, torch.Tensor.
            - copy_on_get (bool, optional): Can be set to False only if the shared object
                is a tensor; otherwise it must be True.
            - device (Optional[torch.device], optional): The GPU device where the CUDA shared tensor
                is located; defaults to cuda:0.

Raises:
            RuntimeError: If the dtype is not supported by ShmBufferCuda.
"""
        if isinstance(dtype, np.dtype):  # e.g. a dtype taken from gym.spaces
self.ctype = np.ndarray
dtype = dtype.type
if dtype in _NOT_SUPPORT_NTYPE.keys():
                logging.warning(
                    "Torch does not support the numpy type {}; a type conversion will be attempted, which may lose precision.".
                    format(dtype)
                )
ttype = _NOT_SUPPORT_NTYPE[dtype]
self.dtype = _CONVERSION_TYPE[dtype]
else:
ttype = _NTYPE_TO_TTYPE[dtype]
self.dtype = dtype
elif isinstance(dtype, torch.dtype):
self.ctype = torch.Tensor
ttype = dtype
else:
raise RuntimeError("The dtype parameter only supports torch.dtype and np.dtype")

self.copy_on_get = copy_on_get
self.shape = shape
self.device = device
self.buffer = torch.zeros(reduce(lambda x, y: x * y, shape), dtype=ttype, device=self.device)

def fill(self, src_arr: Union[np.ndarray, torch.Tensor]) -> None:
if self.ctype is np.ndarray:
if src_arr.dtype.type != self.dtype:
                logging.warning(
                    "Torch does not support the numpy type of the input; converting it to {}, which may lose precision.".
                    format(self.dtype)
                )
src_arr = src_arr.astype(self.dtype)
tensor = torch.from_numpy(src_arr)
elif self.ctype is torch.Tensor:
tensor = src_arr
else:
            raise RuntimeError("Unsupported CUDA shared tensor input type: \"{}\"".format(type(src_arr)))

        # If the source and destination GPUs are connected via NVLink, this copy is very fast.
with torch.no_grad():
self.buffer.copy_(tensor.view(tensor.numel()))

def get(self) -> Union[np.ndarray, torch.Tensor]:
with torch.no_grad():
if self.ctype is np.ndarray:
                # ShmBufferCuda exchanges data between processes through CUDA memory,
                # so numpy arrays always have to be copied back to the host on get().
re = self.buffer.cpu()
re = re.detach().view(self.shape).numpy()
Member: why only detach rather than clone().detach here?

Collaborator (Author): Because self.buffer is located on the GPU and .cpu() has already copied it to the CPU, there is no need to clone it again. (A short sketch illustrating this follows the class.)

else:
if self.copy_on_get:
re = self.buffer.clone().detach().view(self.shape)
else:
re = self.buffer.view(self.shape)

return re

def __del__(self):
del self.buffer

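A minimal sketch (illustrative only, not part of this diff) of the point discussed in the review thread above: .cpu() on a CUDA tensor already materializes a fresh host-side copy, so an extra clone() would only duplicate the data a second time:

# illustrative only; requires a CUDA-capable device
import torch

if torch.cuda.is_available():
    buf = torch.zeros(4, device='cuda:0')
    host = buf.cpu()                     # device-to-host copy: host owns fresh CPU storage
    buf.fill_(1.0)                       # mutating the CUDA buffer ...
    assert host.sum().item() == 0.0      # ... does not affect the CPU copy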

class ShmBufferContainer(object):
"""
Overview:
@@ -88,7 +207,8 @@ def __init__(
self,
dtype: Union[Dict[Any, type], type, np.dtype],
shape: Union[Dict[Any, tuple], tuple],
copy_on_get: bool = True
copy_on_get: bool = True,
is_cuda_buffer: bool = False
) -> None:
"""
Overview:
@@ -98,11 +218,15 @@ def __init__(
- shape (:obj:`Union[Dict[Any, tuple], tuple]`): If `Dict[Any, tuple]`, use a dict to manage \
multiple buffers; If `tuple`, use single buffer.
- copy_on_get (:obj:`bool`): Whether to copy data when calling get method.
            - is_cuda_buffer (:obj:`bool`): Whether to use a PyTorch CUDA shared tensor as the shm implementation.
"""
if isinstance(shape, dict):
self._data = {k: ShmBufferContainer(dtype[k], v, copy_on_get) for k, v in shape.items()}
self._data = {k: ShmBufferContainer(dtype[k], v, copy_on_get, is_cuda_buffer) for k, v in shape.items()}
elif isinstance(shape, (tuple, list)):
self._data = ShmBuffer(dtype, shape, copy_on_get)
if not is_cuda_buffer:
self._data = ShmBuffer(dtype, shape, copy_on_get)
else:
self._data = ShmBufferCuda(dtype, shape, copy_on_get)
else:
raise RuntimeError("not support shape: {}".format(shape))
self._shape = shape
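For context, a hypothetical usage sketch of the container with the new flag; the exact call sites in DI-engine may differ, and a CUDA device plus the spawn start method are assumed. With is_cuda_buffer=True the nested buffers are ShmBufferCuda instances backed by CUDA shared tensors, so producer and consumer processes exchange data without going through host shared memory:

# hypothetical usage sketch; assumes ShmBufferContainer exposes fill()/get() as in the existing CPU path
import numpy as np
import torch.multiprocessing as mp
from ding.data.shm_buffer import ShmBufferContainer


def producer(buf):
    buf.fill({'obs': np.ones((4, 84, 84), dtype=np.float32)})


if __name__ == '__main__':
    mp.set_start_method('spawn', force=True)
    buf = ShmBufferContainer(
        dtype={'obs': np.dtype(np.float32)},
        shape={'obs': (4, 84, 84)},
        copy_on_get=True,
        is_cuda_buffer=True,
    )
    p = mp.Process(target=producer, args=(buf, ))
    p.start()
    p.join()
    data = buf.get()  # {'obs': numpy array of shape (4, 84, 84)}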