Commit 8139112

Merge branch 'main' into dse_qwen2_2b_mrl_v1
austin.veselka committed Nov 13, 2024
2 parents 1186516 + 0d4ea3f commit 8139112
Showing 15 changed files with 57 additions and 38 deletions.
Dockerfile.neuron (1 addition, 1 deletion)

@@ -31,7 +31,7 @@ RUN --mount=type=bind,source=.git,target=.git \
if [ "$GIT_REPO_CHECK" != 0 ]; then bash tools/check_repo.sh ; fi

RUN python3 -m pip install -U \
-    'cmake>=3.26,<=3.30' ninja packaging 'setuptools-scm>=8' wheel jinja2 \
+    'cmake>=3.26' ninja packaging 'setuptools-scm>=8' wheel jinja2 \
-r requirements-neuron.txt

ENV VLLM_TARGET_DEVICE neuron
Dockerfile.ppc64le (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ RUN --mount=type=bind,source=.git,target=.git \
# These packages will be in rocketce eventually
RUN --mount=type=cache,target=/root/.cache/pip \
pip install -v --prefer-binary --extra-index-url https://repo.fury.io/mgiessing \
-    'cmake>=3.26,<=3.30' ninja packaging 'setuptools-scm>=8' wheel jinja2 \
+    'cmake>=3.26' ninja packaging 'setuptools-scm>=8' wheel jinja2 \
torch==2.3.1 \
-r requirements-cpu.txt \
xformers uvloop==0.20.0
docs/source/_static/custom.js (3 additions, 1 deletion)

@@ -8,7 +8,9 @@ document.addEventListener("DOMContentLoaded", function () {
script.setAttribute("version", "stable");
script.setAttribute("runllm-keyboard-shortcut", "Mod+j"); // cmd-j or ctrl-j to open the widget.
script.setAttribute("runllm-name", "vLLM");
script.setAttribute("runllm-position", "TOP_RIGHT");
script.setAttribute("runllm-position", "BOTTOM_RIGHT");
script.setAttribute("runllm-position-y", "20%");
script.setAttribute("runllm-position-x", "3%");
script.setAttribute("runllm-assistant-id", "207");

script.async = true;
docs/source/getting_started/cpu-installation.rst (1 addition, 1 deletion)

@@ -62,7 +62,7 @@ Build from source
.. code-block:: console
$ pip install --upgrade pip
-    $ pip install cmake>=3.26,<=3.30 wheel packaging ninja "setuptools-scm>=8" numpy
+    $ pip install cmake>=3.26 wheel packaging ninja "setuptools-scm>=8" numpy
$ pip install -v -r requirements-cpu.txt --extra-index-url https://download.pytorch.org/whl/cpu
- Finally, build and install vLLM CPU backend:
docs/source/getting_started/debugging.rst (4 additions, 0 deletions)

@@ -20,6 +20,10 @@ Hangs loading a model from disk
If the model is large, it can take a long time to load it from disk. Pay attention to where you store the model. Some clusters have shared filesystems across nodes, e.g. a distributed filesystem or a network filesystem, which can be slow.
It is better to store the model on a local disk. Additionally, keep an eye on CPU memory usage: when the model is too large it can consume a lot of CPU memory, slowing down the operating system because it needs to swap frequently between disk and memory.

+.. note::
+
+    To isolate model downloading and loading issues, you can use the ``--load-format dummy`` argument to skip loading the model weights. This way, you can check whether downloading and loading the model is the bottleneck.

Model is too large
----------------------------------------
If the model is too large to fit in a single GPU, you might want to `consider tensor parallelism <https://docs.vllm.ai/en/latest/serving/distributed_serving.html#distributed-inference-and-serving>`_ to split the model across multiple GPUs. In that case, every process will read the whole model and split it into chunks, which makes the disk reading time even longer (proportional to the size of tensor parallelism). You can convert the model checkpoint to a sharded checkpoint using `this example <https://docs.vllm.ai/en/latest/getting_started/examples/save_sharded_state.html>`_ . The conversion process might take some time, but later you can load the sharded checkpoint much faster. The model loading time should remain constant regardless of the size of tensor parallelism.
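The ``--load-format dummy`` tip added above also works from the Python API. A minimal sketch, assuming the usual ``vllm.LLM`` entry point and a placeholder model name (neither is part of this diff):

    # Minimal sketch: time engine start-up with dummy (randomly initialized)
    # weights so downloading/loading real weights is taken out of the picture.
    import time

    from vllm import LLM, SamplingParams

    start = time.time()
    llm = LLM(model="facebook/opt-125m", load_format="dummy")  # placeholder model
    print(f"Engine ready in {time.time() - start:.1f}s without loading real weights")

    # Outputs are meaningless with dummy weights; this is only a start-up check.
    print(llm.generate(["Hello"], SamplingParams(max_tokens=8)))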
pyproject.toml (1 addition, 1 deletion)

@@ -1,7 +1,7 @@
[build-system]
# Should be mirrored in requirements-build.txt
requires = [
"cmake>=3.26,<=3.30",
"cmake>=3.26",
"ninja",
"packaging",
"setuptools>=61",
requirements-build.txt (1 addition, 1 deletion)

@@ -1,5 +1,5 @@
# Should be mirrored in pyproject.toml
-cmake>=3.26,<=3.30
+cmake>=3.26
ninja
packaging
setuptools>=61
requirements-tpu.txt (1 addition, 1 deletion)

@@ -2,7 +2,7 @@
-r requirements-common.txt

# Dependencies for TPU
-cmake>=3.26,<=3.30
+cmake>=3.26
ninja
packaging
setuptools-scm>=8
requirements-xpu.txt (1 addition, 1 deletion)

@@ -2,7 +2,7 @@
-r requirements-common.txt

ray >= 2.9
-cmake>=3.26,<=3.30
+cmake>=3.26
ninja
packaging
setuptools-scm>=8
tests/distributed/test_utils.py (16 additions, 10 deletions)

@@ -43,12 +43,15 @@ def test_cuda_device_count_stateless():


def cpu_worker(rank, WORLD_SIZE, port1, port2):
-    pg1 = StatelessProcessGroup.create(init_method=f"tcp://127.0.0.1:{port1}",
+    pg1 = StatelessProcessGroup.create(host="127.0.0.1",
+                                       port=port1,
rank=rank,
world_size=WORLD_SIZE)
if rank <= 2:
-        pg2 = StatelessProcessGroup.create(
-            init_method=f"tcp://127.0.0.1:{port2}", rank=rank, world_size=3)
+        pg2 = StatelessProcessGroup.create(host="127.0.0.1",
+                                           port=port2,
+                                           rank=rank,
+                                           world_size=3)
data = torch.tensor([rank])
data = pg1.broadcast_obj(data, src=2)
assert data.item() == 2
@@ -62,14 +65,17 @@ def cpu_worker(rank, WORLD_SIZE, port1, port2):

def gpu_worker(rank, WORLD_SIZE, port1, port2):
torch.cuda.set_device(rank)
-    pg1 = StatelessProcessGroup.create(init_method=f"tcp://127.0.0.1:{port1}",
+    pg1 = StatelessProcessGroup.create(host="127.0.0.1",
+                                       port=port1,
rank=rank,
world_size=WORLD_SIZE)
pynccl1 = PyNcclCommunicator(pg1, device=rank)
pynccl1.disabled = False
if rank <= 2:
-        pg2 = StatelessProcessGroup.create(
-            init_method=f"tcp://127.0.0.1:{port2}", rank=rank, world_size=3)
+        pg2 = StatelessProcessGroup.create(host="127.0.0.1",
+                                           port=port2,
+                                           rank=rank,
+                                           world_size=3)
pynccl2 = PyNcclCommunicator(pg2, device=rank)
pynccl2.disabled = False
data = torch.tensor([rank]).cuda()
@@ -89,7 +95,8 @@ def gpu_worker(rank, WORLD_SIZE, port1, port2):


def broadcast_worker(rank, WORLD_SIZE, port1, port2):
-    pg1 = StatelessProcessGroup.create(init_method=f"tcp://127.0.0.1:{port1}",
+    pg1 = StatelessProcessGroup.create(host="127.0.0.1",
+                                       port=port1,
rank=rank,
world_size=WORLD_SIZE)
if rank == 2:
@@ -101,16 +108,15 @@ def broadcast_worker(rank, WORLD_SIZE, port1, port2):


def allgather_worker(rank, WORLD_SIZE, port1, port2):
-    pg1 = StatelessProcessGroup.create(init_method=f"tcp://127.0.0.1:{port1}",
+    pg1 = StatelessProcessGroup.create(host="127.0.0.1",
+                                       port=port1,
rank=rank,
world_size=WORLD_SIZE)
data = pg1.all_gather_obj(rank)
assert data == list(range(WORLD_SIZE))
pg1.barrier()


-# TODO: investigate why this test is flaky. It hangs during initialization.
-@pytest.mark.skip("Skip the test because it is flaky.")
@multi_gpu_test(num_gpus=4)
@pytest.mark.parametrize(
"worker", [cpu_worker, gpu_worker, broadcast_worker, allgather_worker])
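All four workers above switch from a single ``init_method`` URL to explicit ``host``/``port`` arguments. A minimal standalone sketch of the new calling convention, assuming ``vllm`` is importable and that the chosen port is free (both are placeholders, not part of this diff):

    # Sketch: two local processes form a StatelessProcessGroup with the new
    # host/port signature and exchange metadata, as the tests above do.
    import multiprocessing

    from vllm.distributed.utils import StatelessProcessGroup

    WORLD_SIZE = 2
    PORT = 29600  # placeholder; any free local port


    def worker(rank: int) -> None:
        # Every rank passes the same host/port; rank 0 hosts the underlying store.
        pg = StatelessProcessGroup.create(host="127.0.0.1",
                                          port=PORT,
                                          rank=rank,
                                          world_size=WORLD_SIZE)
        gathered = pg.all_gather_obj(rank)  # metadata-plane helper used above
        assert gathered == list(range(WORLD_SIZE))
        pg.barrier()


    if __name__ == "__main__":
        procs = [
            multiprocessing.Process(target=worker, args=(rank, ))
            for rank in range(WORLD_SIZE)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
            assert p.exitcode == 0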
tests/v1/engine/test_engine_core.py (3 additions, 0 deletions)

@@ -27,6 +27,9 @@ def make_request() -> EngineCoreRequest:
request_id=uuid.uuid4(),
prompt=PROMPT,
prompt_token_ids=PROMPT_TOKENS,
+        mm_data=None,
+        mm_placeholders=None,
+        mm_processor_kwargs=None,
sampling_params=SamplingParams(),
eos_token_id=None,
arrival_time=time.time(),
tests/v1/engine/test_engine_core_client.py (3 additions, 0 deletions)

@@ -29,6 +29,9 @@ def make_request(params: SamplingParams) -> EngineCoreRequest:
request_id=str(uuid.uuid4()),
prompt=PROMPT,
prompt_token_ids=PROMPT_TOKENS,
+        mm_data=None,
+        mm_placeholders=None,
+        mm_processor_kwargs=None,
sampling_params=params,
eos_token_id=None,
arrival_time=time.time(),
vllm/distributed/utils.py (13 additions, 15 deletions)

@@ -9,7 +9,7 @@
from typing import Any, Deque, Dict, Optional, Sequence, Tuple

import torch
-from torch.distributed.rendezvous import rendezvous
+from torch.distributed import TCPStore

import vllm.envs as envs
from vllm.logger import init_logger
@@ -97,7 +96,6 @@ class StatelessProcessGroup:
group. Only use it to communicate metadata between processes.
For data-plane communication, create NCCL-related objects.
"""
-    prefix: str
rank: int
world_size: int
store: torch._C._distributed_c10d.Store
@@ -127,7 +126,7 @@ def __post_init__(self):
def send_obj(self, obj: Any, dst: int):
"""Send an object to a destination rank."""
self.expire_data()
key = f"{self.prefix}/send_to/{dst}/{self.send_dst_counter[dst]}"
key = f"send_to/{dst}/{self.send_dst_counter[dst]}"
self.store.set(key, pickle.dumps(obj))
self.send_dst_counter[dst] += 1
self.entries.append((key, time.time()))
@@ -147,8 +146,7 @@ def recv_obj(self, src: int) -> Any:
"""Receive an object from a source rank."""
obj = pickle.loads(
self.store.get(
f"{self.prefix}/send_to/{self.rank}/{self.recv_src_counter[src]}"
))
f"send_to/{self.rank}/{self.recv_src_counter[src]}"))
self.recv_src_counter[src] += 1
return obj

@@ -159,14 +157,14 @@ def broadcast_obj(self, obj: Optional[Any], src: int) -> Any:
"""
if self.rank == src:
self.expire_data()
key = (f"{self.prefix}/broadcast_from/{src}/"
key = (f"broadcast_from/{src}/"
f"{self.broadcast_send_counter}")
self.store.set(key, pickle.dumps(obj))
self.broadcast_send_counter += 1
self.entries.append((key, time.time()))
return obj
else:
key = (f"{self.prefix}/broadcast_from/{src}/"
key = (f"broadcast_from/{src}/"
f"{self.broadcast_recv_src_counter[src]}")
recv_obj = pickle.loads(self.store.get(key))
self.broadcast_recv_src_counter[src] += 1
@@ -194,7 +192,8 @@ def barrier(self):

@staticmethod
def create(
-        init_method: str,
+        host: str,
+        port: int,
rank: int,
world_size: int,
data_expiration_seconds: int = 3600,
@@ -214,15 +213,14 @@ def create(
can call `StatelessProcessGroup.create` to form a group, and then process A, B,
C, and D can call `StatelessProcessGroup.create` to form another group.
""" # noqa
-        from torch._C._distributed_c10d import _DEFAULT_PG_TIMEOUT
-        timeout = _DEFAULT_PG_TIMEOUT
-
-        store, rank, world_size = next(
-            rendezvous(init_method, rank, world_size, timeout=timeout))
-        store.set_timeout(timeout)
+        store = TCPStore(
+            host_name=host,
+            port=port,
+            world_size=world_size,
+            is_master=(rank == 0),
+        )

return StatelessProcessGroup(
-            prefix=init_method,
rank=rank,
world_size=world_size,
store=store,
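The rewritten ``create()`` replaces ``torch.distributed.rendezvous`` with a plain ``TCPStore``: rank 0 binds ``host:port`` and serves the store, while every other rank connects to it as a client. A minimal sketch of that handshake, using a placeholder port and ``wait_for_workers=False`` only so the single-process demo does not block waiting for peers:

    # Sketch of the TCPStore handshake behind the new StatelessProcessGroup.create().
    from torch.distributed import TCPStore

    store = TCPStore(
        host_name="127.0.0.1",
        port=29601,              # placeholder port
        world_size=1,            # single-process demo
        is_master=True,          # rank 0 serves the store; other ranks would pass False
        wait_for_workers=False,
    )

    # Keys follow the same flat scheme the class now uses (no per-group prefix).
    store.set("send_to/0/0", b"hello")
    print(store.get("send_to/0/0"))  # b'hello'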
vllm/v1/engine/core.py (1 addition, 1 deletion)

@@ -317,7 +317,7 @@ def process_input_socket(self, input_path: str):

# Msgpack serialization decoding.
decoder_add_req = PickleEncoder()
-        decoder_abort_req = msgpack.Decoder(list[str])
+        decoder_abort_req = PickleEncoder()

with self.make_socket(input_path, zmq.constants.PULL) as socket:
while True:
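With this change both the add-request and abort-request paths go through ``PickleEncoder`` instead of a typed msgpack decoder. The actual ``PickleEncoder`` implementation is not shown in this diff; what follows is a rough, assumed sketch of the shape such a pickle-backed codec takes (abort requests are lists of request-id strings, per the replaced ``msgpack.Decoder(list[str])``):

    # Assumed sketch only (not the real vllm PickleEncoder source).
    import pickle
    from typing import Any


    class PickleCodecSketch:
        """Encode/decode arbitrary Python objects for transport over a byte socket."""

        def encode(self, obj: Any) -> bytes:
            return pickle.dumps(obj)

        def decode(self, data: bytes) -> Any:
            return pickle.loads(data)


    codec = PickleCodecSketch()
    frame = codec.encode(["request-1", "request-2"])  # e.g. abort-request IDs
    assert codec.decode(frame) == ["request-1", "request-2"]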
vllm/v1/worker/gpu_model_runner.py (7 additions, 4 deletions)

@@ -404,14 +404,17 @@ def execute_model(

def load_model(self) -> None:
if self.use_cuda_graph:
-            # FIXME(woosuk): Currently, we do not use inductor to reduce the
-            # compilation time and any potential issues with the inductor.
-            os.environ["VLLM_CUSTOM_OPS"] = "all"
+            # NOTE(woosuk): Currently, we use inductor because the piecewise
+            # CUDA graphs do not work properly with the custom CUDA kernels.
+            # FIXME(woosuk): Disable inductor to reduce the compilation time
+            # and avoid any potential issues with the inductor.
+            os.environ["VLLM_CUSTOM_OPS"] = "none"
set_compilation_config(
CompilationConfig(
use_cudagraph=True,
non_cudagraph_ops=["vllm.unified_v1_flash_attention"],
-                    use_inductor=False,
+                    use_inductor=True,
enable_fusion=False,
))

logger.info("Starting to load model %s...", self.model_config.model)
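For reference, the two configurations this hunk toggles between, written out as a sketch. Only names visible in the diff are used (``VLLM_CUSTOM_OPS``, ``set_compilation_config``, ``CompilationConfig``); the import paths are assumptions and may differ between vLLM versions:

    # Sketch of the before/after settings; import paths are assumed, not shown in this diff.
    import os

    from vllm.compilation.config import CompilationConfig  # assumed location
    from vllm.plugins import set_compilation_config        # assumed location


    def new_behavior() -> None:
        """This commit: custom ops off, piecewise CUDA graphs compiled by inductor."""
        os.environ["VLLM_CUSTOM_OPS"] = "none"
        set_compilation_config(
            CompilationConfig(
                use_cudagraph=True,
                non_cudagraph_ops=["vllm.unified_v1_flash_attention"],
                use_inductor=True,
                enable_fusion=False,
            ))


    def previous_behavior() -> None:
        """Before this commit: custom CUDA ops on, inductor disabled."""
        os.environ["VLLM_CUSTOM_OPS"] = "all"
        set_compilation_config(
            CompilationConfig(
                use_cudagraph=True,
                non_cudagraph_ops=["vllm.unified_v1_flash_attention"],
                use_inductor=False,
                enable_fusion=False,
            ))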
