[Doc] Update Ray Data distributed offline inference example #4871

Merged 1 commit on May 17, 2024
Changes from all commits
48 changes: 42 additions & 6 deletions examples/offline_inference_distributed.py
@@ -9,19 +9,31 @@

import numpy as np
import ray
from packaging.version import Version
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from vllm import LLM, SamplingParams

assert Version(ray.__version__) >= Version(
    "2.22.0"), "Ray version must be at least 2.22.0"

# Create a sampling params object.
sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

# Set tensor parallelism per instance.
tensor_parallel_size = 1

# Set number of instances. Each instance will use tensor_parallel_size GPUs.
num_instances = 1
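# (Added illustration: e.g. num_instances = 2 with tensor_parallel_size = 2
# would occupy 2 * 2 = 4 GPUs in total.)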


# Create a class to do batch inference.
class LLMPredictor:

    def __init__(self):
        # Create an LLM.
        self.llm = LLM(model="meta-llama/Llama-2-7b-chat-hf",
                       tensor_parallel_size=tensor_parallel_size)

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Generate texts from the prompts.
@@ -43,17 +55,41 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
# from cloud storage (such as JSONL, Parquet, CSV, binary format).
ds = ray.data.read_text("s3://anonymous@air-example-data/prompts.txt")
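# (Added note: for the other formats mentioned above, readers such as
# ray.data.read_json or ray.data.read_parquet could be swapped in.)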


# For tensor_parallel_size > 1, we need to create placement groups for vLLM
# to use. Every actor has to have its own placement group.
def scheduling_strategy_fn():
    # One bundle per tensor parallel worker
    pg = ray.util.placement_group(
        [{
            "GPU": 1,
            "CPU": 1
            # Review question: why do we need to set CPU: 1 here?
            # Author reply: it's not directly necessary (the example works
            # fine without it), but it shows that we will still be running a
            # CPU process for each worker.
        }] * tensor_parallel_size,
        strategy="STRICT_PACK",
    )
    return dict(scheduling_strategy=PlacementGroupSchedulingStrategy(
        pg, placement_group_capture_child_tasks=True))
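# (Added note: "STRICT_PACK" places all bundles of the placement group on a
# single node, so the tensor-parallel workers of one instance are co-located.)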


resources_kwarg = {}
if tensor_parallel_size == 1:
    # For tensor_parallel_size == 1, we simply set num_gpus=1.
    resources_kwarg["num_gpus"] = 1
else:
    # Otherwise, we have to set num_gpus=0 and provide
    # a function that will create a placement group for
    # each instance.
    resources_kwarg["num_gpus"] = 0
    resources_kwarg["ray_remote_args_fn"] = scheduling_strategy_fn

# Apply batch inference for all input data.
ds = ds.map_batches(
    LLMPredictor,
    # Set the concurrency to the number of LLM instances.
    concurrency=num_instances,
    # Specify the batch size for inference.
    batch_size=32,
    **resources_kwarg,
)

# Peek first 10 results.
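# (Added sketch, not part of this diff.) A minimal way the collapsed tail of
# the example might consume the results, assuming LLMPredictor.__call__
# returns batches with "prompt" and "generated_text" columns (an assumption;
# those lines are not visible in this diff):
outputs = ds.take(limit=10)
for output in outputs:
    prompt = output["prompt"]
    generated_text = output["generated_text"]
    print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}")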