diff --git a/docker/llm/vllm_sycl/docker/Dockerfile b/docker/llm/vllm_sycl/docker/Dockerfile new file mode 100644 index 00000000000..49e90c87e5b --- /dev/null +++ b/docker/llm/vllm_sycl/docker/Dockerfile @@ -0,0 +1,129 @@ +FROM intel/oneapi-basekit:2024.1.1-devel-ubuntu22.04 + +ARG http_proxy +ARG https_proxy + +# Disable pip's cache behavior +ARG PIP_NO_CACHE_DIR=false +ADD ./gradio_web_server.patch /tmp/gradio_web_server.patch +ADD ./oneccl-binding.patch /tmp/oneccl-binding.patch + +RUN wget -O- https://apt.repos.intel.com/intel-gpg-keys/GPG-PUB-KEY-INTEL-SW-PRODUCTS.PUB | gpg --dearmor | tee /usr/share/keyrings/intel-oneapi-archive-keyring.gpg > /dev/null && \ + echo "deb [signed-by=/usr/share/keyrings/intel-oneapi-archive-keyring.gpg] https://apt.repos.intel.com/oneapi all main " | tee /etc/apt/sources.list.d/oneAPI.list && \ + chmod 644 /usr/share/keyrings/intel-oneapi-archive-keyring.gpg && \ + rm /etc/apt/sources.list.d/intel-graphics.list && \ + wget -O- https://repositories.intel.com/graphics/intel-graphics.key | gpg --dearmor | tee /usr/share/keyrings/intel-graphics.gpg > /dev/null && \ + echo "deb [arch=amd64,i386 signed-by=/usr/share/keyrings/intel-graphics.gpg] https://repositories.intel.com/graphics/ubuntu jammy arc" | tee /etc/apt/sources.list.d/intel.gpu.jammy.list && \ + chmod 644 /usr/share/keyrings/intel-graphics.gpg && \ + apt-get update && \ + apt-get install -y --no-install-recommends curl wget git libunwind8-dev vim less && \ + # Install PYTHON 3.11 and IPEX-LLM[xpu] + ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone && \ + env DEBIAN_FRONTEND=noninteractive apt-get update && \ + # add-apt-repository requires gnupg, gpg-agent, software-properties-common + apt-get install -y --no-install-recommends gnupg gpg-agent software-properties-common && \ + # Add Python 3.11 PPA repository + add-apt-repository ppa:deadsnakes/ppa -y && \ + apt-get install -y --no-install-recommends python3.11 git curl wget && \ + rm /usr/bin/python3 && \ + ln -s /usr/bin/python3.11 /usr/bin/python3 && \ + ln -s /usr/bin/python3 /usr/bin/python && \ + apt-get install -y --no-install-recommends python3-pip python3.11-dev python3-wheel python3.11-distutils && \ + wget https://bootstrap.pypa.io/get-pip.py -O get-pip.py && \ + # Install FastChat from source requires PEP 660 support + python3 get-pip.py && \ + rm get-pip.py && \ + pip install --upgrade requests argparse urllib3 && \ + pip install --pre --upgrade ipex-llm[xpu] --extra-index-url https://pytorch-extension.intel.com/release-whl/stable/xpu/us/ && \ + # Fix Trivy CVE Issues + pip install transformers==4.36.2 && \ + pip install transformers_stream_generator einops tiktoken && \ + # # Install opencl-related repos + # apt-get update && \ + # apt-get install -y --no-install-recommends intel-opencl-icd=23.35.27191.42-775~22.04 intel-level-zero-gpu=1.3.27191.42-775~22.04 level-zero=1.14.0-744~22.04 && \ + # Install related libary of chat.py + pip install --upgrade colorama && \ + # Download all-in-one benchmark and examples + git clone https://github.com/intel-analytics/ipex-llm && \ + cp -r ./ipex-llm/python/llm/dev/benchmark/ ./benchmark && \ + cp -r ./ipex-llm/python/llm/example/GPU/HuggingFace/LLM ./examples && \ + # Install vllm dependencies + pip install --upgrade fastapi && \ + pip install --upgrade "uvicorn[standard]" && \ + # Download vLLM-Serving + cp -r ./ipex-llm/python/llm/example/GPU/vLLM-Serving/ ./vLLM-Serving + + +# Install Serving Dependencies +# Install ipex-llm[serving] only will update ipex_llm source code without updating +# bigdl-core-xe, which will lead to problems +RUN apt-get update && \ + apt-get install -y --no-install-recommends libfabric-dev wrk libaio-dev && \ + mkdir -p /llm/neo && \ + cd /llm/neo && \ + wget https://github.com/intel/intel-graphics-compiler/releases/download/igc-1.0.15136.4/intel-igc-core_1.0.15136.4_amd64.deb && \ + wget https://github.com/intel/intel-graphics-compiler/releases/download/igc-1.0.15136.4/intel-igc-opencl_1.0.15136.4_amd64.deb && \ + wget https://github.com/intel/compute-runtime/releases/download/23.35.27191.9/intel-level-zero-gpu-dbgsym_1.3.27191.9_amd64.ddeb && \ + wget https://github.com/intel/compute-runtime/releases/download/23.35.27191.9/intel-level-zero-gpu_1.3.27191.9_amd64.deb && \ + wget https://github.com/intel/compute-runtime/releases/download/23.35.27191.9/intel-opencl-icd-dbgsym_23.35.27191.9_amd64.ddeb && \ + wget https://github.com/intel/compute-runtime/releases/download/23.35.27191.9/intel-opencl-icd_23.35.27191.9_amd64.deb && \ + wget https://github.com/intel/compute-runtime/releases/download/23.35.27191.9/libigdgmm12_22.3.11.ci17747749_amd64.deb && \ + dpkg -i *.deb && \ + pip install --pre --upgrade ipex-llm[xpu,serving] && \ + pip install transformers==4.37.0 gradio==4.19.2 && \ + # Use ipex-vllm-mainline + git clone -b vllm_202411_0807 https://github.com/xiangyuT/ipex-llm.git /llm/ipex-llm && \ + cp /llm/ipex-llm/python/llm/src/ipex_llm/transformers/convert.py /usr/local/lib/python3.11/dist-packages/ipex_llm/transformers/convert.py && \ + cp /llm/ipex-llm/python/llm/src/ipex_llm/transformers/low_bit_linear.py /usr/local/lib/python3.11/dist-packages/ipex_llm/transformers/low_bit_linear.py && \ + rm -rf /usr/local/lib/python3.11/dist-packages/ipex_llm/vllm && \ + cp -r /llm/ipex-llm/python/llm/src/ipex_llm/vllm /usr/local/lib/python3.11/dist-packages/ipex_llm/ && \ + # install ipex 2.1.30 + python -m pip install torch==2.1.0.post2 torchvision==0.16.0.post2 torchaudio==2.1.0.post2 intel-extension-for-pytorch==2.1.30.post0 oneccl_bind_pt==2.1.300+xpu --extra-index-url https://pytorch-extension.intel.com/release-whl/stable/xpu/us/ && \ + python -m pip install setuptools==69.5.1 numpy==1.26.4 && \ + # Install vLLM-v2 dependencies + git clone -b xiangyu_test_202411_0806 https://github.com/analytics-zoo/vllm.git /llm/vllm && \ + pip install -r /llm/vllm/requirements-common.txt && \ + pip install -r /llm/vllm/requirements-xpu.txt && \ + pip install --no-deps xformers && \ + cd /llm/vllm && \ + VLLM_TARGET_DEVICE=xpu python setup.py install && \ + pip install outlines==0.0.34 --no-deps && \ + pip install interegular cloudpickle diskcache joblib lark nest-asyncio numba scipy && \ + # For Qwen series models support + pip install transformers_stream_generator einops tiktoken && \ + # For pipeline serving support + pip install mpi4py fastapi uvicorn openai && \ + # for gradio web UI + pip install gradio && \ + # Install internal oneccl && \ + cd /tmp/ && \ + pip install --upgrade setuptools wheel twine && \ + pip install "setuptools<70.0.0" && \ + git clone https://github.com/intel/torch-ccl -b v2.1.300+xpu && \ + cd torch-ccl && \ + patch -p1 < /tmp/oneccl-binding.patch && \ + git submodule sync && \ + git submodule update --init --recursive && \ + USE_SYSTEM_ONECCL=ON COMPUTE_BACKEND=dpcpp python setup.py install sdist bdist_wheel && \ + mv /tmp/torch-ccl/dist/oneccl_bind_pt-2.1.300+xpu-cp311-cp311-linux_x86_64.whl /tmp/ && \ + cd /tmp/ && \ + wget https://sourceforge.net/projects/oneccl-wks/files/oneccl_wks_installer_2024.0.0.2.sh && \ + bash oneccl_wks_installer_2024.0.0.2.sh && \ + pip uninstall -y oneccl_bind_pt && \ + pip install /tmp/oneccl_bind_pt-2.1.300+xpu-cp311-cp311-linux_x86_64.whl && \ + rm /tmp/oneccl_bind_pt-2.1.300+xpu-cp311-cp311-linux_x86_64.whl && \ + patch /usr/local/lib/python3.11/dist-packages/fastchat/serve/gradio_web_server.py < /tmp/gradio_web_server.patch && \ + pip install -r /llm/vllm/requirements-common.txt && \ + pip install ray + +COPY ./vllm_online_benchmark.py /llm/ +COPY ./vllm_offline_inference.py /llm/ +COPY ./payload-1024.lua /llm/ +COPY ./start-vllm-service.sh /llm/ +COPY ./benchmark_vllm_throughput.py /llm/ +COPY ./start-fastchat-service.sh /llm/ +COPY ./start-pp_serving-service.sh /llm/ +COPY ./start-lightweight_serving-service.sh /llm/ + + +WORKDIR /llm/ diff --git a/docker/llm/vllm_sycl/docker/README.md b/docker/llm/vllm_sycl/docker/README.md new file mode 100644 index 00000000000..24bfbe1b7b3 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/README.md @@ -0,0 +1,207 @@ +## Build/Use IPEX-LLM-serving xpu image + +### Build Image +```bash +docker build \ + --build-arg http_proxy=.. \ + --build-arg https_proxy=.. \ + --build-arg no_proxy=.. \ + --rm --no-cache -t intelanalytics/ipex-llm-serving-xpu:2024.1.1 . +``` + + +### Use the image for doing xpu serving + + +To map the `xpu` into the container, you need to specify `--device=/dev/dri` when booting the container. + +An example could be: +```bash +#/bin/bash +export DOCKER_IMAGE=intelanalytics/ipex-llm-serving-xpu:2.1.0-SNAPSHOT + +sudo docker run -itd \ + --net=host \ + --device=/dev/dri \ + --name=CONTAINER_NAME \ + --shm-size="16g" \ + $DOCKER_IMAGE +``` + + +After the container is booted, you could get into the container through `docker exec`. + +To verify the device is successfully mapped into the container, run `sycl-ls` to check the result. In a machine with Arc A770, the sampled output is: + +```bash +root@arda-arc12:/# sycl-ls +[opencl:acc:0] Intel(R) FPGA Emulation Platform for OpenCL(TM), Intel(R) FPGA Emulation Device 1.2 [2023.16.7.0.21_160000] +[opencl:cpu:1] Intel(R) OpenCL, 13th Gen Intel(R) Core(TM) i9-13900K 3.0 [2023.16.7.0.21_160000] +[opencl:gpu:2] Intel(R) OpenCL Graphics, Intel(R) Arc(TM) A770 Graphics 3.0 [23.17.26241.33] +[ext_oneapi_level_zero:gpu:0] Intel(R) Level-Zero, Intel(R) Arc(TM) A770 Graphics 1.3 [1.3.26241] +``` +After the container is booted, you could get into the container through `docker exec`. + +Currently, we provide two different serving engines in the image, which are FastChat serving engine and vLLM serving engine. + + +#### Lightweight serving engine + +To run Lightweight serving on one intel gpu using `IPEX-LLM` as backend, you can refer to this [readme](https://github.com/intel-analytics/ipex-llm/tree/main/python/llm/example/GPU/Lightweight-Serving). + +For convenience, we have included a file `/llm/start-lightweight_serving-service` in the image. + + +#### Pipeline parallel serving engine + +To run Pipeline parallel serving using `IPEX-LLM` as backend, you can refer to this [readme](https://github.com/intel-analytics/ipex-llm/tree/main/python/llm/example/GPU/Pipeline-Parallel-FastAPI). + +For convenience, we have included a file `/llm/start-pp_serving-service.sh` in the image. + + +#### FastChat serving engine + +To run model-serving using `IPEX-LLM` as backend using FastChat, you can refer to this [quickstart](https://ipex-llm.readthedocs.io/en/latest/doc/LLM/Quickstart/fastchat_quickstart.html#). + +For convenience, we have included a file `/llm/fastchat-examples/start-fastchat-service.sh` in the image. + +You can modify this script to using fastchat with either `ipex_llm_worker` or `vllm_worker`. + +#### vLLM serving engine + +To run vLLM engine using `IPEX-LLM` as backend, you can refer to this [document](https://github.com/intel-analytics/ipex-llm/blob/main/python/llm/example/GPU/vLLM-Serving/README.md). + +We have included multiple example files in `/llm/`: +1. `vllm_offline_inference.py`: Used for vLLM offline inference example +2. `benchmark_vllm_throughput.py`: Used for benchmarking throughput +3. `payload-1024.lua`: Used for testing request per second using 1k-128 request +4. `start-vllm-service.sh`: Used for template for starting vLLM service + +##### Online benchmark throurgh api_server + +We can benchmark the api_server to get an estimation about TPS (transactions per second). To do so, you need to start the service first according to the instructions in this [section](https://github.com/intel-analytics/ipex-llm/blob/main/python/llm/example/GPU/vLLM-Serving/README.md#service). + +###### Online benchmark through benchmark_util + +After starting vllm service, Sending reqs through `vllm_online_benchmark.py` +```bash +python vllm_online_benchmark.py $model_name $max_seqs +``` + +And it will output like this: +```bash +model_name: Qwen1.5-14B-Chat +max_seq: 12 +Warm Up: 100%|█████████████████████████████████████████████████████| 24/24 [01:36<00:00, 4.03s/req] +Benchmarking: 100%|████████████████████████████████████████████████| 60/60 [04:03<00:00, 4.05s/req] +Total time for 60 requests with 12 concurrent requests: xxx seconds. +Average responce time: xxx +Token throughput: xxx + +Average first token latency: xxx milliseconds. +P90 first token latency: xxx milliseconds. +P95 first token latency: xxx milliseconds. + +Average next token latency: xxx milliseconds. +P90 next token latency: xxx milliseconds. +P95 next token latency: xxx milliseconds. +``` + +###### Online benchmark through wrk +In container, do the following: +1. modify the `/llm/payload-1024.lua` so that the "model" attribute is correct. By default, we use a prompt that is roughly 1024 token long, you can change it if needed. +2. Start the benchmark using `wrk` using the script below: + +```bash +cd /llm +# You can change -t and -c to control the concurrency. +# By default, we use 12 connections to benchmark the service. +wrk -t12 -c12 -d15m -s payload-1024.lua http://localhost:8000/v1/completions --timeout 1h +``` + +#### Offline benchmark through benchmark_vllm_throughput.py + +We have included the benchmark_throughput script provied by `vllm` in our image as `/llm/benchmark_vllm_throughput.py`. To use the benchmark_throughput script, you will need to download the test dataset through: + +```bash +wget https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json +``` + +The full example looks like this: +```bash +cd /llm/ + +wget https://huggingface.co/datasets/anon8231489123/ShareGPT_Vicuna_unfiltered/resolve/main/ShareGPT_V3_unfiltered_cleaned_split.json + +export MODEL="YOUR_MODEL" + +# You can change load-in-low-bit from values in [sym_int4, fp8, fp16] + +python3 /llm/benchmark_vllm_throughput.py \ + --backend vllm \ + --dataset /llm/ShareGPT_V3_unfiltered_cleaned_split.json \ + --model $MODEL \ + --num-prompts 1000 \ + --seed 42 \ + --trust-remote-code \ + --enforce-eager \ + --dtype float16 \ + --device xpu \ + --load-in-low-bit sym_int4 \ + --gpu-memory-utilization 0.85 +``` + +> Note: you can adjust --load-in-low-bit to use other formats of low-bit quantization. + + +You can also adjust `--gpu-memory-utilization` rate using the below script to find the best performance using the following script: + +```bash +#!/bin/bash + +# Define the log directory +LOG_DIR="YOUR_LOG_DIR" +# Check if the log directory exists, if not, create it +if [ ! -d "$LOG_DIR" ]; then + mkdir -p "$LOG_DIR" +fi + +# Define an array of model paths +MODELS=( + "YOUR TESTED MODELS" +) + +# Define an array of utilization rates +UTIL_RATES=(0.85 0.90 0.95) + +# Loop over each model +for MODEL in "${MODELS[@]}"; do + # Loop over each utilization rate + for RATE in "${UTIL_RATES[@]}"; do + # Extract a simple model name from the path for easier identification + MODEL_NAME=$(basename "$MODEL") + + # Define the log file name based on the model and rate + LOG_FILE="$LOG_DIR/${MODEL_NAME}_utilization_${RATE}.log" + + # Execute the command and redirect output to the log file + # Sometimes you might need to set --max-model-len if memory is not enough + # load-in-low-bit accepts inputs [sym_int4, fp8, fp16] + python3 /llm/benchmark_vllm_throughput.py \ + --backend vllm \ + --dataset /llm/ShareGPT_V3_unfiltered_cleaned_split.json \ + --model $MODEL \ + --num-prompts 1000 \ + --seed 42 \ + --trust-remote-code \ + --enforce-eager \ + --dtype float16 \ + --load-in-low-bit sym_int4 \ + --device xpu \ + --gpu-memory-utilization $RATE &> "$LOG_FILE" + done +done + +# Inform the user that the script has completed its execution +echo "All benchmarks have been executed and logged." +``` diff --git a/docker/llm/vllm_sycl/docker/benchmark_vllm_throughput.py b/docker/llm/vllm_sycl/docker/benchmark_vllm_throughput.py new file mode 100644 index 00000000000..28e94da1c36 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/benchmark_vllm_throughput.py @@ -0,0 +1,392 @@ +"""Benchmark offline inference throughput.""" +import argparse +import json +import random +import time +from typing import List, Optional, Tuple + +import torch +from transformers import (AutoModelForCausalLM, AutoTokenizer, + PreTrainedTokenizerBase) +from tqdm import tqdm + + +def sample_requests( + dataset_path: str, + num_requests: int, + tokenizer: PreTrainedTokenizerBase, + fixed_output_len: Optional[int], +) -> List[Tuple[str, int, int]]: + if fixed_output_len is not None and fixed_output_len < 4: + raise ValueError("output_len too small") + + # Load the dataset. + with open(dataset_path) as f: + dataset = json.load(f) + # Filter out the conversations with less than 2 turns. + dataset = [data for data in dataset if len(data["conversations"]) >= 2] + # Only keep the first two turns of each conversation. + dataset = [(data["conversations"][0]["value"], + data["conversations"][1]["value"]) for data in dataset] + + # Tokenize the prompts and completions. + prompts = [prompt for prompt, _ in dataset] + prompt_token_ids = tokenizer(prompts).input_ids + completions = [completion for _, completion in dataset] + completion_token_ids = tokenizer(completions).input_ids + tokenized_dataset = [] + for i in range(len(dataset)): + output_len = len(completion_token_ids[i]) + if fixed_output_len is not None: + output_len = fixed_output_len + tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) + + # Filter out too long sequences. + filtered_dataset: List[Tuple[str, int, int]] = [] + for prompt, prompt_token_ids, output_len in tokenized_dataset: + prompt_len = len(prompt_token_ids) + if prompt_len < 4 or output_len < 4: + # Prune too short sequences. + continue + if prompt_len > 1024 or prompt_len + output_len > 2048: + # Prune too long sequences. + continue + filtered_dataset.append((prompt, prompt_len, output_len)) + + # Sample the requests. + sampled_requests = random.sample(filtered_dataset, num_requests) + return sampled_requests + + +def run_vllm( + requests: List[Tuple[str, int, int]], + model: str, + tokenizer: str, + quantization: Optional[str], + tensor_parallel_size: int, + seed: int, + n: int, + use_beam_search: bool, + trust_remote_code: bool, + dtype: str, + max_model_len: Optional[int], + enforce_eager: bool, + kv_cache_dtype: str, + device: str, + enable_prefix_caching: bool, + gpu_memory_utilization: float = 0.9, + load_in_low_bit: str = "sym_int4", + max_num_batched_tokens: int = 5000, + max_num_seqs: int = 256, +) -> float: + from vllm import SamplingParams + from ipex_llm.vllm.xpu.engine import IPEXLLMClass as LLM + llm = LLM(model=model, + tokenizer=tokenizer, + quantization=quantization, + tensor_parallel_size=tensor_parallel_size, + seed=seed, + trust_remote_code=trust_remote_code, + dtype=dtype, + max_model_len=max_model_len, + gpu_memory_utilization=gpu_memory_utilization, + enforce_eager=enforce_eager, + kv_cache_dtype=kv_cache_dtype, + device=device, + enable_prefix_caching=enable_prefix_caching, + load_in_low_bit=load_in_low_bit, + max_num_batched_tokens=max_num_batched_tokens, + max_num_seqs=max_num_seqs,) + + + # Add the requests to the engine. + warm_prompt = "hi " * (1024 - 1) + warm_requests = [(warm_prompt, 1024, 1024) + for _ in range(8)] + + prompts: List[str] = [] + sampling_params: List[SamplingParams] = [] + for prompt, _, output_len in warm_requests: + prompts.append(prompt) + sampling_params.append( + SamplingParams( + n=n, + temperature=0.0 if use_beam_search else 1.0, + top_p=1.0, + use_beam_search=use_beam_search, + ignore_eos=True, + max_tokens=output_len, + )) + llm.generate(prompts, sampling_params, use_tqdm=True) + + prompts: List[str] = [] + sampling_params: List[SamplingParams] = [] + for prompt, _, output_len in requests: + prompts.append(prompt) + sampling_params.append( + SamplingParams( + n=n, + temperature=0.0 if use_beam_search else 1.0, + top_p=1.0, + use_beam_search=use_beam_search, + ignore_eos=True, + max_tokens=output_len, + )) + + start = time.perf_counter() + llm.generate(prompts, sampling_params, use_tqdm=True) + end = time.perf_counter() + return end - start + + +def run_hf( + requests: List[Tuple[str, int, int]], + model: str, + tokenizer: PreTrainedTokenizerBase, + n: int, + use_beam_search: bool, + max_batch_size: int, + trust_remote_code: bool, +) -> float: + assert not use_beam_search + llm = AutoModelForCausalLM.from_pretrained( + model, torch_dtype=torch.float16, trust_remote_code=trust_remote_code) + if llm.config.model_type == "llama": + # To enable padding in the HF backend. + tokenizer.pad_token = tokenizer.eos_token + llm = llm.cuda() + + pbar = tqdm(total=len(requests)) + start = time.perf_counter() + batch: List[str] = [] + max_prompt_len = 0 + max_output_len = 0 + for i in range(len(requests)): + prompt, prompt_len, output_len = requests[i] + # Add the prompt to the batch. + batch.append(prompt) + max_prompt_len = max(max_prompt_len, prompt_len) + max_output_len = max(max_output_len, output_len) + if len(batch) < max_batch_size and i != len(requests) - 1: + # Check if we can add more requests to the batch. + _, next_prompt_len, next_output_len = requests[i + 1] + if (max(max_prompt_len, next_prompt_len) + + max(max_output_len, next_output_len)) <= 2048: + # We can add more requests to the batch. + continue + + # Generate the sequences. + input_ids = tokenizer(batch, return_tensors="pt", + padding=True).input_ids + llm_outputs = llm.generate( + input_ids=input_ids.cuda(), + do_sample=not use_beam_search, + num_return_sequences=n, + temperature=1.0, + top_p=1.0, + use_cache=True, + max_new_tokens=max_output_len, + ) + # Include the decoding time. + tokenizer.batch_decode(llm_outputs, skip_special_tokens=True) + pbar.update(len(batch)) + + # Clear the batch. + batch = [] + max_prompt_len = 0 + max_output_len = 0 + end = time.perf_counter() + return end - start + + +def run_mii( + requests: List[Tuple[str, int, int]], + model: str, + tensor_parallel_size: int, + output_len: int, +) -> float: + from mii import pipeline + llm = pipeline(model, tensor_parallel=tensor_parallel_size) + prompts = [prompt for prompt, _, _ in requests] + + start = time.perf_counter() + llm(prompts, max_new_tokens=output_len) + end = time.perf_counter() + return end - start + + +def main(args: argparse.Namespace): + print(args) + random.seed(args.seed) + + # Sample the requests. + tokenizer = AutoTokenizer.from_pretrained( + args.tokenizer, trust_remote_code=args.trust_remote_code) + if args.dataset is None: + # Synthesize a prompt with the given input length. + prompt = "hi" * (args.input_len - 1) + requests = [(prompt, args.input_len, args.output_len) + for _ in range(args.num_prompts)] + else: + requests = sample_requests(args.dataset, args.num_prompts, tokenizer, + args.output_len) + + if args.backend == "vllm": + elapsed_time = run_vllm( + requests, args.model, args.tokenizer, args.quantization, + args.tensor_parallel_size, args.seed, args.n, args.use_beam_search, + args.trust_remote_code, args.dtype, args.max_model_len, + args.enforce_eager, args.kv_cache_dtype, args.device, + args.enable_prefix_caching, args.gpu_memory_utilization, args.load_in_low_bit, + args.max_num_batched_tokens,args.max_num_seqs) + elif args.backend == "hf": + assert args.tensor_parallel_size == 1 + elapsed_time = run_hf(requests, args.model, tokenizer, args.n, + args.use_beam_search, args.hf_max_batch_size, + args.trust_remote_code) + elif args.backend == "mii": + elapsed_time = run_mii(requests, args.model, args.tensor_parallel_size, + args.output_len) + else: + raise ValueError(f"Unknown backend: {args.backend}") + total_num_tokens = sum(prompt_len + output_len + for _, prompt_len, output_len in requests) + print(f"Throughput: {len(requests) / elapsed_time:.2f} requests/s, " + f"{total_num_tokens / elapsed_time:.2f} tokens/s") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Benchmark the throughput.") + parser.add_argument("--backend", + type=str, + choices=["vllm", "hf", "mii"], + default="vllm") + parser.add_argument("--dataset", + type=str, + default=None, + help="Path to the dataset.") + parser.add_argument("--input-len", + type=int, + default=None, + help="Input prompt length for each request") + parser.add_argument("--output-len", + type=int, + default=None, + help="Output length for each request. Overrides the " + "output length from the dataset.") + parser.add_argument("--model", type=str, default="facebook/opt-125m") + parser.add_argument("--tokenizer", type=str, default=None) + parser.add_argument('--quantization', + '-q', + choices=['awq', 'gptq', 'squeezellm', None], + default=None) + parser.add_argument("--tensor-parallel-size", "-tp", type=int, default=1) + parser.add_argument("--n", + type=int, + default=1, + help="Number of generated sequences per prompt.") + parser.add_argument("--use-beam-search", action="store_true") + parser.add_argument("--num-prompts", + type=int, + default=1000, + help="Number of prompts to process.") + parser.add_argument("--seed", type=int, default=0) + parser.add_argument("--hf-max-batch-size", + type=int, + default=None, + help="Maximum batch size for HF backend.") + parser.add_argument('--trust-remote-code', + action='store_true', + help='trust remote code from huggingface') + parser.add_argument( + '--max-model-len', + type=int, + default=None, + help='Maximum length of a sequence (including prompt and output). ' + 'If None, will be derived from the model.') + parser.add_argument( + '--dtype', + type=str, + default='auto', + choices=['auto', 'half', 'float16', 'bfloat16', 'float', 'float32'], + help='data type for model weights and activations. ' + 'The "auto" option will use FP16 precision ' + 'for FP32 and FP16 models, and BF16 precision ' + 'for BF16 models.') + parser.add_argument('--gpu-memory-utilization', + type=float, + default=0.9, + help='the fraction of GPU memory to be used for ' + 'the model executor, which can range from 0 to 1.' + 'If unspecified, will use the default value of 0.9.') + parser.add_argument("--enforce-eager", + action="store_true", + help="enforce eager execution") + parser.add_argument( + "--kv-cache-dtype", + type=str, + choices=["auto", "fp8_e5m2"], + default="auto", + help= + 'Data type for kv cache storage. If "auto", will use model data type.') + parser.add_argument( + "--device", + type=str, + default="cuda", + choices=["cuda", "xpu"], + help='device type for vLLM execution, supporting CUDA only currently.') + parser.add_argument( + "--enable-prefix-caching", + action='store_true', + help="enable automatic prefix caching for vLLM backend.") + parser.add_argument( + "--load-in-low-bit", + type=str, + choices=["sym_int4", "fp8", "fp8_e4m3", "fp16", "fp6"], + default="sym_int4", + help="Low-bit format quantization with IPEX-LLM") + + parser.add_argument('--max-num-batched-tokens', + type=int, + default=4096, + help='maximum number of batched tokens per iteration') + + parser.add_argument('--max-num-seqs', + type=int, + default=256, + help='Maximum number of sequences per iteration.') + + + args = parser.parse_args() + if args.tokenizer is None: + args.tokenizer = args.model + if args.dataset is None: + assert args.input_len is not None + assert args.output_len is not None + else: + assert args.input_len is None + + if args.backend == "vllm": + if args.hf_max_batch_size is not None: + raise ValueError("HF max batch size is only for HF backend.") + elif args.backend == "hf": + if args.hf_max_batch_size is None: + raise ValueError("HF max batch size is required for HF backend.") + if args.quantization is not None: + raise ValueError("Quantization is only for vLLM backend.") + elif args.backend == "mii": + if args.dtype != "auto": + raise ValueError("dtype must be auto for MII backend.") + if args.n != 1: + raise ValueError("n must be 1 for MII backend.") + if args.use_beam_search: + raise ValueError("Beam search is not supported for MII backend.") + if args.quantization is not None: + raise ValueError("Quantization is only for vLLM backend.") + if args.hf_max_batch_size is not None: + raise ValueError("HF max batch size is only for HF backend.") + if args.tokenizer != args.model: + raise ValueError("Tokenizer must be the same as the model for MII " + "backend.") + main(args) + diff --git a/docker/llm/vllm_sycl/docker/gradio_web_server.patch b/docker/llm/vllm_sycl/docker/gradio_web_server.patch new file mode 100644 index 00000000000..807e0f22231 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/gradio_web_server.patch @@ -0,0 +1,208 @@ +--- gradio_web_server.py 2024-06-20 14:21:48.013518726 +0800 ++++ gradio_web_server_new.py 2024-06-20 14:23:09.822830709 +0800 +@@ -9,8 +9,10 @@ + import json + import os + import random ++import pandas as pd + import time + import uuid ++import numpy as np + + import gradio as gr + import requests +@@ -241,7 +243,7 @@ + ip = get_ip(request) + logger.info(f"clear_history. ip: {ip}") + state = None +- return (state, [], "", None) + (disable_btn,) * 5 ++ return (state, [], "", None, "", "", "", "") + (disable_btn,) * 5 + + + def get_ip(request: gr.Request): +@@ -354,6 +356,18 @@ + return None + + ++def handle_latency_metrics(first_token_time, next_token_time): ++ # next token time is a numpy array... ++ # first token time might be None ++ first_token_latency = "None" ++ next_token_latency = "None" ++ if first_token_time is not None: ++ first_token_latency = str(first_token_time * 1000) + " ms" ++ if next_token_time.size > 0: ++ next_token_latency = str(np.mean(next_token_time) * 1000) + " ms" ++ return first_token_latency, next_token_latency ++ ++ + def bot_response( + state, + temperature, +@@ -372,7 +386,7 @@ + if state.skip_next: + # This generate call is skipped due to invalid inputs + state.skip_next = False +- yield (state, state.to_gradio_chatbot()) + (no_change_btn,) * 5 ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + (no_change_btn,) * 5 + return + + if apply_rate_limit: +@@ -381,7 +395,7 @@ + error_msg = RATE_LIMIT_MSG + "\n\n" + ret["reason"] + logger.info(f"rate limit reached. ip: {ip}. error_msg: {ret['reason']}") + state.conv.update_last_message(error_msg) +- yield (state, state.to_gradio_chatbot()) + (no_change_btn,) * 5 ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + (no_change_btn,) * 5 + return + + conv, model_name = state.conv, state.model_name +@@ -404,6 +418,10 @@ + yield ( + state, + state.to_gradio_chatbot(), ++ "None", ++ "None", ++ "None", ++ "None", + disable_btn, + disable_btn, + disable_btn, +@@ -444,18 +462,32 @@ + ) + + conv.update_last_message("▌") +- yield (state, state.to_gradio_chatbot()) + (disable_btn,) * 5 ++ # We probably need to change this method ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + (disable_btn,) * 5 ++ prompt_tokens = 0 ++ generated_tokens = 0 ++ first_token_latency = None ++ next_token_latencies = np.array([]) ++ start_time = time.time() + + try: + for i, data in enumerate(stream_iter): + if data["error_code"] == 0: ++ prompt_tokens = data["usage"]["prompt_tokens"] ++ generated_tokens = data["usage"]["completion_tokens"] + output = data["text"].strip() + conv.update_last_message(output + "▌") +- yield (state, state.to_gradio_chatbot()) + (disable_btn,) * 5 ++ if first_token_latency is None: ++ first_token_latency = time.time() - start_time ++ else: ++ next_token_latencies = np.append(next_token_latencies, time.time() - start_time) ++ start_time = time.time() ++ first_latency, next_latency = handle_latency_metrics(first_token_latency, next_token_latencies) ++ yield (state, state.to_gradio_chatbot(), prompt_tokens, generated_tokens, first_latency, next_latency) + (disable_btn,) * 5 + else: + output = data["text"] + f"\n\n(error_code: {data['error_code']})" + conv.update_last_message(output) +- yield (state, state.to_gradio_chatbot()) + ( ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + ( + disable_btn, + disable_btn, + disable_btn, +@@ -465,13 +497,14 @@ + return + output = data["text"].strip() + conv.update_last_message(output) +- yield (state, state.to_gradio_chatbot()) + (enable_btn,) * 5 ++ first_latency, next_latency = handle_latency_metrics(first_token_latency, next_token_latencies) ++ yield (state, state.to_gradio_chatbot(), prompt_tokens, generated_tokens, first_latency, next_latency) + (enable_btn,) * 5 + except requests.exceptions.RequestException as e: + conv.update_last_message( + f"{SERVER_ERROR_MSG}\n\n" + f"(error_code: {ErrorCode.GRADIO_REQUEST_ERROR}, {e})" + ) +- yield (state, state.to_gradio_chatbot()) + ( ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + ( + disable_btn, + disable_btn, + disable_btn, +@@ -484,7 +517,7 @@ + f"{SERVER_ERROR_MSG}\n\n" + f"(error_code: {ErrorCode.GRADIO_STREAM_UNKNOWN_ERROR}, {e})" + ) +- yield (state, state.to_gradio_chatbot()) + ( ++ yield (state, state.to_gradio_chatbot(), "None", "None", "None", "None") + ( + disable_btn, + disable_btn, + disable_btn, +@@ -646,7 +679,8 @@ + ) + + notice_markdown = f""" +-# 🏔️ Chat with Open Large Language Models ++# 🏔️ ChatBot based Xeon-W & Arc GPUs ++### Deployed with IPEX-LLM + {promotion} + """ + +@@ -691,6 +725,26 @@ + regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False) + clear_btn = gr.Button(value="🗑️ Clear history", interactive=False) + ++ with gr.Row(): ++ with gr.Column(): ++ gr.Markdown("### Performance Metrics") ++ prompt_token = gr.Textbox( ++ label="Prompt token length:", ++ interactive=False, ++ ) ++ next_token = gr.Textbox( ++ label="Generated token length:", ++ interactive=False, ++ ) ++ first_token_latency = gr.Textbox( ++ interactive=False, ++ label="First token Latency:", ++ ) ++ next_token_latency = gr.Textbox( ++ interactive=False, ++ label="Next token Latency:", ++ ) ++ + with gr.Accordion("Parameters", open=False) as parameter_row: + temperature = gr.Slider( + minimum=0.0, +@@ -743,9 +797,9 @@ + ).then( + bot_response, + [state, temperature, top_p, max_output_tokens], +- [state, chatbot] + btn_list, ++ [state, chatbot, prompt_token, next_token, first_token_latency, next_token_latency] + btn_list, + ) +- clear_btn.click(clear_history, None, [state, chatbot, textbox, imagebox] + btn_list) ++ clear_btn.click(clear_history, None, [state, chatbot, textbox, imagebox, prompt_token, next_token, first_token_latency, next_token_latency] + btn_list) + + model_selector.change( + clear_history, None, [state, chatbot, textbox, imagebox] + btn_list +@@ -758,7 +812,7 @@ + ).then( + bot_response, + [state, temperature, top_p, max_output_tokens], +- [state, chatbot] + btn_list, ++ [state, chatbot, prompt_token, next_token, first_token_latency, next_token_latency] + btn_list, + ) + send_btn.click( + add_text, +@@ -767,7 +821,7 @@ + ).then( + bot_response, + [state, temperature, top_p, max_output_tokens], +- [state, chatbot] + btn_list, ++ [state, chatbot, prompt_token, next_token, first_token_latency, next_token_latency] + btn_list, + ) + + return [state, model_selector] +@@ -775,7 +829,7 @@ + + def build_demo(models): + with gr.Blocks( +- title="Chat with Open Large Language Models", ++ title="ChatBot based Xeon-W & Arc GPUs", + theme=gr.themes.Default(), + css=block_css, + ) as demo: diff --git a/docker/llm/vllm_sycl/docker/oneccl-binding.patch b/docker/llm/vllm_sycl/docker/oneccl-binding.patch new file mode 100644 index 00000000000..4b8410dce9d --- /dev/null +++ b/docker/llm/vllm_sycl/docker/oneccl-binding.patch @@ -0,0 +1,14 @@ +diff --git a/src/gpu/dpcpp_ccl.cpp b/src/gpu/dpcpp_ccl.cpp +index 3bd8087..c5b5ce3 100644 +--- a/src/gpu/dpcpp_ccl.cpp ++++ b/src/gpu/dpcpp_ccl.cpp +@@ -689,7 +689,8 @@ c10::intrusive_ptr XPUCCLStubs::allreduce_(std::v + stream, + attr), stream.get_native()); + }); +- // printf("Use One CCL allreduce.\n"); ++ stream.get_native().wait(); ++ // printf("Use One CCL allreduce.\n"); + return ret_evt; + }, + c10d::OpType::ALLREDUCE); diff --git a/docker/llm/vllm_sycl/docker/payload-1024.lua b/docker/llm/vllm_sycl/docker/payload-1024.lua new file mode 100644 index 00000000000..0f8df5c9362 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/payload-1024.lua @@ -0,0 +1,20 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/json" + +wrk.body = [[ +{ + "model": "llama2", + "prompt": "Once upon a time, there existed a little girl who liked to have adventures. She wanted to go to places and meet new people, and have fun. However, her parents were always telling her to stay close to home, to be careful, and to avoid any danger. But the little girl was stubborn, and she wanted to see what was on the other side of the mountain. So she sneaked out of the house one night, leaving a note for her parents, and set off on her journey. As she climbed the mountain, the little girl felt a sense of excitement and wonder. She had never been this far away from home before, and she couldnt wait to see what she would find on the other side. She climbed higher and higher, her lungs burning from the thin air, until she finally reached the top of the mountain. And there, she found a beautiful meadow filled with wildflowers and a sparkling stream. The little girl danced and played in the meadow, feeling free and alive. She knew she had to return home eventually, but for now, she was content to enjoy her adventure. As the sun began to set, the little girl reluctantly made her way back down the mountain, but she knew that she would never forget her adventure and the joy of discovering something new and exciting. And whenever she felt scared or unsure, she would remember the thrill of climbing the mountain and the beauty of the meadow on the other side, and she would know that she could face any challenge that came her way, with courage and determination. She carried the memories of her journey in her heart, a constant reminder of the strength she possessed. The little girl returned home to her worried parents, who had discovered her note and anxiously awaited her arrival. They scolded her for disobeying their instructions and venturing into the unknown. But as they looked into her sparkling eyes and saw the glow on her face, their anger softened. They realized that their little girl had grown, that she had experienced something extraordinary. The little girl shared her tales of the mountain and the meadow with her parents, painting vivid pictures with her words. She spoke of the breathtaking view from the mountaintop, where the world seemed to stretch endlessly before her. She described the delicate petals of the wildflowers, vibrant hues that danced in the gentle breeze. And she recounted the soothing melody of the sparkling stream, its waters reflecting the golden rays of the setting sun. Her parents listened intently, captivated by her story. They realized that their daughter had discovered a part of herself on that journey—a spirit of curiosity and a thirst for exploration. They saw that she had learned valuable lessons about independence, resilience, and the beauty that lies beyond ones comfort zone. From that day forward, the little girls parents encouraged her to pursue her dreams and embrace new experiences. They understood that while there were risks in the world, there were also rewards waiting to be discovered. They supported her as she continued to embark on adventures, always reminding her to stay safe but never stifling her spirit. As the years passed, the little girl grew into a remarkable woman, fearlessly exploring the world and making a difference wherever she went. The lessons she had learned on that fateful journey stayed with her, guiding her through challenges and inspiring her to live life to the fullest. And so, the once timid little girl became a symbol of courage and resilience, a reminder to all who knew her that the greatest joys in life often lie just beyond the mountains we fear to climb. Her story spread far and wide, inspiring others to embrace their own journeys and discover the wonders that awaited them. In the end, the little girls adventure became a timeless tale, passed down through generations, reminding us all that sometimes, the greatest rewards come to those who dare to step into the unknown and follow their hearts. With each passing day, the little girls story continued to inspire countless individuals, igniting a spark within their souls and encouraging them to embark on their own extraordinary adventures. The tale of her bravery and determination resonated deeply with people from all walks of life, reminding them of the limitless possibilities that awaited them beyond the boundaries of their comfort zones. People marveled at the little girls unwavering spirit and her unwavering belief in the power of dreams. They saw themselves reflected in her journey, finding solace in the knowledge that they too could overcome their fears and pursue their passions. The little girl's story became a beacon of hope, a testament to the human spirit", + "max_tokens": 128, + "temperature": 0.5, + "n": 1, + "use_beam_search": false +} +]] + +logfile = io.open("wrk.log", "w"); + +response = function(status, header, body) + logfile:write("status:" .. status .. "\n" .. body .. "\n-------------------------------------------------\n"); +end + \ No newline at end of file diff --git a/docker/llm/vllm_sycl/docker/start-fastchat-service.sh b/docker/llm/vllm_sycl/docker/start-fastchat-service.sh new file mode 100644 index 00000000000..8d8ca975eab --- /dev/null +++ b/docker/llm/vllm_sycl/docker/start-fastchat-service.sh @@ -0,0 +1,125 @@ +#!/bin/bash + +usage() { + echo "Usage: $0 [-w --worker ] [--help]" + echo "--help: Print help message." + echo "The following environment variables can be set." + echo "MODEL_PATH (default: empty)." + echo "LOW_BIT_FORMAT (default: sym_int4)" + echo "CONTROLLER_HOST (default: localhost)." + echo "CONTROLLER_PORT (default: 21001)." + echo "WORKER_HOST (default: localhost)." + echo "WORKER_PORT (default: 21002)." + echo "API_HOST (default: localhost)." + echo "API_PORT (default: 8000)." + exit 1 +} + +# Default values +controller_host="localhost" +controller_port="21001" +worker_host="localhost" +worker_port="21002" +api_host="localhost" +api_port="8000" +model_path="" +mode="" +dispatch_method="shortest_queue" # shortest_queue or lottery +stream_interval=1 +worker_type="model_worker" +low_bit_format="sym_int4" + +# We do not have any arguments, just run bash + +# Parse command-line options +options=$(getopt -o "hw:" --long "help,worker:" -n "$0" -- "$@") +if [ $? != 0 ]; then + usage +fi +eval set -- "$options" + +while true; do + case "$1" in + -w|--worker) + worker_type="$2" + [[ $worker_type == "model_worker" || $worker_type == "vllm_worker" ]] || usage + shift 2 + ;; + -h|--help) + usage + ;; + --) + shift + break + ;; + *) + usage + ;; + esac +done + +if [ "$worker_type" == "model_worker" ]; then + worker_type="ipex_llm.serving.fastchat.ipex_llm_worker" +elif [ "$worker_type" == "vllm_worker" ]; then + worker_type="ipex_llm.serving.fastchat.vllm_worker" +fi + +if [[ -n $CONTROLLER_HOST ]]; then + controller_host=$CONTROLLER_HOST +fi + +if [[ -n $CONTROLLER_PORT ]]; then + controller_port=$CONTROLLER_PORT +fi + +if [[ -n $LOW_BIT_FORMAT ]]; then + low_bit_format=$LOW_BIT_FORMAT +fi + +if [[ -n $WORKER_HOST ]]; then + worker_host=$WORKER_HOST +fi + +if [[ -n $WORKER_PORT ]]; then + worker_port=$WORKER_PORT +fi + +if [[ -n $MODEL_PATH ]]; then + model_path=$MODEL_PATH +fi + +if [[ -n $API_HOST ]]; then + api_host=$API_HOST +fi + +if [[ -n $API_PORT ]]; then + api_port=$API_PORT +fi + +if [[ -n $DISPATCH_METHOD ]]; then + dispatch_method=$DISPATCH_METHOD +fi + +if [[ -n $STREAM_INTERVAL ]]; then + stream_interval=$STREAM_INTERVAL +fi + +controller_address="http://$controller_host:$controller_port" +echo "Controller address: $controller_address" +python3 -m fastchat.serve.controller --host $controller_host --port $controller_port --dispatch-method $dispatch_method & + +worker_address="http://$worker_host:$worker_port" +echo "Worker type: $worker_type" +echo "Worker address: $worker_address" + +if [ "$worker_type" == "ipex_llm.serving.fastchat.ipex_llm_worker" ]; then + python3 -m "$worker_type" --model-path $model_path --device xpu --low-bit $low_bit_format --host $worker_host --port $worker_port --worker-address $worker_address --controller-address $controller_address --stream-interval $stream_interval & +elif [ "$worker_type" == "ipex_llm.serving.fastchat.vllm_worker" ]; then + python3 -m "$worker_type" --model-path $model_path --device xpu --load-in-low-bit $low_bit_format --host $worker_host --port $worker_port --worker-address $worker_address --controller-address $controller_address --enforce-eager --gpu-memory-utilization 0.85 & +fi + +sleep 10 + +api_address="http://$api_host:$api_port" +echo "OpenAI API address: $api_address" +python3 -m fastchat.serve.openai_api_server --host $api_host --port $api_port --controller-address $controller_address diff --git a/docker/llm/vllm_sycl/docker/start-lightweight_serving-service.sh b/docker/llm/vllm_sycl/docker/start-lightweight_serving-service.sh new file mode 100644 index 00000000000..b51e4fc3e13 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/start-lightweight_serving-service.sh @@ -0,0 +1,4 @@ +cd /llm/lightweight_serving +model_path="/llm/models/Llama-2-7b-chat-hf" +low_bit="sym_int4" +python lightweight_serving.py --repo-id-or-model-path $model_path --low-bit $low_bit \ No newline at end of file diff --git a/docker/llm/vllm_sycl/docker/start-pp_serving-service.sh b/docker/llm/vllm_sycl/docker/start-pp_serving-service.sh new file mode 100644 index 00000000000..588f2922285 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/start-pp_serving-service.sh @@ -0,0 +1,29 @@ +source /opt/intel/oneapi/setvars.sh --force +export no_proxy=localhost +export FI_PROVIDER=tcp +export OMP_NUM_THREADS=32 + +export LD_PRELOAD=${LD_PRELOAD}:${CONDA_PREFIX}/lib/libtcmalloc.so +basekit_root=/opt/intel/oneapi +source $basekit_root/setvars.sh --force +# source $basekit_root/ccl/latest/env/vars.sh --force +source /opt/intel/1ccl-wks/setvars.sh + +export USE_XETLA=OFF +if [[ $KERNEL_VERSION != *"6.5"* ]]; then + export SYCL_PI_LEVEL_ZERO_USE_IMMEDIATE_COMMANDLISTS=1 +fi +export TORCH_LLM_ALLREDUCE=0 + +export IPEX_LLM_LAST_LM_HEAD=1 +export IPEX_LLM_QUANTIZE_KV_CACHE=1 +export IPEX_LLM_LOW_MEM=1 +export num_gpus=2 +export model_path="/llm/models/Llama-2-7b-chat-hf" +export low_bit="fp8" +# max requests = max_num_reqs * rank_num +export max_num_seqs="4" +export max_prefilled_seqs="0" + +cd /llm/pp_serving +CCL_ZE_IPC_EXCHANGE=sockets torchrun --standalone --nnodes=1 --nproc-per-node $num_gpus pipeline_serving.py --repo-id-or-model-path $model_path --low-bit $low_bit --max-num-seqs $max_num_seqs --max-prefilled-seqs $max_prefilled_seqs diff --git a/docker/llm/vllm_sycl/docker/start-vllm-service.sh b/docker/llm/vllm_sycl/docker/start-vllm-service.sh new file mode 100644 index 00000000000..c0d0f112c41 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/start-vllm-service.sh @@ -0,0 +1,20 @@ +#!/bin/bash +model="YOUR_MODEL_PATH" +served_model_name="YOUR_MODEL_NAME" + +source /opt/intel/1ccl-wks/setvars.sh + +python -m ipex_llm.vllm.xpu.entrypoints.openai.api_server \ + --served-model-name $served_model_name \ + --port 8000 \ + --model $model \ + --trust-remote-code \ + --gpu-memory-utilization 0.75 \ + --device xpu \ + --dtype float16 \ + --enforce-eager \ + --load-in-low-bit sym_int4 \ + --max-model-len 4096 \ + --max-num-batched-tokens 10240 \ + --max-num-seqs 12 \ + --tensor-parallel-size 1 \ No newline at end of file diff --git a/docker/llm/vllm_sycl/docker/vllm_offline_inference.py b/docker/llm/vllm_sycl/docker/vllm_offline_inference.py new file mode 100644 index 00000000000..15ecf4f9e90 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/vllm_offline_inference.py @@ -0,0 +1,61 @@ +# +# Copyright 2016 The BigDL Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Some parts of this file is adapted from +# https://github.com/vllm-project/vllm/blob/v0.2.1.post1/examples/offline_inference.py +# which is licensed under Apache License 2.0 +# +# Copyright 2023 The vLLM team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from vllm import SamplingParams +from ipex_llm.vllm.xpu.engine import IPEXLLMClass as LLM + +# Sample prompts. +prompts = [ + "Hello, my name is", + "The president of the United States is", + "The capital of France is", + "The future of AI is", +] +# Create a sampling params object. +sampling_params = SamplingParams(temperature=0.8, top_p=0.95) + +# Create an LLM. +llm = LLM(model="YOUR_MODEL", + device="xpu", + dtype="float16", + enforce_eager=True, + load_in_low_bit="sym_int4", + tensor_parallel_size=1) +# Generate texts from the prompts. The output is a list of RequestOutput objects +# that contain the prompt, generated text, and other information. +outputs = llm.generate(prompts, sampling_params) +# Print the outputs. +for output in outputs: + prompt = output.prompt + generated_text = output.outputs[0].text + print(f"Prompt: {prompt!r}, Generated text: {generated_text!r}") \ No newline at end of file diff --git a/docker/llm/vllm_sycl/docker/vllm_online_benchmark.py b/docker/llm/vllm_sycl/docker/vllm_online_benchmark.py new file mode 100644 index 00000000000..2db0ff18b99 --- /dev/null +++ b/docker/llm/vllm_sycl/docker/vllm_online_benchmark.py @@ -0,0 +1,289 @@ +import requests +import time +from concurrent.futures import ThreadPoolExecutor +import concurrent +import numpy as np +from tqdm import tqdm +import json +import random +import sys + +if len(sys.argv) < 3: + print("Usage: python bench.py ") + sys.exit(1) + +print("running bench.py") +model_name = sys.argv[1] +print("model_name: " + str(model_name)) +max_seq = sys.argv[2] +print("max_seq: " + str(max_seq)) + +PROMPT_32 = "Once upon a time, there existed a little girl who liked to have adventures. She wanted to go to places and meet new people, and have fun" + +PROMPT_128 = "In a distant future, humanity has expanded across the galaxy, establishing colonies on numerous planets. The interstellar community thrives under the guidance of the United Galactic Federation, which ensures peace and prosperity. However, a new threat emerges from the unknown regions of space, challenging the stability and security of the galaxy. Brave explorers and seasoned warriors must unite to uncover the secrets of this mysterious force and protect the future of all sentient beings. Please continue the above story as long as possible, preferably more than 1000 tokens." + +PROMPT_1024 = "Once upon a time, there existed a little girl who liked to have adventures. She wanted to go to places and meet new people, and have fun. However, her parents were always telling her to stay close to home, to be careful, and to avoid any danger. But the little girl was stubborn, and she wanted to see what was on the other side of the mountain. So she sneaked out of the house one night, leaving a note for her parents, and set off on her journey. As she climbed the mountain, the little girl felt a sense of excitement and wonder. She had never been this far away from home before, and she couldnt wait to see what she would find on the other side. She climbed higher and higher, her lungs burning from the thin air, until she finally reached the top of the mountain. And there, she found a beautiful meadow filled with wildflowers and a sparkling stream. The little girl danced and played in the meadow, feeling free and alive. She knew she had to return home eventually, but for now, she was content to enjoy her adventure. As the sun began to set, the little girl reluctantly made her way back down the mountain, but she knew that she would never forget her adventure and the joy of discovering something new and exciting. And whenever she felt scared or unsure, she would remember the thrill of climbing the mountain and the beauty of the meadow on the other side, and she would know that she could face any challenge that came her way, with courage and determination. She carried the memories of her journey in her heart, a constant reminder of the strength she possessed. The little girl returned home to her worried parents, who had discovered her note and anxiously awaited her arrival. They scolded her for disobeying their instructions and venturing into the unknown. But as they looked into her sparkling eyes and saw the glow on her face, their anger softened. They realized that their little girl had grown, that she had experienced something extraordinary. The little girl shared her tales of the mountain and the meadow with her parents, painting vivid pictures with her words. She spoke of the breathtaking view from the mountaintop, where the world seemed to stretch endlessly before her. She described the delicate petals of the wildflowers, vibrant hues that danced in the gentle breeze. And she recounted the soothing melody of the sparkling stream, its waters reflecting the golden rays of the setting sun. Her parents listened intently, captivated by her story. They realized that their daughter had discovered a part of herself on that journey—a spirit of curiosity and a thirst for exploration. They saw that she had learned valuable lessons about independence, resilience, and the beauty that lies beyond ones comfort zone. From that day forward, the little girls parents encouraged her to pursue her dreams and embrace new experiences. They understood that while there were risks in the world, there were also rewards waiting to be discovered. They supported her as she continued to embark on adventures, always reminding her to stay safe but never stifling her spirit. As the years passed, the little girl grew into a remarkable woman, fearlessly exploring the world and making a difference wherever she went. The lessons she had learned on that fateful journey stayed with her, guiding her through challenges and inspiring her to live life to the fullest. And so, the once timid little girl became a symbol of courage and resilience, a reminder to all who knew her that the greatest joys in life often lie just beyond the mountains we fear to climb. Her story spread far and wide, inspiring others to embrace their own journeys and discover the wonders that awaited them. In the end, the little girls adventure became a timeless tale, passed down through generations, reminding us all that sometimes, the greatest rewards come to those who dare to step into the unknown and follow their hearts. With each passing day, the little girls story continued to inspire countless individuals, igniting a spark within their souls and encouraging them to embark on their own extraordinary adventures. The tale of her bravery and determination resonated deeply with people from all walks of life, reminding them of the limitless possibilities that awaited them beyond the boundaries of their comfort zones. People marveled at the little girls unwavering spirit and her unwavering belief in the power of dreams. They saw themselves reflected in her journey, finding solace in the knowledge that they too could overcome their fears and pursue their passions. The little girl\'s story became a beacon of hope, a testament to the human spirit" + +PROMPT_2048 = "“You’re an idiot,” she said.\nI smiled and leaned back in the chair, looking at her over my glasses. “No, I’m not.”\n“If you were smart you would have learned to dance years ago. You’ve got two left feet.” She held up both of her hands with four fingers extended then made a circular motion that looked like an airplane.\nI leaned forward and put my glasses on the table in front of me, reaching for her hands as I did so, grabbing them before they could leave mine. “The next time you do something like this, call me. The phone number is right here,” I said as I pointed at a piece of paper under a stack of papers on my desk.\n“Fine,” she huffed and turned to leave the room. But she stopped at the doorway when she saw the bookshelves that lined one wall. “What are these for?” She stepped closer, tilting her head back and forth as she looked up. The shelves were three stories high with stacks of books on every level.\n“Books.” I smiled again. “I have a lot of books.”\nShe didn’t respond to that so I continued: “And there are more in the basement.”\n“But you can’t move them all here, right? This place is just too small for all those books. Maybe we should look for a bigger office building.” She looked back at me but said nothing as she took another few steps towards the door and then stopped again when she saw my grandfather clock on the wall.\n“And this?” she pointed to the clock, which had been in the family for over seventy years. “It’s just a clock isn’t it?”\nI laughed. “You can say that, but I know better.” It was then that I told her my grandfather’s story. He made that clock, and it was his favorite possession. When he died she inherited the clock; or at least she thought she did. After a few weeks of trying to sell it on eBay, she gave up because no one would pay what she felt it was worth.\n“You should have had an auction,” she suggested, leaning in towards me again. “Then maybe you could get more for it.”\n“No,” I shook my head. “I don’t want to sell the clock.”\nShe smiled, but this time it didn’t reach her eyes. She took a step back and looked at me again, not saying anything, just staring. The only sound was the ticking of the grandfather clock in the background as she waited for my next words.\n“My grandfather made this clock. He did everything by hand.” I could see that she had no idea what to say or do so I continued: “It’s his favorite possession, and it means more to me than anything else he ever owned. So, if you want the books, you can have them…” I looked at her face for just a second before continuing, “but you won’t take the clock.”\nShe finally responded with: “But what about the money?” She looked around again and said, “I think we could make more selling these books than you would get from all of them. You must have thousands of books here!”\nI took another step forward and put my hand on her shoulder as I spoke to her in a very low voice. “You’ve got it all wrong,” I told her. “There are only two or three hundred books. I’m not looking for money – I’m looking for someone who understands how important this clock is.”\n“How much do you want for the books?” she asked, still staring at me intently as she waited for my answer.\n“Forget about the money,” I said again. “If you really want to buy them, we can take our time and talk more later. But if you just want their value in paperbacks, that’s not what they’re worth.” She still seemed confused by everything I had said so far, so I tried to simplify my words as much as possible: “The books are mine; the clock is my grandfather’s. These books have been passed down through several generations of our family and are irreplaceable. Do you understand?”\n“I guess not,” she answered as she walked away from me, still looking at me but not saying a word. She took two more steps before turning around to say one last thing: “Well, good luck with the books, then.” With that, she went back into her house and out of sight, still walking without talking.\nAfter a few minutes, I slowly walked back toward my grandfather’s home. As I got closer, I could see the roof in the distance; the white crosses on the top of it were hard to miss. It seemed as if the entire town had gathered around there at that moment – people were all over the road around us, watching the commotion and chattering about what was going on.\nWhen my grandfather first saw me, he looked up from his chair with a smile on his face: “There you are.” He looked down at his hands, then back toward me as I walked forward to talk to him for the first time in years: “It’s been too long since we last spoke; it’s good to see you again.”\n“And you,” I said. Then, looking past my grandfather and directly into the face of the man who was sitting next to him (my mother’s father), I said, “I see he got your clock back for you, too. How is he?” My grandfather smiled as he looked up at me again:\n“He’s fine,” he answered, still smiling as he watched my mother’s family and mine chat with one another in the middle of all these people – a situation that I had never seen before. “Come on inside.” He stood up from his chair to do just that; my mom and her sister were already walking out of the building. “I have things for you.”\nMy grandfather led us inside, down some steps where he used to serve as the pastor in his church; there was a big room full of chairs at the bottom with pictures on the wall – all kinds of pictures, from when my family first started coming here to visit and other pictures we took while staying here over the years. All these photographs were all around us as I followed my grandfather through the building:\n“My house is just up the street,” he said. He stopped at a picture on the wall that was taken in the summer when we came to visit, smiling as he looked toward it with his arms folded – the picture was of him and his wife and two of their daughters, all standing together by one of the trees outside; there were other pictures around this one, some from much earlier than when my grandfather first started serving here. “We used to sit in a booth in that restaurant right over there – you remember?” I nodded as we went past it.\nMy grandfather stopped at another picture on the wall: it was of him and his wife with two other families, all sitting around a table together, smiling. He looked down at this one for a moment; then he said, “We used to do things like this every year, when we came to visit.” It was an older picture than the last one my grandfather had stopped in front of; I didn’t know it before but now I realized how much he has aged.\nMy grandparents have lived together for many years. They used to live in a house right next door, so they could walk over whenever they wanted; that is what they have done here all these years – as my grandfather said, “we’ve come here every summer since I was eleven.” But he and his wife are getting old now. He isn’t able to walk much anymore, but it makes him happy when he does: “My health has not been good lately,” he said.\n“You will never have a better time in your life than this one right now; you will never be as happy as you are now.” And for the first time since I have known him – since I was very little and started coming here every summer – my grandfather smiled at me, his eyes sparkling with excitement.\n“I know,” I said. “That’s why I’m really looking forward to it. It will be a lot of fun.” Then he turned back to the picture again; “See this?” he asked, pointing. “I remember that day, all sixteen of us there together. I was eleven then – my dad had taken me and my brother for our first trip away from home – and that was when we used to go to the cottage.” He stared at it for a while longer; he had tears in his eyes. “I loved this picture,” he said, turning it over again with one hand so I could see the back of it.\n“This is my best memory,” he explained. “It was taken on my birthday. That’s what makes me happiest.” He pointed to a man who had a pipe in his mouth. “That’s my uncle,” he said. “He gave all of us kids cigars for our birthdays, and we used to take turns lighting them – then everyone would sit around outside in the sunshine and smoke together like that. It was such a good time.” Then he held up his hand, as if to say, that’s enough now; and he went on, “Anyway, I don’" + +from typing import AsyncGenerator, List, Tuple +from transformers import PreTrainedTokenizerBase +from vllm.transformers_utils.tokenizer import get_tokenizer +def sample_requests( + dataset_path: str, + num_requests: int, + model_path: str, + seed=42 +) -> List[Tuple[str, int, int]]: + # Load the dataset. + random.seed(seed) + np.random.seed(seed) + tokenizer = get_tokenizer(model_path, trust_remote_code=True) + with open(dataset_path) as f: + dataset = json.load(f) + # Filter out the conversations with less than 2 turns. + dataset = [ + data for data in dataset + if len(data["conversations"]) >= 2 + ] + # Only keep the first two turns of each conversation. + dataset = [ + (data["conversations"][0]["value"], data["conversations"][1]["value"]) + for data in dataset + ] + + # Tokenize the prompts and completions. + prompts = [prompt for prompt, _ in dataset] + prompt_token_ids = tokenizer(prompts).input_ids + completions = [completion for _, completion in dataset] + completion_token_ids = tokenizer(completions).input_ids + tokenized_dataset = [] + for i in range(len(dataset)): + output_len = len(completion_token_ids[i]) + tokenized_dataset.append((prompts[i], prompt_token_ids[i], output_len)) + + # Filter out too long sequences. + filtered_dataset: List[Tuple[str, int, int]] = [] + for prompt, prompt_token_ids, output_len in tokenized_dataset: + prompt_len = len(prompt_token_ids) + if prompt_len < 4 or output_len < 4: + # Prune too short sequences. + # This is because TGI causes errors when the input or output length + # is too short. + continue + if prompt_len > 1024 or prompt_len + output_len > 2048: + # Prune too long sequences. + continue + filtered_dataset.append((prompt, prompt_len, output_len)) + + # Sample the requests. + sampled_requests = random.sample(filtered_dataset, num_requests) + return sampled_requests + +# 定义一个函数来执行单个请求 +def perform_request(session, url, payload, headers): + start_time = time.perf_counter() + with session.post(url, json=payload, headers=headers, stream=True) as response: + # 确保响应成功 + response.raise_for_status() + + # 开始接收streaming响应 + first_token_time = None + last_token_time = 0 + first_token_inference_time = None + next_token_inference_time = None + next_token_time = [] + i = 0 + for line in response.iter_lines(): + + token_time = time.perf_counter() - start_time + if line: # 忽略心跳 + data = line.decode('utf-8').strip() + if data.startswith('data: '): + data = data[len('data: '):] + i = i + 1 + # print(i, " ", data) + try: + json_data = json.loads(data) + if 'choices' in json_data and len(json_data['choices']) > 0: + choice = json_data['choices'][0] + if 'finish_reason' in choice and (choice['finish_reason'] == 'length' or choice['finish_reason'] == 'stop'): + if 'first_token_time' in choice and isinstance(choice['first_token_time'], float): + first_token_inference_time = choice['first_token_time'] + if 'rest_token_time' in choice and isinstance(choice['rest_token_time'], float): + next_token_inference_time = choice['rest_token_time'] + else: + # 记录第一个token的时间 + if first_token_time is None: + first_token_time = token_time + else: + # 记录后续token的时间 + next_token_time.append(token_time - last_token_time) + last_token_time = token_time + except json.JSONDecodeError: + pass # 如果不是JSON数据,忽略错误 + + # 返回第一个token和后续token的latency + end_time = time.perf_counter() + # print("length: ", len(next_token_time)) + + return first_token_time, np.mean(next_token_time), end_time - start_time, first_token_inference_time, next_token_inference_time + +def extend_list_to_length(lst, target_length): + if target_length <= len(lst): + return lst[:] + + # 计算每个元素需要复制的次数 + times = target_length // len(lst) + # 计算不能整除的剩余部分 + remainder = target_length % len(lst) + + # 生成新列表:重复整个列表times次,再加上前remainder个元素 + extended_list = lst * times + lst[:remainder] + + return extended_list + +# 定义一个函数来执行benchmark +def benchmark(llm_urls, model, prompt, num_requests, max_concurrent_requests, max_tokens, is_warmup=False, dataset=None): + # 定义请求的payload和headers + + headers = {"Content-Type": "application/json"} + + first_token_latencies = [] + next_token_latencies = [] + total_responce_times = [] + first_token_inference_times = [] + next_token_inference_times = [] + cur_url_index = 0 + sampled_requests = [] + prompt_token_lens = [] + output_tokens_lens = [] + + + if not dataset is None: + sampled_requests = sample_requests(dataset, num_requests, model) + + # 使用Session对象以便复用连接 + with requests.Session() as session: + # 创建一个线程池 + with ThreadPoolExecutor(max_workers=max_concurrent_requests) as executor: + # 开始计时 + # time.sleep(1) + llm_url = llm_urls[cur_url_index] + cur_url_index = (cur_url_index + 1) % len(llm_urls) + + cur_llm_urls = extend_list_to_length(llm_urls, max_concurrent_requests) + cur_len = len(cur_llm_urls) + if dataset is None: + payload = { + "model": model_name, + "prompt": prompt, + "n": 1, + "best_of": 1, + "use_beam_search": False, + "temperature": 0.0, + "top_p": 1.0, + "max_tokens": max_tokens, + "ignore_eos": True, + "stream": True # 开启streaming模式 + } + futures = [executor.submit(perform_request, session, cur_llm_urls[index % cur_len], payload, headers) for index in range(num_requests)] + else: + payloads = [] + for index in range(num_requests): + prompt, prompt_len, output_len = sampled_requests[index] + payload = { + "model": model_name, + "prompt": prompt, + "n": 1, + "best_of": 1, + "use_beam_search": False, + "temperature": 0.0, + "top_p": 1.0, + "max_tokens": output_len, + "ignore_eos": True, + "stream": True # 开启streaming模式 + } + prompt_token_lens.append(prompt_len) + output_tokens_lens.append(output_len) + payloads.append(payload) + futures = [executor.submit(perform_request, session, cur_llm_urls[index % cur_len], payloads[index], headers) for index in range(num_requests)] + + + start_time = time.perf_counter() + + if is_warmup: + phase = "Warm Up" + else: + phase = "Benchmarking" + with tqdm(total=num_requests, desc=phase, unit="req", ncols=100) as pbar: + # 等待所有请求完成 + for future in concurrent.futures.as_completed(futures): + try: + first_token_latency, next_token_latency, total_responce_time, first_token_inference_time, next_token_inference_time = future.result() + first_token_latencies.append(first_token_latency) + next_token_latencies.append(next_token_latency) + total_responce_times.append(total_responce_time) + if first_token_inference_time: + first_token_inference_times.append(first_token_inference_time) + if next_token_inference_time: + next_token_inference_times.append(next_token_inference_time) + except Exception as e: + print(f"Request failed: {e}") + pbar.update(1) + + # 计算总用时 + if is_warmup: + return + total_time = time.perf_counter() - start_time + print(f"Total time for {num_requests} requests with {max_concurrent_requests} concurrent requests: {total_time} seconds.") + print(f"Average responce time: {np.mean(total_responce_times)}") + if dataset is None: + print(f"Token throughput: {num_requests * max_tokens / total_time}") + else: + print(f"Output token throughput: {sum(output_tokens_lens) / total_time}") + print(f"Total token throughput: {(sum(prompt_token_lens) + sum(output_tokens_lens)) / total_time}") + print() + if first_token_latencies: + average_first_token_latency = sum(first_token_latencies) / len(first_token_latencies) + p90_first_token_latency = np.percentile(first_token_latencies, 90) + p95_first_token_latency = np.percentile(first_token_latencies, 95) + average_first_token_inference_latency = np.mean(first_token_inference_times) + print(f"Average first token latency: {average_first_token_latency * 1000} milliseconds.") + print(f"P90 first token latency: {p90_first_token_latency * 1000} milliseconds.") + print(f"P95 first token latency: {p95_first_token_latency * 1000} milliseconds.") + #print(f"Average first token inference latency: {average_first_token_inference_latency * 1000} milliseconds.") + print() + if next_token_latencies: + average_next_token_latency = sum(next_token_latencies) / len(next_token_latencies) + p90_next_token_latency = np.percentile(next_token_latencies, 90) + p95_next_token_latency = np.percentile(next_token_latencies, 95) + average_next_token_inference_latency = np.mean(next_token_inference_times) + print(f"Average next token latency: {average_next_token_latency * 1000} milliseconds.") + print(f"P90 next token latency: {p90_next_token_latency * 1000} milliseconds.") + print(f"P95 next token latency: {p95_next_token_latency * 1000} milliseconds.") + #print(f"Average next token inference latency: {average_next_token_inference_latency * 1000} milliseconds.") + print() + + +# 设置benchmark参数 +LLM_URLS = [f"http://localhost:{PORT}/v1/completions" for PORT in [8000]] + + +MODEL = "llm/models/" + model_name +MAX_TOKENS = 512 + +PROMPT = PROMPT_1024 + +max_batch=int(max_seq) + +for MAX_CONCURRENT_REQUESTS in [max_batch]: + NUM_WARMUP = 2 * MAX_CONCURRENT_REQUESTS + NUM_REQUESTS = 5 * MAX_CONCURRENT_REQUESTS # 总请求次数 + + # to avoid warm_up time out + benchmark(LLM_URLS, MODEL, PROMPT_1024, 2, 1, 32, is_warmup = True) + benchmark(LLM_URLS, MODEL, PROMPT, NUM_WARMUP, MAX_CONCURRENT_REQUESTS, MAX_TOKENS, is_warmup = True) + + # 运行benchmark + benchmark(LLM_URLS, MODEL, PROMPT, NUM_REQUESTS, MAX_CONCURRENT_REQUESTS, MAX_TOKENS) diff --git a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py index 198b45ade08..347ddd0e09b 100644 --- a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py +++ b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/api_server.py @@ -2,13 +2,13 @@ import importlib import inspect import re +from argparse import Namespace from contextlib import asynccontextmanager from http import HTTPStatus -from typing import Optional, Set +from multiprocessing import Process +from typing import AsyncIterator, Set -import fastapi -import uvicorn -from fastapi import APIRouter, Request +from fastapi import APIRouter, FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, Response, StreamingResponse @@ -16,12 +16,13 @@ from starlette.routing import Mount import vllm.envs as envs +from vllm.config import ModelConfig from vllm.engine.arg_utils import AsyncEngineArgs -from vllm.engine.async_llm_engine import AsyncLLMEngine -from vllm.entrypoints.openai.cli_args import make_arg_parser from ipex_llm.vllm.xpu.engine import IPEXLLMAsyncLLMEngine as AsyncLLMEngine -from ipex_llm.utils.common import invalidInputError - +from vllm.engine.protocol import AsyncEngineClient +from vllm.entrypoints.launcher import serve_http +from vllm.entrypoints.logger import RequestLogger +from ipex_llm.vllm.xpu.entrypoints.openai.cli_args import make_arg_parser # yapf conflicts with isort for this block # yapf: disable from vllm.entrypoints.openai.protocol import (ChatCompletionRequest, @@ -32,37 +33,49 @@ EmbeddingRequest, ErrorResponse, TokenizeRequest, TokenizeResponse) +from vllm.entrypoints.openai.rpc.client import AsyncEngineRPCClient +from ipex_llm.vllm.xpu.entrypoints.openai.rpc.server import run_rpc_server # yapf: enable from vllm.entrypoints.openai.serving_chat import OpenAIServingChat from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion from vllm.entrypoints.openai.serving_embedding import OpenAIServingEmbedding -from vllm.entrypoints.openai.serving_tokenization import OpenAIServingTokenization +from vllm.entrypoints.openai.serving_tokenization import ( + OpenAIServingTokenization) from vllm.logger import init_logger from vllm.usage.usage_lib import UsageContext -from vllm.utils import FlexibleArgumentParser +from vllm.utils import FlexibleArgumentParser, get_open_port from vllm.version import __version__ as VLLM_VERSION TIMEOUT_KEEP_ALIVE = 5 # seconds -engine: AsyncLLMEngine +async_engine_client: AsyncEngineClient engine_args: AsyncEngineArgs openai_serving_chat: OpenAIServingChat openai_serving_completion: OpenAIServingCompletion openai_serving_embedding: OpenAIServingEmbedding openai_serving_tokenization: OpenAIServingTokenization -logger = init_logger("vllm.entrypoints.openai.api_server") +logger = init_logger('vllm.entrypoints.openai.api_server') _running_tasks: Set[asyncio.Task] = set() +def model_is_embedding(model_name: str, trust_remote_code: bool) -> bool: + return ModelConfig(model=model_name, + tokenizer=model_name, + tokenizer_mode="auto", + trust_remote_code=trust_remote_code, + seed=0, + dtype="float16").embedding_mode + + @asynccontextmanager -async def lifespan(app: fastapi.FastAPI): +async def lifespan(app: FastAPI): async def _force_log(): while True: await asyncio.sleep(10) - await engine.do_log_stats() + await async_engine_client.do_log_stats() if not engine_args.disable_log_stats: task = asyncio.create_task(_force_log()) @@ -72,21 +85,69 @@ async def _force_log(): yield +@asynccontextmanager +async def build_async_engine_client(args) -> AsyncIterator[AsyncEngineClient]: + # Context manager to handle async_engine_client lifecycle + # Ensures everything is shutdown and cleaned up on error/exit + global engine_args + engine_args = AsyncEngineArgs.from_cli_args(args) + + # Backend itself still global for the silly lil' health handler + global async_engine_client + + # If manually triggered or embedding model, use AsyncLLMEngine in process. + # TODO: support embedding model via RPC. + if (model_is_embedding(args.model, args.trust_remote_code) + or args.disable_frontend_multiprocessing): + async_engine_client = AsyncLLMEngine.from_engine_args( + engine_args, usage_context=UsageContext.OPENAI_API_SERVER, + load_in_low_bit=args.load_in_low_bit) + yield async_engine_client + return + + # Otherwise, use the multiprocessing AsyncLLMEngine. + else: + # Start RPCServer in separate process (holds the AsyncLLMEngine). + port = get_open_port(envs.VLLM_RPC_PORT) + load_in_low_bit = args.load_in_low_bit + rpc_server_process = Process(target=run_rpc_server, + args=(engine_args, + UsageContext.OPENAI_API_SERVER, + port, load_in_low_bit)) + rpc_server_process.start() + + # Build RPCClient, which conforms to AsyncEngineClient Protocol. + async_engine_client = AsyncEngineRPCClient(port) + await async_engine_client.setup() + + try: + yield async_engine_client + finally: + # Ensure rpc server process was terminated + rpc_server_process.terminate() + + # Close all open connections to the backend + async_engine_client.close() + + # Wait for server process to join + rpc_server_process.join() + + router = APIRouter() -def mount_metrics(app: fastapi.FastAPI): +def mount_metrics(app: FastAPI): # Add prometheus asgi middleware to route /metrics requests metrics_route = Mount("/metrics", make_asgi_app()) # Workaround for 307 Redirect for /metrics - metrics_route.path_regex = re.compile("^/metrics(?P.*)$") + metrics_route.path_regex = re.compile('^/metrics(?P.*)$') app.routes.append(metrics_route) @router.get("/health") async def health() -> Response: """Health check.""" - await openai_serving_chat.engine.check_health() + await async_engine_client.check_health() return Response(status_code=200) @@ -94,9 +155,10 @@ async def health() -> Response: async def tokenize(request: TokenizeRequest): generator = await openai_serving_tokenization.create_tokenize(request) if isinstance(generator, ErrorResponse): - return JSONResponse(content=generator.model_dump(), status_code=generator.code) + return JSONResponse(content=generator.model_dump(), + status_code=generator.code) else: - assert isinstance(generator, TokenizeResponse) # noqa + assert isinstance(generator, TokenizeResponse) return JSONResponse(content=generator.model_dump()) @@ -104,9 +166,10 @@ async def tokenize(request: TokenizeRequest): async def detokenize(request: DetokenizeRequest): generator = await openai_serving_tokenization.create_detokenize(request) if isinstance(generator, ErrorResponse): - return JSONResponse(content=generator.model_dump(), status_code=generator.code) + return JSONResponse(content=generator.model_dump(), + status_code=generator.code) else: - assert isinstance(generator, DetokenizeResponse) # noqa + assert isinstance(generator, DetokenizeResponse) return JSONResponse(content=generator.model_dump()) @@ -123,39 +186,48 @@ async def show_version(): @router.post("/v1/chat/completions") -async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request): - generator = await openai_serving_chat.create_chat_completion(request, raw_request) +async def create_chat_completion(request: ChatCompletionRequest, + raw_request: Request): + generator = await openai_serving_chat.create_chat_completion( + request, raw_request) if isinstance(generator, ErrorResponse): - return JSONResponse(content=generator.model_dump(), status_code=generator.code) + return JSONResponse(content=generator.model_dump(), + status_code=generator.code) if request.stream: - return StreamingResponse(content=generator, media_type="text/event-stream") + return StreamingResponse(content=generator, + media_type="text/event-stream") else: - assert isinstance(generator, ChatCompletionResponse) # noqa + assert isinstance(generator, ChatCompletionResponse) return JSONResponse(content=generator.model_dump()) @router.post("/v1/completions") async def create_completion(request: CompletionRequest, raw_request: Request): - generator = await openai_serving_completion.create_completion(request, raw_request) + generator = await openai_serving_completion.create_completion( + request, raw_request) if isinstance(generator, ErrorResponse): - return JSONResponse(content=generator.model_dump(), status_code=generator.code) + return JSONResponse(content=generator.model_dump(), + status_code=generator.code) if request.stream: - return StreamingResponse(content=generator, media_type="text/event-stream") + return StreamingResponse(content=generator, + media_type="text/event-stream") else: return JSONResponse(content=generator.model_dump()) @router.post("/v1/embeddings") async def create_embedding(request: EmbeddingRequest, raw_request: Request): - generator = await openai_serving_embedding.create_embedding(request, raw_request) + generator = await openai_serving_embedding.create_embedding( + request, raw_request) if isinstance(generator, ErrorResponse): - return JSONResponse(content=generator.model_dump(), status_code=generator.code) + return JSONResponse(content=generator.model_dump(), + status_code=generator.code) else: return JSONResponse(content=generator.model_dump()) -def build_app(args): - app = fastapi.FastAPI(lifespan=lifespan) +def build_app(args: Namespace) -> FastAPI: + app = FastAPI(lifespan=lifespan) app.include_router(router) app.root_path = args.root_path @@ -172,10 +244,10 @@ def build_app(args): @app.exception_handler(RequestValidationError) async def validation_exception_handler(_, exc): err = openai_serving_chat.create_error_response(message=str(exc)) - return JSONResponse(err.model_dump(), status_code=HTTPStatus.BAD_REQUEST) + return JSONResponse(err.model_dump(), + status_code=HTTPStatus.BAD_REQUEST) - token = envs.VLLM_API_KEY or args.api_key - if token: + if token := envs.VLLM_API_KEY or args.api_key: @app.middleware("http") async def authentication(request: Request, call_next): @@ -185,7 +257,8 @@ async def authentication(request: Request, call_next): if not request.url.path.startswith(f"{root_path}/v1"): return await call_next(request) if request.headers.get("Authorization") != "Bearer " + token: - return JSONResponse(content={"error": "Unauthorized"}, status_code=401) + return JSONResponse(content={"error": "Unauthorized"}, + status_code=401) return await call_next(request) for middleware in args.middleware: @@ -196,51 +269,29 @@ async def authentication(request: Request, call_next): elif inspect.iscoroutinefunction(imported): app.middleware("http")(imported) else: - invalidInputError( - False, - f"Invalid middleware {middleware}. " f"Must be a function or a class." - ) + raise ValueError(f"Invalid middleware {middleware}. " + f"Must be a function or a class.") return app -def run_server(args, llm_engine=None): +async def init_app( + async_engine_client: AsyncEngineClient, + args: Namespace, +) -> FastAPI: app = build_app(args) - logger.info("vLLM API server version %s", VLLM_VERSION) - logger.info("args: %s", args) - if args.served_model_name is not None: served_model_names = args.served_model_name else: served_model_names = [args.model] - global engine, engine_args - - engine_args = AsyncEngineArgs.from_cli_args(args) - engine = ( - llm_engine - if llm_engine is not None - else AsyncLLMEngine.from_engine_args( - engine_args, - usage_context=UsageContext.OPENAI_API_SERVER, - load_in_low_bit=args.load_in_low_bit - ) - ) + model_config = await async_engine_client.get_model_config() - event_loop: Optional[asyncio.AbstractEventLoop] - try: - event_loop = asyncio.get_running_loop() - except RuntimeError: - event_loop = None - - if event_loop is not None and event_loop.is_running(): - # If the current is instanced by Ray Serve, - # there is already a running event loop - model_config = event_loop.run_until_complete(engine.get_model_config()) + if args.disable_log_requests: + request_logger = None else: - # When using single vLLM without engine_use_ray - model_config = asyncio.run(engine.get_model_config()) + request_logger = RequestLogger(max_log_len=args.max_log_len) global openai_serving_chat global openai_serving_completion @@ -248,59 +299,74 @@ def run_server(args, llm_engine=None): global openai_serving_tokenization openai_serving_chat = OpenAIServingChat( - engine, + async_engine_client, model_config, served_model_names, args.response_role, - args.lora_modules, - args.chat_template, + lora_modules=args.lora_modules, + prompt_adapters=args.prompt_adapters, + request_logger=request_logger, + chat_template=args.chat_template, + return_tokens_as_token_ids=args.return_tokens_as_token_ids, ) openai_serving_completion = OpenAIServingCompletion( - engine, + async_engine_client, model_config, served_model_names, - args.lora_modules, - args.prompt_adapters, + lora_modules=args.lora_modules, + prompt_adapters=args.prompt_adapters, + request_logger=request_logger, + return_tokens_as_token_ids=args.return_tokens_as_token_ids, ) openai_serving_embedding = OpenAIServingEmbedding( - engine, model_config, served_model_names + async_engine_client, + model_config, + served_model_names, + request_logger=request_logger, ) openai_serving_tokenization = OpenAIServingTokenization( - engine, model_config, served_model_names, args.lora_modules, args.chat_template + async_engine_client, + model_config, + served_model_names, + lora_modules=args.lora_modules, + request_logger=request_logger, + chat_template=args.chat_template, ) app.root_path = args.root_path - logger.info("Available routes are:") - for route in app.routes: - if not hasattr(route, "methods"): - continue - methods = ", ".join(route.methods) - logger.info("Route: %s, Methods: %s", route.path, methods) - - uvicorn.run( - app, - host=args.host, - port=args.port, - log_level=args.uvicorn_log_level, - timeout_keep_alive=TIMEOUT_KEEP_ALIVE, - ssl_keyfile=args.ssl_keyfile, - ssl_certfile=args.ssl_certfile, - ssl_ca_certs=args.ssl_ca_certs, - ssl_cert_reqs=args.ssl_cert_reqs, - ) + return app + + +async def run_server(args, **uvicorn_kwargs) -> None: + logger.info("vLLM API server version %s", VLLM_VERSION) + logger.info("args: %s", args) + + async with build_async_engine_client(args) as async_engine_client: + app = await init_app(async_engine_client, args) + + shutdown_task = await serve_http( + app, + host=args.host, + port=args.port, + log_level=args.uvicorn_log_level, + timeout_keep_alive=TIMEOUT_KEEP_ALIVE, + ssl_keyfile=args.ssl_keyfile, + ssl_certfile=args.ssl_certfile, + ssl_ca_certs=args.ssl_ca_certs, + ssl_cert_reqs=args.ssl_cert_reqs, + **uvicorn_kwargs, + ) + + # NB: Await server shutdown only after the backend context is exited + await shutdown_task if __name__ == "__main__": # NOTE(simon): # This section should be in sync with vllm/scripts.py for CLI entrypoints. parser = FlexibleArgumentParser( - description="vLLM OpenAI-Compatible RESTful API server." - ) + description="vLLM OpenAI-Compatible RESTful API server.") parser = make_arg_parser(parser) - parser.add_argument( - "--load-in-low-bit", - type=str, - default="sym_int4", - help="Low-bit quantization for IPEX-LLM models") args = parser.parse_args() - run_server(args) + + asyncio.run(run_server(args)) \ No newline at end of file diff --git a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/cli_args.py b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/cli_args.py new file mode 100644 index 00000000000..70af2a0389d --- /dev/null +++ b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/cli_args.py @@ -0,0 +1,163 @@ +""" +This file contains the command line arguments for the vLLM's +OpenAI-compatible server. It is kept in a separate file for documentation +purposes. +""" + +import argparse +import json +import ssl + +from vllm.engine.arg_utils import AsyncEngineArgs, nullable_str +from vllm.entrypoints.openai.serving_engine import (LoRAModulePath, + PromptAdapterPath) +from vllm.utils import FlexibleArgumentParser + + +class LoRAParserAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + lora_list = [] + for item in values: + name, path = item.split('=') + lora_list.append(LoRAModulePath(name, path)) + setattr(namespace, self.dest, lora_list) + + +class PromptAdapterParserAction(argparse.Action): + + def __call__(self, parser, namespace, values, option_string=None): + adapter_list = [] + for item in values: + name, path = item.split('=') + adapter_list.append(PromptAdapterPath(name, path)) + setattr(namespace, self.dest, adapter_list) + + +def make_arg_parser(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: + parser.add_argument("--host", + type=nullable_str, + default=None, + help="host name") + parser.add_argument("--port", type=int, default=8000, help="port number") + parser.add_argument( + "--uvicorn-log-level", + type=str, + default="info", + choices=['debug', 'info', 'warning', 'error', 'critical', 'trace'], + help="log level for uvicorn") + parser.add_argument("--allow-credentials", + action="store_true", + help="allow credentials") + parser.add_argument("--allowed-origins", + type=json.loads, + default=["*"], + help="allowed origins") + parser.add_argument("--allowed-methods", + type=json.loads, + default=["*"], + help="allowed methods") + parser.add_argument("--allowed-headers", + type=json.loads, + default=["*"], + help="allowed headers") + parser.add_argument("--api-key", + type=nullable_str, + default=None, + help="If provided, the server will require this key " + "to be presented in the header.") + parser.add_argument( + "--lora-modules", + type=nullable_str, + default=None, + nargs='+', + action=LoRAParserAction, + help="LoRA module configurations in the format name=path. " + "Multiple modules can be specified.") + parser.add_argument( + "--prompt-adapters", + type=nullable_str, + default=None, + nargs='+', + action=PromptAdapterParserAction, + help="Prompt adapter configurations in the format name=path. " + "Multiple adapters can be specified.") + parser.add_argument("--chat-template", + type=nullable_str, + default=None, + help="The file path to the chat template, " + "or the template in single-line form " + "for the specified model") + parser.add_argument("--response-role", + type=nullable_str, + default="assistant", + help="The role name to return if " + "`request.add_generation_prompt=true`.") + parser.add_argument("--ssl-keyfile", + type=nullable_str, + default=None, + help="The file path to the SSL key file") + parser.add_argument("--ssl-certfile", + type=nullable_str, + default=None, + help="The file path to the SSL cert file") + parser.add_argument("--ssl-ca-certs", + type=nullable_str, + default=None, + help="The CA certificates file") + parser.add_argument( + "--ssl-cert-reqs", + type=int, + default=int(ssl.CERT_NONE), + help="Whether client certificate is required (see stdlib ssl module's)" + ) + parser.add_argument( + "--root-path", + type=nullable_str, + default=None, + help="FastAPI root_path when app is behind a path based routing proxy") + parser.add_argument( + "--middleware", + type=nullable_str, + action="append", + default=[], + help="Additional ASGI middleware to apply to the app. " + "We accept multiple --middleware arguments. " + "The value should be an import path. " + "If a function is provided, vLLM will add it to the server " + "using @app.middleware('http'). " + "If a class is provided, vLLM will add it to the server " + "using app.add_middleware(). ") + parser.add_argument( + "--return-tokens-as-token-ids", + action="store_true", + help="When --max-logprobs is specified, represents single tokens as " + "strings of the form 'token_id:{token_id}' so that tokens that " + "are not JSON-encodable can be identified.") + parser.add_argument( + "--disable-frontend-multiprocessing", + action="store_true", + help="If specified, will run the OpenAI frontend server in the same " + "process as the model serving engine.") + parser.add_argument( + "--load-in-low-bit", + type=str, + default="sym_int4", + help="Low-bit quantization for IPEX-LLM models") + + parser = AsyncEngineArgs.add_cli_args(parser) + + parser.add_argument('--max-log-len', + type=int, + default=None, + help='Max number of prompt characters or prompt ' + 'ID numbers being printed in log.' + '\n\nDefault: Unlimited') + + return parser + + +def create_parser_for_docs() -> FlexibleArgumentParser: + parser_for_docs = FlexibleArgumentParser( + prog="-m vllm.entrypoints.openai.api_server") + return make_arg_parser(parser_for_docs) diff --git a/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/rpc/server.py b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/rpc/server.py new file mode 100644 index 00000000000..5596c24d30f --- /dev/null +++ b/python/llm/src/ipex_llm/vllm/xpu/entrypoints/openai/rpc/server.py @@ -0,0 +1,221 @@ +import asyncio +import signal +from typing import Any, Coroutine + +import cloudpickle +import zmq +import zmq.asyncio +from typing_extensions import Never + +from vllm import AsyncEngineArgs +from ipex_llm.vllm.xpu.engine import IPEXLLMAsyncLLMEngine as AsyncLLMEngine + +from vllm.entrypoints.openai.rpc import (VLLM_RPC_HEALTHY_STR, + VLLM_RPC_SUCCESS_STR, RPCAbortRequest, + RPCGenerateRequest, RPCUtilityRequest) +from vllm.logger import init_logger +from vllm.usage.usage_lib import UsageContext + +logger = init_logger(__name__) + + +class AsyncEngineRPCServer: + + def __init__(self, async_engine_args: AsyncEngineArgs, + usage_context: UsageContext, port: int, load_in_low_bit: str): + # Initialize engine first. + self.engine = AsyncLLMEngine.from_engine_args(async_engine_args, + usage_context=usage_context, + load_in_low_bit=load_in_low_bit) + + # Initialize context. + self.context = zmq.asyncio.Context() + + # Init socket for readiness state. + self.socket = self.context.socket(zmq.constants.ROUTER) + # Note numeric form of localhost should be used for zmq bind(), + # see https://stackoverflow.com/a/8958414 + self.socket.bind(f"tcp://127.0.0.1:{port}") + + def cleanup(self): + """Cleanup all resources.""" + self.socket.close() + self.context.destroy() + + async def get_model_config(self, identity): + """Send the ModelConfig""" + model_config = await self.engine.get_model_config() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(model_config)]) + + async def get_decoding_config(self, identity): + """Send the DecodingConfig""" + decoding_config = await self.engine.get_decoding_config() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(decoding_config)]) + + async def get_lora_config(self, identity): + lora_config = await self.engine.get_lora_config() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(lora_config)]) + + async def get_scheduler_config(self, identity): + """Send the SchedulerConfig""" + parallel_config = await self.engine.get_scheduler_config() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(parallel_config)]) + + async def get_parallel_config(self, identity): + """Send the ParallelConfig""" + parallel_config = await self.engine.get_parallel_config() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(parallel_config)]) + + async def is_tracing_enabled(self, identity): + """Send the is_tracing_enabled flag""" + tracing_flag = await self.engine.is_tracing_enabled() + + await self.socket.send_multipart( + [identity, cloudpickle.dumps(tracing_flag)]) + + async def do_log_stats(self, identity): + """Log stats and confirm success.""" + await self.engine.do_log_stats() + + await self.socket.send_multipart([ + identity, + cloudpickle.dumps(VLLM_RPC_SUCCESS_STR), + ]) + + async def is_server_ready(self, identity): + """Notify the client that we are ready.""" + await self.socket.send_multipart([ + identity, + cloudpickle.dumps(VLLM_RPC_SUCCESS_STR), + ]) + + async def abort(self, identity, request: RPCAbortRequest): + """Abort request and notify the client of success.""" + # Abort the request in the llm engine. + await self.engine.abort(request.request_id) + + # Send confirmation to the client. + await self.socket.send_multipart([ + identity, + cloudpickle.dumps(VLLM_RPC_SUCCESS_STR), + ]) + + async def generate(self, identity, generate_request: RPCGenerateRequest): + try: + results_generator = self.engine.generate( + generate_request.inputs, + sampling_params=generate_request.sampling_params, + request_id=generate_request.request_id, + lora_request=generate_request.lora_request, + trace_headers=generate_request.trace_headers, + prompt_adapter_request=generate_request.prompt_adapter_request) + + async for request_output in results_generator: + await self.socket.send_multipart( + [identity, cloudpickle.dumps(request_output)]) + + except Exception as e: + ### Notify client of all failures + await self.socket.send_multipart([identity, cloudpickle.dumps(e)]) + + async def check_health(self, identity): + try: + await self.engine.check_health() + await self.socket.send_multipart( + [identity, cloudpickle.dumps(VLLM_RPC_HEALTHY_STR)]) + except Exception as e: + await self.socket.send_multipart([identity, cloudpickle.dumps(e)]) + + def _make_handler_coro(self, identity, + message) -> Coroutine[Any, Any, Never]: + """Route the zmq message to the handler coroutine.""" + + request = cloudpickle.loads(message) + + if isinstance(request, RPCGenerateRequest): + return self.generate(identity, request) + + elif isinstance(request, RPCAbortRequest): + return self.abort(identity, request) + + elif isinstance(request, RPCUtilityRequest): + if request == RPCUtilityRequest.GET_MODEL_CONFIG: + return self.get_model_config(identity) + elif request == RPCUtilityRequest.GET_PARALLEL_CONFIG: + return self.get_parallel_config(identity) + elif request == RPCUtilityRequest.GET_DECODING_CONFIG: + return self.get_decoding_config(identity) + elif request == RPCUtilityRequest.GET_SCHEDULER_CONFIG: + return self.get_scheduler_config(identity) + elif request == RPCUtilityRequest.GET_LORA_CONFIG: + return self.get_lora_config(identity) + elif request == RPCUtilityRequest.DO_LOG_STATS: + return self.do_log_stats(identity) + elif request == RPCUtilityRequest.IS_SERVER_READY: + return self.is_server_ready(identity) + elif request == RPCUtilityRequest.CHECK_HEALTH: + return self.check_health(identity) + elif request == RPCUtilityRequest.IS_TRACING_ENABLED: + return self.is_tracing_enabled(identity) + else: + raise ValueError(f"Unknown RPCUtilityRequest type: {request}") + + else: + raise ValueError(f"Unknown RPCRequest type: {request}") + + async def run_server_loop(self): + """Inner RPC Server Loop""" + + running_tasks = set() + while True: + # Wait for a request. + identity, message = await self.socket.recv_multipart() + + # Process the request async. + task = asyncio.create_task( + self._make_handler_coro(identity, message)) + + # We need to keep around a strong reference to the task, + # to avoid the task disappearing mid-execution as running tasks + # can be GC'ed. Below is a common "fire-and-forget" tasks + # https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task + running_tasks.add(task) + task.add_done_callback(running_tasks.discard) + + +async def run_server(server: AsyncEngineRPCServer): + # Put the server task into the asyncio loop. + loop = asyncio.get_running_loop() + server_task = loop.create_task(server.run_server_loop()) + + # Interruption handling. + def signal_handler() -> None: + # Kill the server on interrupt / terminate + server_task.cancel() + + loop.add_signal_handler(signal.SIGINT, signal_handler) + loop.add_signal_handler(signal.SIGTERM, signal_handler) + + try: + await server_task + except asyncio.CancelledError: + logger.info("vLLM ZMQ RPC Server was interrupted.") + finally: + # Clean up all resources. + server.cleanup() + + +def run_rpc_server(async_engine_args: AsyncEngineArgs, + usage_context: UsageContext, port: int, load_in_low_bit: str): + server = AsyncEngineRPCServer(async_engine_args, usage_context, port, load_in_low_bit) + asyncio.run(run_server(server))