From f7a635600f7198b4ca64bd5830a0716c68a5278b Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 20:35:41 -0700 Subject: [PATCH 01/38] replace narrow-usage set_cuda_visible_devices to general update_environment_variables --- tests/distributed/test_pynccl.py | 4 ++-- vllm/engine/ray_utils.py | 8 ++++---- vllm/executor/ray_gpu_executor.py | 13 ++++++++++--- vllm/utils.py | 5 +++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/tests/distributed/test_pynccl.py b/tests/distributed/test_pynccl.py index b50eed1c8c722..562d0228cb804 100644 --- a/tests/distributed/test_pynccl.py +++ b/tests/distributed/test_pynccl.py @@ -6,6 +6,7 @@ from vllm.distributed.device_communicators.pynccl import (NCCLCommunicator, ncclGetUniqueId) +from vllm.utils import update_environment_variables def distributed_run(fn, world_size): @@ -32,8 +33,7 @@ def update_env(fn): # so we need to pass the environment variables as arguments # and update the environment variables in the function def wrapper(env): - import os - os.environ.update(env) + update_environment_variables(env) fn() return wrapper diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 70d5c9b1fae05..0d82418b5603e 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,9 +1,9 @@ import pickle -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from vllm.config import ParallelConfig from vllm.logger import init_logger -from vllm.utils import get_ip, is_hip, set_cuda_visible_devices +from vllm.utils import get_ip, is_hip, update_environment_variables logger = init_logger(__name__) @@ -52,8 +52,8 @@ def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]: gpu_ids = ray.get_gpu_ids() return node_id, gpu_ids - def set_cuda_visible_devices(self, device_ids) -> None: - set_cuda_visible_devices(device_ids) + def update_environment_variables(self, envs: Dict[str, str]) -> None: + update_environment_variables(envs) def execute_model_compiled_dag_remote(self, ignored): """Used only when compiled DAG is enabled.""" diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 6c0ccd7e64c90..267f163b9f2a2 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -14,7 +14,7 @@ from vllm.lora.request import LoRARequest from vllm.sequence import SamplerOutput, SequenceGroupMetadata from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, - make_async, set_cuda_visible_devices) + make_async, update_environment_variables) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -134,9 +134,16 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", node_gpus[node_id] = sorted(gpu_ids) # Set CUDA_VISIBLE_DEVICES for the driver and workers. 
- set_cuda_visible_devices(node_gpus[driver_node_id]) + # ",".join(map(str, device_ids)) + update_environment_variables({ + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[driver_node_id])) + }) for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids): - worker.set_cuda_visible_devices.remote(node_gpus[node_id]) + worker.update_environment_variables.remote({ + "CUDA_VISIBLE_DEVICES": + ",".join(map(str, node_gpus[node_id])) + }) distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) diff --git a/vllm/utils.py b/vllm/utils.py index 8ab8927512cc9..c7ef7c95ab9fa 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -232,8 +232,9 @@ def get_open_port() -> int: return s.getsockname()[1] -def set_cuda_visible_devices(device_ids: List[int]) -> None: - os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, device_ids)) +def update_environment_variables(envs: Dict[str, str]): + import os + os.environ.update(envs) def chunk_list(lst, chunk_size): From bbdfc692e663f98c80b23427bb406e40f6285a92 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 20:45:52 -0700 Subject: [PATCH 02/38] add warning when env is overwritten --- vllm/utils.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/vllm/utils.py b/vllm/utils.py index c7ef7c95ab9fa..48e028570d1da 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -233,8 +233,13 @@ def get_open_port() -> int: def update_environment_variables(envs: Dict[str, str]): - import os - os.environ.update(envs) + for k, v in envs.items(): + if k in os.environ: + warnings.warn( + f"Overwriting environment variable {k} " + f"from {os.environ[k]} to {v}", + stacklevel=2) + os.environ[k] = v def chunk_list(lst, chunk_size): From 1e62614bd4373cba1ea78f76b9ee47f7ae1d937a Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 21:49:47 -0700 Subject: [PATCH 03/38] use logger.warning --- vllm/utils.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/vllm/utils.py b/vllm/utils.py index 48e028570d1da..c9e89fda7e004 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -235,10 +235,8 @@ def get_open_port() -> int: def update_environment_variables(envs: Dict[str, str]): for k, v in envs.items(): if k in os.environ: - warnings.warn( - f"Overwriting environment variable {k} " - f"from {os.environ[k]} to {v}", - stacklevel=2) + logger.warning(f"Overwriting environment variable {k} " + f"from {os.environ[k]} to {v}") os.environ[k] = v From 37eb3448bb7dacc746e68bca0837e3e8c4d605e8 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 22:19:08 -0700 Subject: [PATCH 04/38] fix env copy --- tests/distributed/test_pynccl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/distributed/test_pynccl.py b/tests/distributed/test_pynccl.py index 562d0228cb804..f902a031be7bf 100644 --- a/tests/distributed/test_pynccl.py +++ b/tests/distributed/test_pynccl.py @@ -13,7 +13,7 @@ def distributed_run(fn, world_size): number_of_processes = world_size processes = [] for i in range(number_of_processes): - env = os.environ.copy() + env = {} env['RANK'] = str(i) env['LOCAL_RANK'] = str(i) env['WORLD_SIZE'] = str(number_of_processes) From 6f64b48075f5bc858c94c0f5cc4b070b69883fac Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 22:22:42 -0700 Subject: [PATCH 05/38] avoid overwritten warning in ray --- vllm/engine/ray_utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 
0d82418b5603e..10ebf2aec83fc 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,3 +1,4 @@ +import os import pickle from typing import Dict, List, Optional, Tuple @@ -53,6 +54,9 @@ def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]: return node_id, gpu_ids def update_environment_variables(self, envs: Dict[str, str]) -> None: + if 'CUDA_VISIBLE_DEVICES' in envs: + # avoid overwritten warning + del os.environ['CUDA_VISIBLE_DEVICES'] update_environment_variables(envs) def execute_model_compiled_dag_remote(self, ignored): From 0499106caaddbec54109dcd293b75c0eb0a24a31 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 22:27:00 -0700 Subject: [PATCH 06/38] fix lint --- tests/distributed/test_pynccl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/distributed/test_pynccl.py b/tests/distributed/test_pynccl.py index f902a031be7bf..d58f621d36b86 100644 --- a/tests/distributed/test_pynccl.py +++ b/tests/distributed/test_pynccl.py @@ -1,5 +1,4 @@ import multiprocessing -import os import pytest import torch From d26672f83d763e7b23898f9e9124b278ac02f1df Mon Sep 17 00:00:00 2001 From: youkaichao Date: Thu, 11 Apr 2024 23:17:00 -0700 Subject: [PATCH 07/38] allow heterogeneous args in _run_workers; move update_environment_variables to base worker --- vllm/engine/ray_utils.py | 11 ++------ vllm/executor/ray_gpu_executor.py | 46 +++++++++++++++++++------------ vllm/worker/worker_base.py | 10 +++++++ 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 10ebf2aec83fc..e48eca53186e3 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,10 +1,9 @@ -import os import pickle -from typing import Dict, List, Optional, Tuple +from typing import List, Optional, Tuple from vllm.config import ParallelConfig from vllm.logger import init_logger -from vllm.utils import get_ip, is_hip, update_environment_variables +from vllm.utils import get_ip, is_hip logger = init_logger(__name__) @@ -53,12 +52,6 @@ def get_node_and_gpu_ids(self) -> Tuple[str, List[int]]: gpu_ids = ray.get_gpu_ids() return node_id, gpu_ids - def update_environment_variables(self, envs: Dict[str, str]) -> None: - if 'CUDA_VISIBLE_DEVICES' in envs: - # avoid overwritten warning - del os.environ['CUDA_VISIBLE_DEVICES'] - update_environment_variables(envs) - def execute_model_compiled_dag_remote(self, ignored): """Used only when compiled DAG is enabled.""" import torch diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 267f163b9f2a2..2cebf763f2875 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -14,7 +14,7 @@ from vllm.lora.request import LoRARequest from vllm.sequence import SamplerOutput, SequenceGroupMetadata from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, - make_async, update_environment_variables) + make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -134,16 +134,13 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", node_gpus[node_id] = sorted(gpu_ids) # Set CUDA_VISIBLE_DEVICES for the driver and workers. 
- # ",".join(map(str, device_ids)) - update_environment_variables({ - "CUDA_VISIBLE_DEVICES": - ",".join(map(str, node_gpus[driver_node_id])) - }) - for worker, (node_id, _) in zip(self.workers, worker_node_and_gpu_ids): - worker.update_environment_variables.remote({ + all_args_to_update_environment_variables = [] + for (node_id, _) in [(driver_node_id, driver_gpu_ids) + ] + worker_node_and_gpu_ids: + all_args_to_update_environment_variables.append([{ "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])) - }) + }]) distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) @@ -197,6 +194,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", is_driver_worker=True, ) + self._run_workers("update_environment_variables", + all_args=all_args_to_update_environment_variables) self._run_workers("init_device") self._run_workers( "load_model", @@ -285,11 +284,25 @@ def _run_workers( *args, driver_args: Optional[List[Any]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, + all_args: Optional[List[List[Any]]] = None, + all_kwargs: Optional[List[Dict[str, Any]]] = None, max_concurrent_workers: Optional[int] = None, use_ray_compiled_dag: bool = False, **kwargs, ) -> Any: - """Runs the given method on all workers.""" + """Runs the given method on all workers. + all_args and all_kwargs are used to pass heterogeneous arguments, + i.e. different arguments for each worker. + """ + if driver_args is None: + driver_args = args + if driver_kwargs is None: + driver_kwargs = kwargs + + if all_args is None: + all_args = [driver_args] + [args] * len(self.workers) + if all_kwargs is None: + all_kwargs = [driver_kwargs] + [kwargs] * len(self.workers) if max_concurrent_workers: raise NotImplementedError( @@ -302,18 +315,15 @@ def _run_workers( else: # Start the ray workers first. ray_worker_outputs = [ - worker.execute_method.remote(method, *args, **kwargs) - for worker in self.workers + worker.execute_method.remote(method, *worker_args, + **worker_kwargs) + for (worker, worker_args, worker_kwargs + ) in zip(self.workers, all_args[1:], all_kwargs[1:]) ] - if driver_args is None: - driver_args = args - if driver_kwargs is None: - driver_kwargs = kwargs - # Start the driver worker after all the ray workers. driver_worker_output = getattr(self.driver_worker, - method)(*driver_args, **driver_kwargs) + method)(*all_args[0], **all_kwargs[0]) # Get the results of the ray workers. if self.workers: diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index e3027c406ffeb..849bcf55f7dae 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -1,8 +1,10 @@ +import os from abc import ABC, abstractmethod from typing import Dict, List from vllm.lora.request import LoRARequest from vllm.sequence import SamplerOutput, SequenceGroupMetadata +from vllm.utils import update_environment_variables class WorkerBase(ABC): @@ -10,6 +12,14 @@ class WorkerBase(ABC): different hardware. 
""" + def update_environment_variables(self, envs: Dict[str, str]) -> None: + key = 'CUDA_VISIBLE_DEVICES' + if key in envs and key in os.environ: + # overwriting CUDA_VISIBLE_DEVICES is desired behavior + # suppress the warning in `update_environment_variables` + del os.environ[key] + update_environment_variables(envs) + @abstractmethod def init_device(self) -> None: """Initialize device state, such as loading the model or other on-device From 3a01337d3d9632e05e8dd7986cc79376067883b7 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:02:59 -0700 Subject: [PATCH 08/38] unified init worker --- vllm/engine/ray_utils.py | 26 ++++++++++--- vllm/executor/ray_gpu_executor.py | 63 +++++++++++++++---------------- vllm/worker/cpu_worker.py | 18 +++++---- vllm/worker/neuron_worker.py | 7 +++- vllm/worker/worker.py | 20 +++++----- vllm/worker/worker_base.py | 8 ++++ 6 files changed, 86 insertions(+), 56 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index e48eca53186e3..6ac55a7d0e2f7 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,3 +1,4 @@ +import importlib import pickle from typing import List, Optional, Tuple @@ -14,10 +15,15 @@ class RayWorkerVllm: """Ray wrapper for vllm.worker.Worker, allowing Worker to be lazliy initialized after Ray sets CUDA_VISIBLE_DEVICES.""" - def __init__(self, init_cached_hf_modules=False) -> None: - if init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() + def __init__(self, + init_cached_hf_modules=False, + worker_module=None, + worker_class_name=None, + **kwargs) -> None: + self.worker_module = worker_module + self.worker_class_name = worker_class_name + self.kwargs = kwargs + self.init_cached_hf_modules = init_cached_hf_modules self.worker = None # Since the compiled DAG runs a main execution # in a different thread that calls cuda.set_device. @@ -25,10 +31,18 @@ def __init__(self, init_cached_hf_modules=False) -> None: # that thread. self.compiled_dag_cuda_device_set = False - def init_worker(self, worker_init_fn): - self.worker = worker_init_fn() + def init_worker(self): + if self.init_cached_hf_modules: + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() + mod = importlib.import_module(self.worker_module) + worker_class = getattr(mod, self.worker_class_name) + self.worker = worker_class(**self.kwargs) + self.worker.init_worker() def __getattr__(self, name): + if hasattr(self, name): + return getattr(self, name) return getattr(self.worker, name) def execute_method(self, method, *args, **kwargs): diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 2cebf763f2875..ac8050c40284b 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -83,8 +83,21 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # The remaining workers are the actual ray actors. self.workers: List[RayWorkerVllm] = [] + model_config = copy.deepcopy(self.model_config) + parallel_config = copy.deepcopy(self.parallel_config) + scheduler_config = copy.deepcopy(self.scheduler_config) + device_config = copy.deepcopy(self.device_config) + lora_config = copy.deepcopy(self.lora_config) + cache_config = copy.deepcopy(self.cache_config) + vision_language_config = copy.deepcopy(self.vision_language_config) + # Create the workers. 
driver_ip = get_ip() + distributed_init_method = get_distributed_init_method( + driver_ip, get_open_port()) + + rank = 1 + local_rank = -1 # todo: on-the-fly local rank assignment for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): continue @@ -98,7 +111,22 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, - )(RayWorkerVllm).remote(self.model_config.trust_remote_code) + )(RayWorkerVllm).remote( + init_cached_hf_modules=self.model_config.trust_remote_code, + worker_module="vllm.worker.worker", + worker_class_name="Worker", + model_config=model_config, + parallel_config=parallel_config, + scheduler_config=scheduler_config, + device_config=device_config, + cache_config=cache_config, + local_rank=local_rank, + rank=rank, + distributed_init_method=distributed_init_method, + lora_config=lora_config, + vision_language_config=vision_language_config, + ) + rank += 1 worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: @@ -142,41 +170,10 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", ",".join(map(str, node_gpus[node_id])) }]) - distributed_init_method = get_distributed_init_method( - driver_ip, get_open_port()) - # Lazy import the Worker to avoid importing torch.cuda/xformers # before CUDA_VISIBLE_DEVICES is set in the Worker from vllm.worker.worker import Worker - model_config = copy.deepcopy(self.model_config) - parallel_config = copy.deepcopy(self.parallel_config) - scheduler_config = copy.deepcopy(self.scheduler_config) - device_config = copy.deepcopy(self.device_config) - lora_config = copy.deepcopy(self.lora_config) - cache_config = copy.deepcopy(self.cache_config) - vision_language_config = copy.deepcopy(self.vision_language_config) - - # Initialize the actual workers with the Worker class. - for rank, (worker, (node_id, _)) in enumerate( - zip(self.workers, worker_node_and_gpu_ids), - start=1, - ): - local_rank = node_workers[node_id].index(rank) - worker.init_worker.remote( - lambda rank=rank, local_rank=local_rank: Worker( - model_config=model_config, - parallel_config=parallel_config, - scheduler_config=scheduler_config, - device_config=device_config, - cache_config=cache_config, - local_rank=local_rank, - rank=rank, - distributed_init_method=distributed_init_method, - lora_config=lora_config, - vision_language_config=vision_language_config, - )) - # Initialize the driver worker with the Worker class. driver_rank = 0 driver_local_rank = node_workers[driver_node_id].index(driver_rank) @@ -194,6 +191,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", is_driver_worker=True, ) + self._run_workers("init_worker") + self._run_workers("update_environment_variables", all_args=all_args_to_update_environment_variables) self._run_workers("init_device") diff --git a/vllm/worker/cpu_worker.py b/vllm/worker/cpu_worker.py index 751384eb72af3..af8bc83fda8f6 100644 --- a/vllm/worker/cpu_worker.py +++ b/vllm/worker/cpu_worker.py @@ -147,19 +147,23 @@ def __init__( self.is_driver_worker = is_driver_worker if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." + self.kv_cache_dtype = kv_cache_dtype - self.model_runner = CPUModelRunner(model_config, - parallel_config, - scheduler_config, - device_config, - lora_config=self.lora_config, - kv_cache_dtype=kv_cache_dtype, - is_driver_worker=is_driver_worker) # Uninitialized cache engine. 
Will be initialized by # initialize_cache. self.cache_engine = None self.cpu_cache = None + def init_worker(self) -> None: + self.model_runner = CPUModelRunner( + self.model_config, + self.parallel_config, + self.scheduler_config, + self.device_config, + lora_config=self.lora_config, + kv_cache_dtype=self.kv_cache_dtype, + is_driver_worker=self.is_driver_worker) + def init_device(self) -> None: self.init_distributed_environment() # Set random seed. diff --git a/vllm/worker/neuron_worker.py b/vllm/worker/neuron_worker.py index 6136d50d0c068..7510d4a50a548 100644 --- a/vllm/worker/neuron_worker.py +++ b/vllm/worker/neuron_worker.py @@ -30,8 +30,11 @@ def __init__( self.device_config = device_config self.cache_config = cache_config - self.model_runner = NeuronModelRunner(model_config, parallel_config, - scheduler_config, device_config) + def init_worker(self) -> None: + self.model_runner = NeuronModelRunner(self.model_config, + self.parallel_config, + self.scheduler_config, + self.device_config) def init_device(self) -> None: # Set random seed. diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index 3f0b2fd83f3e5..96bdb03be787a 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -62,20 +62,22 @@ def __init__( assert not self.lora_config, ( "To be tested: vision language model with LoRA settings.") - self.model_runner = ModelRunner( - model_config, - parallel_config, - scheduler_config, - device_config, - lora_config=self.lora_config, - kv_cache_dtype=self.cache_config.cache_dtype, - is_driver_worker=is_driver_worker, - vision_language_config=vision_language_config) # Uninitialized cache engine. Will be initialized by # initialize_cache. self.cache_engine = None self.gpu_cache = None + def init_worker(self) -> None: + self.model_runner = ModelRunner( + self.model_config, + self.parallel_config, + self.scheduler_config, + self.device_config, + lora_config=self.lora_config, + kv_cache_dtype=self.cache_config.cache_dtype, + is_driver_worker=self.is_driver_worker, + vision_language_config=self.vision_language_config) + def init_device(self) -> None: if self.device_config.device.type == "cuda": # torch.distributed.all_reduce does not free the input tensor until diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 849bcf55f7dae..1064b9bc75c3a 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -20,6 +20,14 @@ def update_environment_variables(self, envs: Dict[str, str]) -> None: del os.environ[key] update_environment_variables(envs) + @abstractmethod + def init_worker(self) -> None: + """ + Constructor of the worker should only save construction arguments. + Initialization should be done in this method, for lazy initialization. 
+ """ + raise NotImplementedError + @abstractmethod def init_device(self) -> None: """Initialize device state, such as loading the model or other on-device From c85d040b3bdb509a0d093d46b31c599d50354d27 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:05:38 -0700 Subject: [PATCH 09/38] fix recursion --- vllm/engine/ray_utils.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 6ac55a7d0e2f7..f4b4a307f4aa9 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -40,14 +40,12 @@ def init_worker(self): self.worker = worker_class(**self.kwargs) self.worker.init_worker() - def __getattr__(self, name): - if hasattr(self, name): - return getattr(self, name) - return getattr(self.worker, name) - def execute_method(self, method, *args, **kwargs): try: - executor = getattr(self, method) + if hasattr(self, method): + executor = getattr(self, method) + else: + executor = getattr(self.worker, method) return executor(*args, **kwargs) except Exception as e: # exceptions in ray worker may cause deadlock From 5e49b98d0609229716a394e4fd12a36bde65ce23 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:11:46 -0700 Subject: [PATCH 10/38] on the fly local rank calculation --- vllm/executor/ray_gpu_executor.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index ac8050c40284b..0e32c6b2b475e 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -97,7 +97,9 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", driver_ip, get_open_port()) rank = 1 - local_rank = -1 # todo: on-the-fly local rank assignment + local_rank_dict = defaultdict(int) + local_rank_dict[driver_ip] = 1 + local_rank = -1 # placeholder for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): continue @@ -136,6 +138,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", else: # Else, added to the list of workers. self.workers.append(worker) + worker.kwargs["local_rank"] = local_rank_dict[worker_ip] + local_rank_dict[worker_ip] += 1 if self.driver_dummy_worker is None: raise ValueError( @@ -176,7 +180,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Initialize the driver worker with the Worker class. 
driver_rank = 0 - driver_local_rank = node_workers[driver_node_id].index(driver_rank) + driver_local_rank = 0 self.driver_worker = Worker( model_config=self.model_config, parallel_config=self.parallel_config, @@ -191,10 +195,9 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", is_driver_worker=True, ) - self._run_workers("init_worker") - self._run_workers("update_environment_variables", all_args=all_args_to_update_environment_variables) + self._run_workers("init_worker") self._run_workers("init_device") self._run_workers( "load_model", From 37ed6c97a5c2d270d00e377fad16b0f266ffd30d Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:14:13 -0700 Subject: [PATCH 11/38] post update kwargs --- vllm/engine/ray_utils.py | 3 +++ vllm/executor/ray_gpu_executor.py | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index f4b4a307f4aa9..df7aa347fb3d4 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -40,6 +40,9 @@ def init_worker(self): self.worker = worker_class(**self.kwargs) self.worker.init_worker() + def update_kwargs(self, **kwargs): + self.kwargs.update(kwargs) + def execute_method(self, method, *args, **kwargs): try: if hasattr(self, method): diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 0e32c6b2b475e..d2891f24f92d4 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -138,7 +138,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", else: # Else, added to the list of workers. self.workers.append(worker) - worker.kwargs["local_rank"] = local_rank_dict[worker_ip] + ray.wait( + worker.update_kwargs(local_rank=local_rank_dict[worker_ip])) local_rank_dict[worker_ip] += 1 if self.driver_dummy_worker is None: From b654ee25cb3b359ab3eb8fe40c4d7c178e250097 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:15:07 -0700 Subject: [PATCH 12/38] add remote --- vllm/executor/ray_gpu_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index d2891f24f92d4..2634e043e6995 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -139,7 +139,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Else, added to the list of workers. 
self.workers.append(worker) ray.wait( - worker.update_kwargs(local_rank=local_rank_dict[worker_ip])) + worker.update_kwargs.remote( + local_rank=local_rank_dict[worker_ip])) local_rank_dict[worker_ip] += 1 if self.driver_dummy_worker is None: From e11448ec3b3a4a0f2e57e740ed89c0d4bbf2e1d4 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:22:51 -0700 Subject: [PATCH 13/38] fix update_environment_variables in ray worker --- vllm/engine/ray_utils.py | 7 ++++++- vllm/executor/ray_gpu_executor.py | 2 +- vllm/worker/worker_base.py | 3 ++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index df7aa347fb3d4..6c637f0d8ed32 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,10 +1,11 @@ import importlib import pickle -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from vllm.config import ParallelConfig from vllm.logger import init_logger from vllm.utils import get_ip, is_hip +from vllm.worker.worker_base import WorkerBase logger = init_logger(__name__) @@ -31,6 +32,10 @@ def __init__(self, # that thread. self.compiled_dag_cuda_device_set = False + @staticmethod + def update_environment_variables(cls, envs: Dict[str, str]) -> None: + WorkerBase.update_environment_variables(cls, envs) + def init_worker(self): if self.init_cached_hf_modules: from transformers.dynamic_module_utils import init_hf_modules diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 2634e043e6995..6602e533cd47c 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -138,7 +138,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", else: # Else, added to the list of workers. self.workers.append(worker) - ray.wait( + ray.get( worker.update_kwargs.remote( local_rank=local_rank_dict[worker_ip])) local_rank_dict[worker_ip] += 1 diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 1064b9bc75c3a..f9aa00eb5dbd4 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -12,7 +12,8 @@ class WorkerBase(ABC): different hardware. 
""" - def update_environment_variables(self, envs: Dict[str, str]) -> None: + @staticmethod + def update_environment_variables(cls, envs: Dict[str, str]) -> None: key = 'CUDA_VISIBLE_DEVICES' if key in envs and key in os.environ: # overwriting CUDA_VISIBLE_DEVICES is desired behavior From 97e660148419ea722db02421c84e350bff018ee1 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:25:23 -0700 Subject: [PATCH 14/38] use staticmethod --- vllm/engine/ray_utils.py | 4 ++-- vllm/worker/worker_base.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 6c637f0d8ed32..e214cb0671ea6 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -33,8 +33,8 @@ def __init__(self, self.compiled_dag_cuda_device_set = False @staticmethod - def update_environment_variables(cls, envs: Dict[str, str]) -> None: - WorkerBase.update_environment_variables(cls, envs) + def update_environment_variables(envs: Dict[str, str]) -> None: + WorkerBase.update_environment_variables(envs) def init_worker(self): if self.init_cached_hf_modules: diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index f9aa00eb5dbd4..8adf0121a09cf 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -13,7 +13,7 @@ class WorkerBase(ABC): """ @staticmethod - def update_environment_variables(cls, envs: Dict[str, str]) -> None: + def update_environment_variables(envs: Dict[str, str]) -> None: key = 'CUDA_VISIBLE_DEVICES' if key in envs and key in os.environ: # overwriting CUDA_VISIBLE_DEVICES is desired behavior From fd2cbe24f4b9d19000cb9b0b2a84964dd30f9e91 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:33:21 -0700 Subject: [PATCH 15/38] fix dummy worker local_rank --- vllm/executor/ray_gpu_executor.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 6602e533cd47c..e3ee11fc2eb75 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -138,10 +138,11 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", else: # Else, added to the list of workers. self.workers.append(worker) - ray.get( - worker.update_kwargs.remote( - local_rank=local_rank_dict[worker_ip])) - local_rank_dict[worker_ip] += 1 + # don't update local_rank for the dummy worker + ray.get( + worker.update_kwargs.remote( + local_rank=local_rank_dict[worker_ip])) + local_rank_dict[worker_ip] += 1 if self.driver_dummy_worker is None: raise ValueError( From a8d75045df18d721e76042b9b1e13d6c368a2411 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 00:41:23 -0700 Subject: [PATCH 16/38] fix dummy worker rank --- vllm/executor/ray_gpu_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index e3ee11fc2eb75..9159260d5cc87 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -128,7 +128,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", lora_config=lora_config, vision_language_config=vision_language_config, ) - rank += 1 worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: @@ -139,6 +138,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Else, added to the list of workers. 
self.workers.append(worker) # don't update local_rank for the dummy worker + rank += 1 ray.get( worker.update_kwargs.remote( local_rank=local_rank_dict[worker_ip])) From e659635a547f3e0fcefa31cfc9060ec80b715705 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:01:25 -0700 Subject: [PATCH 17/38] add WorkerWrapperBase --- vllm/engine/ray_utils.py | 51 +++--------------------------- vllm/worker/worker_base.py | 64 ++++++++++++++++++++++++++++++++------ 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index e214cb0671ea6..0438952f7799f 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,69 +1,28 @@ -import importlib import pickle -from typing import Dict, List, Optional, Tuple +from typing import List, Optional, Tuple from vllm.config import ParallelConfig from vllm.logger import init_logger from vllm.utils import get_ip, is_hip -from vllm.worker.worker_base import WorkerBase +from vllm.worker.worker_base import WorkerWrapperBase logger = init_logger(__name__) try: import ray - class RayWorkerVllm: + class RayWorkerVllm(WorkerWrapperBase): """Ray wrapper for vllm.worker.Worker, allowing Worker to be lazliy initialized after Ray sets CUDA_VISIBLE_DEVICES.""" - def __init__(self, - init_cached_hf_modules=False, - worker_module=None, - worker_class_name=None, - **kwargs) -> None: - self.worker_module = worker_module - self.worker_class_name = worker_class_name - self.kwargs = kwargs - self.init_cached_hf_modules = init_cached_hf_modules - self.worker = None + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) # Since the compiled DAG runs a main execution # in a different thread that calls cuda.set_device. # The flag indicates is set_device is called on # that thread. self.compiled_dag_cuda_device_set = False - @staticmethod - def update_environment_variables(envs: Dict[str, str]) -> None: - WorkerBase.update_environment_variables(envs) - - def init_worker(self): - if self.init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() - mod = importlib.import_module(self.worker_module) - worker_class = getattr(mod, self.worker_class_name) - self.worker = worker_class(**self.kwargs) - self.worker.init_worker() - - def update_kwargs(self, **kwargs): - self.kwargs.update(kwargs) - - def execute_method(self, method, *args, **kwargs): - try: - if hasattr(self, method): - executor = getattr(self, method) - else: - executor = getattr(self.worker, method) - return executor(*args, **kwargs) - except Exception as e: - # exceptions in ray worker may cause deadlock - # see https://github.com/vllm-project/vllm/issues/3455 - # print the error and inform the user to solve the error - msg = (f"Error executing method {method}. 
" - "This might cause deadlock in distributed execution.") - logger.exception(msg) - raise e - def get_node_ip(self) -> str: return get_ip() diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 8adf0121a09cf..703283902dd3f 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -1,26 +1,21 @@ +import importlib import os from abc import ABC, abstractmethod from typing import Dict, List +from vllm.logger import init_logger from vllm.lora.request import LoRARequest from vllm.sequence import SamplerOutput, SequenceGroupMetadata from vllm.utils import update_environment_variables +logger = init_logger(__name__) + class WorkerBase(ABC): """Worker interface that allows vLLM to cleanly separate implementations for different hardware. """ - @staticmethod - def update_environment_variables(envs: Dict[str, str]) -> None: - key = 'CUDA_VISIBLE_DEVICES' - if key in envs and key in os.environ: - # overwriting CUDA_VISIBLE_DEVICES is desired behavior - # suppress the warning in `update_environment_variables` - del os.environ[key] - update_environment_variables(envs) - @abstractmethod def init_worker(self) -> None: """ @@ -100,3 +95,54 @@ def remove_lora(self, lora_id: int) -> bool: def list_loras(self) -> List[int]: raise ValueError(f"{type(self)} does not support LoRA") + + +class WorkerWrapperBase: + """ + The whole point of this class is to lazily initialize the worker. + We first instantiate the WorkerWrapper, which remembers the worker module + and class name. Then, when we call `update_environment_variables`, and the + real initialization happens in `init_worker`. + """ + + def __init__(self, + init_cached_hf_modules=False, + worker_module_name=None, + worker_class_name=None) -> None: + self.worker_module_name = worker_module_name + self.worker_class_name = worker_class_name + self.init_cached_hf_modules = init_cached_hf_modules + self.worker = None + + def update_environment_variables(self, envs: Dict[str, str]) -> None: + key = 'CUDA_VISIBLE_DEVICES' + if key in envs and key in os.environ: + # overwriting CUDA_VISIBLE_DEVICES is desired behavior + # suppress the warning in `update_environment_variables` + del os.environ[key] + update_environment_variables(envs) + + def init_worker(self, *args, **kwargs): + if self.init_cached_hf_modules: + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() + mod = importlib.import_module(self.worker_module_name) + worker_class = getattr(mod, self.worker_class_name) + self.worker = worker_class(*args, **self.kwargs) + + def execute_method(self, method, *args, **kwargs): + try: + if hasattr(self, method): + executor = getattr(self, method) + else: + executor = getattr(self.worker, method) + return executor(*args, **kwargs) + except Exception as e: + # if the driver worker also execute methods, + # exceptions in the rest worker may cause deadlock in rpc like ray + # see https://github.com/vllm-project/vllm/issues/3455 + # print the error and inform the user to solve the error + msg = (f"Error executing method {method}. 
" + "This might cause deadlock in distributed execution.") + logger.exception(msg) + raise e From 778fb3f1c81bb9a1ed2b5091ee27d8716912fb7e Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:13:50 -0700 Subject: [PATCH 18/38] add all_args to _run_workers --- vllm/executor/ray_gpu_executor.py | 92 ++++++++++++++++--------------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 9159260d5cc87..eb00968fc2236 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -83,23 +83,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # The remaining workers are the actual ray actors. self.workers: List[RayWorkerVllm] = [] - model_config = copy.deepcopy(self.model_config) - parallel_config = copy.deepcopy(self.parallel_config) - scheduler_config = copy.deepcopy(self.scheduler_config) - device_config = copy.deepcopy(self.device_config) - lora_config = copy.deepcopy(self.lora_config) - cache_config = copy.deepcopy(self.cache_config) - vision_language_config = copy.deepcopy(self.vision_language_config) - # Create the workers. driver_ip = get_ip() - distributed_init_method = get_distributed_init_method( - driver_ip, get_open_port()) - - rank = 1 - local_rank_dict = defaultdict(int) - local_rank_dict[driver_ip] = 1 - local_rank = -1 # placeholder for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): continue @@ -113,21 +98,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, - )(RayWorkerVllm).remote( - init_cached_hf_modules=self.model_config.trust_remote_code, - worker_module="vllm.worker.worker", - worker_class_name="Worker", - model_config=model_config, - parallel_config=parallel_config, - scheduler_config=scheduler_config, - device_config=device_config, - cache_config=cache_config, - local_rank=local_rank, - rank=rank, - distributed_init_method=distributed_init_method, - lora_config=lora_config, - vision_language_config=vision_language_config, - ) + )(RayWorkerVllm).remote(self.model_config.trust_remote_code) worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: @@ -137,12 +108,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", else: # Else, added to the list of workers. 
self.workers.append(worker) - # don't update local_rank for the dummy worker - rank += 1 - ray.get( - worker.update_kwargs.remote( - local_rank=local_rank_dict[worker_ip])) - local_rank_dict[worker_ip] += 1 if self.driver_dummy_worker is None: raise ValueError( @@ -176,14 +141,47 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])) }]) + self._run_workers("update_environment_variables", + all_args=all_args_to_update_environment_variables) + # self._run_workers("init_worker") # TODO, fix args + distributed_init_method = get_distributed_init_method( + driver_ip, get_open_port()) # Lazy import the Worker to avoid importing torch.cuda/xformers # before CUDA_VISIBLE_DEVICES is set in the Worker from vllm.worker.worker import Worker + model_config = copy.deepcopy(self.model_config) + parallel_config = copy.deepcopy(self.parallel_config) + scheduler_config = copy.deepcopy(self.scheduler_config) + device_config = copy.deepcopy(self.device_config) + lora_config = copy.deepcopy(self.lora_config) + cache_config = copy.deepcopy(self.cache_config) + vision_language_config = copy.deepcopy(self.vision_language_config) + + # Initialize the actual workers with the Worker class. + for rank, (worker, (node_id, _)) in enumerate( + zip(self.workers, worker_node_and_gpu_ids), + start=1, + ): + local_rank = node_workers[node_id].index(rank) + worker.init_worker.remote( + lambda rank=rank, local_rank=local_rank: Worker( + model_config=model_config, + parallel_config=parallel_config, + scheduler_config=scheduler_config, + device_config=device_config, + cache_config=cache_config, + local_rank=local_rank, + rank=rank, + distributed_init_method=distributed_init_method, + lora_config=lora_config, + vision_language_config=vision_language_config, + )) + # Initialize the driver worker with the Worker class. driver_rank = 0 - driver_local_rank = 0 + driver_local_rank = node_workers[driver_node_id].index(driver_rank) self.driver_worker = Worker( model_config=self.model_config, parallel_config=self.parallel_config, @@ -198,9 +196,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", is_driver_worker=True, ) - self._run_workers("update_environment_variables", - all_args=all_args_to_update_environment_variables) - self._run_workers("init_worker") self._run_workers("init_device") self._run_workers( "load_model", @@ -291,6 +286,7 @@ def _run_workers( driver_kwargs: Optional[Dict[str, Any]] = None, all_args: Optional[List[List[Any]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, + use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, use_ray_compiled_dag: bool = False, **kwargs, @@ -326,10 +322,20 @@ def _run_workers( ) in zip(self.workers, all_args[1:], all_kwargs[1:]) ] - # Start the driver worker after all the ray workers. - driver_worker_output = getattr(self.driver_worker, - method)(*all_args[0], **all_kwargs[0]) + if driver_args is None: + driver_args = args + if driver_kwargs is None: + driver_kwargs = kwargs + # Start the driver worker after all the ray workers. + if not use_dummy_driver: + driver_worker_output = getattr(self.driver_worker, + method)(*all_args[0], + **all_kwargs[0]) + else: + driver_worker_output = ray.get( + self.driver_dummy_worker.execute_method.remote( + method, *all_args[0], **all_kwargs[0])) # Get the results of the ray workers. 
if self.workers: if use_ray_compiled_dag: From d2951075859465dbe2c33752896d3e8466a6d6ee Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:29:55 -0700 Subject: [PATCH 19/38] refactor --- vllm/executor/ray_gpu_executor.py | 65 ++++++++++++++++++------------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index eb00968fc2236..9fed2949aa597 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -98,13 +98,22 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, - )(RayWorkerVllm).remote(self.model_config.trust_remote_code) + )(RayWorkerVllm).remote( + init_cached_hf_modules=self.model_config.trust_remote_code, + worker_module_name="vllm.worker.worker", + worker_class_name="Worker", + ) worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: # If the worker is on the same node as the driver, we use it # as the resource holder for the driver process. self.driver_dummy_worker = worker + self.driver_worker = RayWorkerVllm( + init_cached_hf_modules=self.model_config.trust_remote_code, + worker_module_name="vllm.worker.worker", + worker_class_name="Worker", + ) else: # Else, added to the list of workers. self.workers.append(worker) @@ -116,10 +125,9 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", "GPU node.") # Get the set of GPU IDs used on each node. - driver_node_id, driver_gpu_ids = ray.get( - self.driver_dummy_worker.get_node_and_gpu_ids.remote()) - worker_node_and_gpu_ids = ray.get( - [worker.get_node_and_gpu_ids.remote() for worker in self.workers]) + (driver_node_id, + driver_gpu_ids), *worker_node_and_gpu_ids = self._run_workers( + "get_node_and_gpu_ids", use_dummy_driver=True) node_workers = defaultdict(list) node_gpus = defaultdict(list) @@ -147,9 +155,28 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) - # Lazy import the Worker to avoid importing torch.cuda/xformers - # before CUDA_VISIBLE_DEVICES is set in the Worker - from vllm.worker.worker import Worker + def collect_arg_helper_func(**kwargs): + # avoid writing `{"name": value}` manually + return kwargs + + init_worker_all_kwargs = [] + # Initialize the driver worker with the Worker class. 
+ driver_rank = 0 + driver_local_rank = node_workers[driver_node_id].index(driver_rank) + init_worker_all_kwargs.append( + collect_arg_helper_func( + model_config=self.model_config, + parallel_config=self.parallel_config, + scheduler_config=self.scheduler_config, + device_config=self.device_config, + cache_config=self.cache_config, + local_rank=driver_local_rank, + rank=driver_rank, + distributed_init_method=distributed_init_method, + lora_config=self.lora_config, + vision_language_config=self.vision_language_config, + is_driver_worker=True, + )) model_config = copy.deepcopy(self.model_config) parallel_config = copy.deepcopy(self.parallel_config) @@ -165,8 +192,8 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", start=1, ): local_rank = node_workers[node_id].index(rank) - worker.init_worker.remote( - lambda rank=rank, local_rank=local_rank: Worker( + init_worker_all_kwargs.append( + collect_arg_helper_func( model_config=model_config, parallel_config=parallel_config, scheduler_config=scheduler_config, @@ -178,23 +205,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", lora_config=lora_config, vision_language_config=vision_language_config, )) - - # Initialize the driver worker with the Worker class. - driver_rank = 0 - driver_local_rank = node_workers[driver_node_id].index(driver_rank) - self.driver_worker = Worker( - model_config=self.model_config, - parallel_config=self.parallel_config, - scheduler_config=self.scheduler_config, - device_config=self.device_config, - cache_config=self.cache_config, - local_rank=driver_local_rank, - rank=driver_rank, - distributed_init_method=distributed_init_method, - lora_config=self.lora_config, - vision_language_config=self.vision_language_config, - is_driver_worker=True, - ) + self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) self._run_workers("init_device") self._run_workers( From 7ca22a4f32f7a7f65475922d2e9b715a1a952a25 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:32:45 -0700 Subject: [PATCH 20/38] fix dangling self --- vllm/worker/worker_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 703283902dd3f..363403e03130f 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -128,7 +128,7 @@ def init_worker(self, *args, **kwargs): init_hf_modules() mod = importlib.import_module(self.worker_module_name) worker_class = getattr(mod, self.worker_class_name) - self.worker = worker_class(*args, **self.kwargs) + self.worker = worker_class(*args, **kwargs) def execute_method(self, method, *args, **kwargs): try: From 5f6c8f3ce87c2ad18d2e2d54ec516187380b0bf9 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:34:18 -0700 Subject: [PATCH 21/38] fix execute_method in driver worker --- vllm/executor/ray_gpu_executor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 9fed2949aa597..191e1036ae23c 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -340,9 +340,8 @@ def _run_workers( # Start the driver worker after all the ray workers. 
if not use_dummy_driver: - driver_worker_output = getattr(self.driver_worker, - method)(*all_args[0], - **all_kwargs[0]) + driver_worker_output = self.driver_worker.execute_method( + method, *all_args[0], **all_kwargs[0]) else: driver_worker_output = ray.get( self.driver_dummy_worker.execute_method.remote( From 13de66e080c787cc088d7d1b0e15585d08ef1950 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:36:36 -0700 Subject: [PATCH 22/38] withdraw changes in many workers --- vllm/worker/cpu_worker.py | 18 +++++++----------- vllm/worker/neuron_worker.py | 7 ++----- vllm/worker/worker.py | 20 +++++++++----------- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/vllm/worker/cpu_worker.py b/vllm/worker/cpu_worker.py index af8bc83fda8f6..751384eb72af3 100644 --- a/vllm/worker/cpu_worker.py +++ b/vllm/worker/cpu_worker.py @@ -147,23 +147,19 @@ def __init__( self.is_driver_worker = is_driver_worker if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." - self.kv_cache_dtype = kv_cache_dtype + self.model_runner = CPUModelRunner(model_config, + parallel_config, + scheduler_config, + device_config, + lora_config=self.lora_config, + kv_cache_dtype=kv_cache_dtype, + is_driver_worker=is_driver_worker) # Uninitialized cache engine. Will be initialized by # initialize_cache. self.cache_engine = None self.cpu_cache = None - def init_worker(self) -> None: - self.model_runner = CPUModelRunner( - self.model_config, - self.parallel_config, - self.scheduler_config, - self.device_config, - lora_config=self.lora_config, - kv_cache_dtype=self.kv_cache_dtype, - is_driver_worker=self.is_driver_worker) - def init_device(self) -> None: self.init_distributed_environment() # Set random seed. diff --git a/vllm/worker/neuron_worker.py b/vllm/worker/neuron_worker.py index 7510d4a50a548..6136d50d0c068 100644 --- a/vllm/worker/neuron_worker.py +++ b/vllm/worker/neuron_worker.py @@ -30,11 +30,8 @@ def __init__( self.device_config = device_config self.cache_config = cache_config - def init_worker(self) -> None: - self.model_runner = NeuronModelRunner(self.model_config, - self.parallel_config, - self.scheduler_config, - self.device_config) + self.model_runner = NeuronModelRunner(model_config, parallel_config, + scheduler_config, device_config) def init_device(self) -> None: # Set random seed. diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index 96bdb03be787a..3f0b2fd83f3e5 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -62,22 +62,20 @@ def __init__( assert not self.lora_config, ( "To be tested: vision language model with LoRA settings.") + self.model_runner = ModelRunner( + model_config, + parallel_config, + scheduler_config, + device_config, + lora_config=self.lora_config, + kv_cache_dtype=self.cache_config.cache_dtype, + is_driver_worker=is_driver_worker, + vision_language_config=vision_language_config) # Uninitialized cache engine. Will be initialized by # initialize_cache. 
self.cache_engine = None self.gpu_cache = None - def init_worker(self) -> None: - self.model_runner = ModelRunner( - self.model_config, - self.parallel_config, - self.scheduler_config, - self.device_config, - lora_config=self.lora_config, - kv_cache_dtype=self.cache_config.cache_dtype, - is_driver_worker=self.is_driver_worker, - vision_language_config=self.vision_language_config) - def init_device(self) -> None: if self.device_config.device.type == "cuda": # torch.distributed.all_reduce does not free the input tensor until From 32ef3bbbce58c0cadd94f1f5f2ed82573933b392 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:38:54 -0700 Subject: [PATCH 23/38] no need for init_worker in workerbase --- vllm/worker/worker_base.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 363403e03130f..c715a03f8c176 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -16,14 +16,6 @@ class WorkerBase(ABC): different hardware. """ - @abstractmethod - def init_worker(self) -> None: - """ - Constructor of the worker should only save construction arguments. - Initialization should be done in this method, for lazy initialization. - """ - raise NotImplementedError - @abstractmethod def init_device(self) -> None: """Initialize device state, such as loading the model or other on-device From 221f626373494bd444146ff02f601f81f1206ce0 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:45:40 -0700 Subject: [PATCH 24/38] unify worker_node_and_gpu_ids --- vllm/executor/ray_gpu_executor.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 191e1036ae23c..b058a299f52c7 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -125,17 +125,13 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", "GPU node.") # Get the set of GPU IDs used on each node. - (driver_node_id, - driver_gpu_ids), *worker_node_and_gpu_ids = self._run_workers( - "get_node_and_gpu_ids", use_dummy_driver=True) + worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", + use_dummy_driver=True) node_workers = defaultdict(list) node_gpus = defaultdict(list) - node_workers[driver_node_id].append(0) - node_gpus[driver_node_id].extend(driver_gpu_ids) - for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids, - start=1): + for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): node_workers[node_id].append(i) node_gpus[node_id].extend(gpu_ids) for node_id, gpu_ids in node_gpus.items(): @@ -143,8 +139,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # Set CUDA_VISIBLE_DEVICES for the driver and workers. all_args_to_update_environment_variables = [] - for (node_id, _) in [(driver_node_id, driver_gpu_ids) - ] + worker_node_and_gpu_ids: + for (node_id, _) in worker_node_and_gpu_ids: all_args_to_update_environment_variables.append([{ "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])) @@ -162,7 +157,7 @@ def collect_arg_helper_func(**kwargs): init_worker_all_kwargs = [] # Initialize the driver worker with the Worker class. 
driver_rank = 0 - driver_local_rank = node_workers[driver_node_id].index(driver_rank) + driver_local_rank = node_workers[driver_ip].index(driver_rank) init_worker_all_kwargs.append( collect_arg_helper_func( model_config=self.model_config, @@ -188,7 +183,7 @@ def collect_arg_helper_func(**kwargs): # Initialize the actual workers with the Worker class. for rank, (worker, (node_id, _)) in enumerate( - zip(self.workers, worker_node_and_gpu_ids), + zip(self.workers, worker_node_and_gpu_ids[1:]), start=1, ): local_rank = node_workers[node_id].index(rank) From 0087773be8b66ce92fd907aaf4554493d0cbe3a1 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:49:28 -0700 Subject: [PATCH 25/38] use id rather than ip --- vllm/executor/ray_gpu_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index b058a299f52c7..44c7bfcc7f8ca 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -157,7 +157,8 @@ def collect_arg_helper_func(**kwargs): init_worker_all_kwargs = [] # Initialize the driver worker with the Worker class. driver_rank = 0 - driver_local_rank = node_workers[driver_ip].index(driver_rank) + driver_local_rank = node_workers[worker_node_and_gpu_ids[0][0]].index( + driver_rank) init_worker_all_kwargs.append( collect_arg_helper_func( model_config=self.model_config, From 36a185e448c5f9959bcb1a08572a4bddcbfda834 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:53:48 -0700 Subject: [PATCH 26/38] unify init --- vllm/executor/ray_gpu_executor.py | 50 +++++++------------------------ 1 file changed, 11 insertions(+), 39 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 44c7bfcc7f8ca..3418aeff9ea1d 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -155,51 +155,23 @@ def collect_arg_helper_func(**kwargs): return kwargs init_worker_all_kwargs = [] - # Initialize the driver worker with the Worker class. - driver_rank = 0 - driver_local_rank = node_workers[worker_node_and_gpu_ids[0][0]].index( - driver_rank) - init_worker_all_kwargs.append( - collect_arg_helper_func( - model_config=self.model_config, - parallel_config=self.parallel_config, - scheduler_config=self.scheduler_config, - device_config=self.device_config, - cache_config=self.cache_config, - local_rank=driver_local_rank, - rank=driver_rank, - distributed_init_method=distributed_init_method, - lora_config=self.lora_config, - vision_language_config=self.vision_language_config, - is_driver_worker=True, - )) - - model_config = copy.deepcopy(self.model_config) - parallel_config = copy.deepcopy(self.parallel_config) - scheduler_config = copy.deepcopy(self.scheduler_config) - device_config = copy.deepcopy(self.device_config) - lora_config = copy.deepcopy(self.lora_config) - cache_config = copy.deepcopy(self.cache_config) - vision_language_config = copy.deepcopy(self.vision_language_config) - - # Initialize the actual workers with the Worker class. - for rank, (worker, (node_id, _)) in enumerate( - zip(self.workers, worker_node_and_gpu_ids[1:]), - start=1, - ): + + # Initialize the actual workers inside worker wrapper. 
+ for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids, ): local_rank = node_workers[node_id].index(rank) init_worker_all_kwargs.append( collect_arg_helper_func( - model_config=model_config, - parallel_config=parallel_config, - scheduler_config=scheduler_config, - device_config=device_config, - cache_config=cache_config, + model_config=self.model_config, + parallel_config=self.parallel_config, + scheduler_config=self.scheduler_config, + device_config=self.device_config, + cache_config=self.cache_config, local_rank=local_rank, rank=rank, distributed_init_method=distributed_init_method, - lora_config=lora_config, - vision_language_config=vision_language_config, + lora_config=self.lora_config, + vision_language_config=self.vision_language_config, + is_driver_worker=rank == 0, )) self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) From 95ca917309cbf87ff8e0a8e2b899b244e7740bde Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:54:49 -0700 Subject: [PATCH 27/38] fix lint --- vllm/executor/ray_gpu_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 3418aeff9ea1d..4a502f925d4c0 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -1,5 +1,4 @@ import asyncio -import copy import os import pickle from collections import defaultdict From ea5f2a5d8907f2b72ea085954605d1612bde3a81 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 10:57:33 -0700 Subject: [PATCH 28/38] finish todo --- vllm/executor/ray_gpu_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 4a502f925d4c0..f7b8f7ccfdd0b 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -145,7 +145,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", }]) self._run_workers("update_environment_variables", all_args=all_args_to_update_environment_variables) - # self._run_workers("init_worker") # TODO, fix args + distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) From d10ca88093970195d99339bed440d75fbd38915c Mon Sep 17 00:00:00 2001 From: youkaichao Date: Fri, 12 Apr 2024 11:26:36 -0700 Subject: [PATCH 29/38] rename to RayWorkerWrapper --- vllm/engine/ray_utils.py | 4 ++-- vllm/executor/ray_gpu_executor.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index 0438952f7799f..fc5f5e355c950 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -11,7 +11,7 @@ try: import ray - class RayWorkerVllm(WorkerWrapperBase): + class RayWorkerWrapper(WorkerWrapperBase): """Ray wrapper for vllm.worker.Worker, allowing Worker to be lazliy initialized after Ray sets CUDA_VISIBLE_DEVICES.""" @@ -47,7 +47,7 @@ def execute_model_compiled_dag_remote(self, ignored): "For distributed inference, please install Ray with " "`pip install ray`.") ray = None - RayWorkerVllm = None + RayWorkerWrapper = None def initialize_ray_cluster( diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index f7b8f7ccfdd0b..422d9a9c2020a 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -7,7 +7,7 @@ from vllm.config import (CacheConfig, DeviceConfig, LoRAConfig, ModelConfig, ParallelConfig, SchedulerConfig, SpeculativeConfig, VisionLanguageConfig) -from vllm.engine.ray_utils import 
RayWorkerVllm, ray +from vllm.engine.ray_utils import RayWorkerWrapper, ray from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase from vllm.logger import init_logger from vllm.lora.request import LoRARequest @@ -78,9 +78,9 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # The driver dummy worker does not actually use any resources. # It holds the resource for the driver worker. - self.driver_dummy_worker: RayWorkerVllm = None + self.driver_dummy_worker: RayWorkerWrapper = None # The remaining workers are the actual ray actors. - self.workers: List[RayWorkerVllm] = [] + self.workers: List[RayWorkerWrapper] = [] # Create the workers. driver_ip = get_ip() @@ -97,7 +97,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, - )(RayWorkerVllm).remote( + )(RayWorkerWrapper).remote( init_cached_hf_modules=self.model_config.trust_remote_code, worker_module_name="vllm.worker.worker", worker_class_name="Worker", @@ -108,7 +108,7 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # If the worker is on the same node as the driver, we use it # as the resource holder for the driver process. self.driver_dummy_worker = worker - self.driver_worker = RayWorkerVllm( + self.driver_worker = RayWorkerWrapper( init_cached_hf_modules=self.model_config.trust_remote_code, worker_module_name="vllm.worker.worker", worker_class_name="Worker", From eb27be90458e983fa92be1928b94e5daba424c39 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 14:52:55 -0700 Subject: [PATCH 30/38] fix mypy typing --- vllm/engine/ray_utils.py | 6 +++--- vllm/executor/ray_gpu_executor.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/vllm/engine/ray_utils.py b/vllm/engine/ray_utils.py index b0515fdb4d3d2..febae42b84549 100644 --- a/vllm/engine/ray_utils.py +++ b/vllm/engine/ray_utils.py @@ -1,5 +1,5 @@ import pickle -from typing import Callable, List, Optional, Tuple +from typing import List, Optional, Tuple from vllm.config import ParallelConfig from vllm.logger import init_logger @@ -46,8 +46,8 @@ def execute_model_compiled_dag_remote(self, ignored): logger.warning(f"Failed to import Ray with {e!r}. 
" "For distributed inference, please install Ray with " "`pip install ray`.") - ray = None # type: ignore - RayWorkerWrapper = None # type: ignore + ray = None # type: ignore + RayWorkerWrapper = None # type: ignore def initialize_ray_cluster( diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index f24c86902568f..e535eb10514aa 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -4,9 +4,6 @@ from collections import defaultdict from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple -from vllm.config import (CacheConfig, DeviceConfig, LoRAConfig, ModelConfig, - ParallelConfig, SchedulerConfig, SpeculativeConfig, - VisionLanguageConfig) from vllm.engine.ray_utils import RayWorkerWrapper, ray from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase from vllm.logger import init_logger @@ -263,7 +260,7 @@ def _run_workers( self, method: str, *args, - driver_args: Optional[Tuple[Any, ...]] = None, + driver_args: Optional[Tuple[Any]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, all_args: Optional[List[List[Any]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, @@ -281,11 +278,18 @@ def _run_workers( if driver_kwargs is None: driver_kwargs = kwargs + # for mypy type checking + assert driver_args is not None + assert driver_kwargs is not None if all_args is None: all_args = [driver_args] + [args] * len(self.workers) if all_kwargs is None: all_kwargs = [driver_kwargs] + [kwargs] * len(self.workers) + # for mypy type checking + assert all_args is not None + assert all_kwargs is not None + if max_concurrent_workers: raise NotImplementedError( "max_concurrent_workers is not supported yet.") From 74deb444c1a694b940720806e8b318da562273f5 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 15:01:02 -0700 Subject: [PATCH 31/38] move init hf decision to each worker --- vllm/executor/ray_gpu_executor.py | 2 -- vllm/worker/cpu_worker.py | 4 ++++ vllm/worker/neuron_worker.py | 4 ++++ vllm/worker/worker.py | 5 +++++ vllm/worker/worker_base.py | 5 ----- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index e535eb10514aa..24440bfeb90e8 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -97,7 +97,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, )(RayWorkerWrapper).remote( - init_cached_hf_modules=self.model_config.trust_remote_code, worker_module_name="vllm.worker.worker", worker_class_name="Worker", ) @@ -108,7 +107,6 @@ def _init_workers_ray(self, placement_group: "PlacementGroup", # as the resource holder for the driver process. self.driver_dummy_worker = worker self.driver_worker = RayWorkerWrapper( - init_cached_hf_modules=self.model_config.trust_remote_code, worker_module_name="vllm.worker.worker", worker_class_name="Worker", ) diff --git a/vllm/worker/cpu_worker.py b/vllm/worker/cpu_worker.py index afc4a1e1f4630..4ac8c18aedbd6 100644 --- a/vllm/worker/cpu_worker.py +++ b/vllm/worker/cpu_worker.py @@ -138,6 +138,10 @@ def __init__( self.is_driver_worker = is_driver_worker if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." 
+ init_cached_hf_modules = self.model_config.trust_remote_code + if init_cached_hf_modules: + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() self.model_runner = CPUModelRunner(model_config, parallel_config, diff --git a/vllm/worker/neuron_worker.py b/vllm/worker/neuron_worker.py index 142c6c97f5194..eec2ab9de483b 100644 --- a/vllm/worker/neuron_worker.py +++ b/vllm/worker/neuron_worker.py @@ -29,6 +29,10 @@ def __init__( self.scheduler_config = scheduler_config self.device_config = device_config self.cache_config = cache_config + init_cached_hf_modules = self.model_config.trust_remote_code + if init_cached_hf_modules: + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() self.model_runner = NeuronModelRunner(model_config, parallel_config, scheduler_config, device_config) diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index e2b47530d41e4..7f39eb43ff185 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -60,6 +60,11 @@ def __init__( if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." + init_cached_hf_modules = self.model_config.trust_remote_code + if init_cached_hf_modules: + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() + self.vision_language_config = vision_language_config if self.vision_language_config: assert not self.lora_config, ( diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 22f49f6cbfde7..c38ff2b70fdb1 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -99,12 +99,10 @@ class WorkerWrapperBase: """ def __init__(self, - init_cached_hf_modules=False, worker_module_name=None, worker_class_name=None) -> None: self.worker_module_name = worker_module_name self.worker_class_name = worker_class_name - self.init_cached_hf_modules = init_cached_hf_modules self.worker = None def update_environment_variables(self, envs: Dict[str, str]) -> None: @@ -116,9 +114,6 @@ def update_environment_variables(self, envs: Dict[str, str]) -> None: update_environment_variables(envs) def init_worker(self, *args, **kwargs): - if self.init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() mod = importlib.import_module(self.worker_module_name) worker_class = getattr(mod, self.worker_class_name) self.worker = worker_class(*args, **kwargs) From 3bd2c980744d24a8d4890197d1fdef7c8c466fb0 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 15:13:46 -0700 Subject: [PATCH 32/38] use quotes to address white space in env var values --- vllm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/utils.py b/vllm/utils.py index b9de36055f52c..2003af84258bb 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -275,7 +275,7 @@ def update_environment_variables(envs: Dict[str, str]): for k, v in envs.items(): if k in os.environ: logger.warning(f"Overwriting environment variable {k} " - f"from {os.environ[k]} to {v}") + f"from '{os.environ[k]}' to '{v}'") os.environ[k] = v From 21be00449f892f6480b95e54d2cd94db1c46d720 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 15:19:30 -0700 Subject: [PATCH 33/38] add docstring --- vllm/worker/worker_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index c38ff2b70fdb1..70aa4dd0a18c4 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -114,6 +114,7 @@ def update_environment_variables(self, 
envs: Dict[str, str]) -> None: update_environment_variables(envs) def init_worker(self, *args, **kwargs): + # actual initialization of the worker class mod = importlib.import_module(self.worker_module_name) worker_class = getattr(mod, self.worker_class_name) self.worker = worker_class(*args, **kwargs) From 1aee6a036e4e8651cbed43ea4999547a6b3c2cc9 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 16:40:10 -0700 Subject: [PATCH 34/38] add config --- vllm/executor/ray_gpu_executor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index 24440bfeb90e8..ac653e92b8798 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -162,6 +162,7 @@ def collect_arg_helper_func(**kwargs): scheduler_config=self.scheduler_config, device_config=self.device_config, cache_config=self.cache_config, + load_config=self.load_config, local_rank=local_rank, rank=rank, distributed_init_method=distributed_init_method, From 40d4560c32d94616fbf81ce811e0307ad19ca5e0 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 18:10:06 -0700 Subject: [PATCH 35/38] fix _run_workers_async --- vllm/executor/ray_gpu_executor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/vllm/executor/ray_gpu_executor.py b/vllm/executor/ray_gpu_executor.py index ac653e92b8798..5a43f1fc28a84 100644 --- a/vllm/executor/ray_gpu_executor.py +++ b/vllm/executor/ray_gpu_executor.py @@ -394,8 +394,12 @@ async def _run_workers_async( driver_kwargs = kwargs # Run the driver worker asynchronously. - driver_executor = make_async(getattr(self.driver_worker, method)) - coros.append(driver_executor(*driver_args, **driver_kwargs)) + def helper(): + return self.driver_worker.execute_method(method, *driver_args, + **driver_kwargs) + + driver_executor = make_async(helper) + coros.append(driver_executor()) # Run the ray workers asynchronously. for worker in self.workers: From 2509db4e62c44192198de1b61c9eda8dd75628a6 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 23:26:35 -0700 Subject: [PATCH 36/38] move duplicate code to utils --- vllm/utils.py | 5 +++++ vllm/worker/cpu_worker.py | 9 ++++----- vllm/worker/neuron_worker.py | 8 ++++---- vllm/worker/worker.py | 9 ++++----- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/vllm/utils.py b/vllm/utils.py index 2003af84258bb..73e1aa1c60c68 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -509,3 +509,8 @@ def merge_dicts(dict1: Dict[Any, List[Any]], merged_dict[key].extend(value) return dict(merged_dict) + + +def init_cached_hf_modules(): + from transformers.dynamic_module_utils import init_hf_modules + init_hf_modules() diff --git a/vllm/worker/cpu_worker.py b/vllm/worker/cpu_worker.py index 4ac8c18aedbd6..8468ace5a2fdc 100644 --- a/vllm/worker/cpu_worker.py +++ b/vllm/worker/cpu_worker.py @@ -138,11 +138,10 @@ def __init__( self.is_driver_worker = is_driver_worker if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." 
- init_cached_hf_modules = self.model_config.trust_remote_code - if init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() - + if self.model_config.trust_remote_code: + # note: lazy import to avoid importing torch before initializing + from vllm.utils import init_cached_hf_modules + init_cached_hf_modules() self.model_runner = CPUModelRunner(model_config, parallel_config, scheduler_config, diff --git a/vllm/worker/neuron_worker.py b/vllm/worker/neuron_worker.py index eec2ab9de483b..d0e6aaed180e6 100644 --- a/vllm/worker/neuron_worker.py +++ b/vllm/worker/neuron_worker.py @@ -29,10 +29,10 @@ def __init__( self.scheduler_config = scheduler_config self.device_config = device_config self.cache_config = cache_config - init_cached_hf_modules = self.model_config.trust_remote_code - if init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() + if self.model_config.trust_remote_code: + # note: lazy import to avoid importing torch before initializing + from vllm.utils import init_cached_hf_modules + init_cached_hf_modules() self.model_runner = NeuronModelRunner(model_config, parallel_config, scheduler_config, device_config) diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py index 7f39eb43ff185..b021866965401 100644 --- a/vllm/worker/worker.py +++ b/vllm/worker/worker.py @@ -60,11 +60,10 @@ def __init__( if self.is_driver_worker: assert self.rank == 0, "The driver worker must have rank 0." - init_cached_hf_modules = self.model_config.trust_remote_code - if init_cached_hf_modules: - from transformers.dynamic_module_utils import init_hf_modules - init_hf_modules() - + if self.model_config.trust_remote_code: + # note: lazy import to avoid importing torch before initializing + from vllm.utils import init_cached_hf_modules + init_cached_hf_modules() self.vision_language_config = vision_language_config if self.vision_language_config: assert not self.lora_config, ( From d1bda36bf1d94ccc600b4fb871bbafeaea83df5f Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 23:27:11 -0700 Subject: [PATCH 37/38] add docstring --- vllm/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vllm/utils.py b/vllm/utils.py index 73e1aa1c60c68..e132575e7bf81 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -512,5 +512,8 @@ def merge_dicts(dict1: Dict[Any, List[Any]], def init_cached_hf_modules(): + """ + Lazy initialization of the Hugging Face modules. + """ from transformers.dynamic_module_utils import init_hf_modules init_hf_modules() From 1e30d89f97e294a49232b11b1496bcbff2a4a404 Mon Sep 17 00:00:00 2001 From: youkaichao Date: Tue, 16 Apr 2024 23:31:39 -0700 Subject: [PATCH 38/38] use docstring --- vllm/worker/worker_base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index 70aa4dd0a18c4..309aa6256acea 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -114,7 +114,10 @@ def update_environment_variables(self, envs: Dict[str, str]) -> None: update_environment_variables(envs) def init_worker(self, *args, **kwargs): - # actual initialization of the worker class + """ + Actual initialization of the worker class. + Arguments are passed to the worker class constructor. + """ mod = importlib.import_module(self.worker_module_name) worker_class = getattr(mod, self.worker_class_name) self.worker = worker_class(*args, **kwargs)
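
Note (editorial, not part of the patch series): the patches above converge on a lazy worker-wrapper pattern split across vllm/utils.py and vllm/worker/worker_base.py: the wrapper stores only the worker's module and class names, environment variables such as CUDA_VISIBLE_DEVICES are applied first (with a warning on overwrite), and the real worker class is imported and constructed only afterwards. The following is a minimal, self-contained sketch of that pattern, assuming nothing beyond the Python standard library; names like LazyWorkerWrapper and the collections.Counter stand-in worker are illustrative only and are not the vLLM source.

import importlib
import logging
import os
from typing import Any, Dict

logger = logging.getLogger(__name__)


def update_environment_variables(envs: Dict[str, str]) -> None:
    """Apply env vars, warning when an existing value is overwritten."""
    for k, v in envs.items():
        if k in os.environ and os.environ[k] != v:
            logger.warning("Overwriting environment variable %s "
                           "from '%s' to '%s'", k, os.environ[k], v)
        os.environ[k] = v


class LazyWorkerWrapper:
    """Defers importing and constructing the worker until after env setup."""

    def __init__(self, worker_module_name: str,
                 worker_class_name: str) -> None:
        # Only names are stored here; no heavy imports happen yet.
        self.worker_module_name = worker_module_name
        self.worker_class_name = worker_class_name
        self.worker: Any = None

    def update_environment_variables(self, envs: Dict[str, str]) -> None:
        update_environment_variables(envs)

    def init_worker(self, *args, **kwargs) -> None:
        # Actual initialization of the worker class; arguments are passed
        # through to the worker's constructor.
        mod = importlib.import_module(self.worker_module_name)
        worker_class = getattr(mod, self.worker_class_name)
        self.worker = worker_class(*args, **kwargs)

    def execute_method(self, method: str, *args, **kwargs):
        return getattr(self.worker, method)(*args, **kwargs)


if __name__ == "__main__":
    # Stand-in usage: collections.Counter plays the role of a worker class
    # so the sketch runs without vLLM installed.
    wrapper = LazyWorkerWrapper("collections", "Counter")
    wrapper.update_environment_variables({"CUDA_VISIBLE_DEVICES": "0,1"})
    wrapper.init_worker("abracadabra")
    print(wrapper.execute_method("most_common", 1))

The point of keeping the wrapper free of heavy imports is visible in the patches themselves: environment variables set by the executor (e.g. CUDA_VISIBLE_DEVICES) and the optional lazy init of cached Hugging Face modules must take effect before torch or the model code is imported inside the worker process, which is why the real worker is only constructed in init_worker.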