LLM: Refactor Pipeline-Parallel-FastAPI example (#11319)
Initial refactor of the Pipeline-Parallel-FastAPI example
xiangyuT authored Jun 25, 2024
1 parent 34c15d3 commit 8ddae22
Showing 7 changed files with 146 additions and 704 deletions.
2 changes: 1 addition & 1 deletion python/llm/example/GPU/Pipeline-Parallel-FastAPI/README.md
@@ -22,7 +22,7 @@ pip install mpi4py fastapi uvicorn openai
pip install gradio # for gradio web UI
conda install -c conda-forge -y gperftools=2.10 # to enable tcmalloc

pip install transformers==4.31.0 # for llama2 models
pip install transformers==4.37.0
```

### 2. Run pipeline parallel serving on multiple GPUs
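After this README change bumps the dependency pin from transformers 4.31.0 to 4.37.0, a quick way to confirm the environment actually picked up the new release is a one-line version check (an optional sketch, not part of this commit):

```python
# Optional sanity check, not part of this diff: verify that the transformers
# release pinned in the updated README is the one installed in the environment.
import transformers

print(transformers.__version__)            # expected: 4.37.0
assert transformers.__version__ == "4.37.0"
```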
112 changes: 52 additions & 60 deletions python/llm/example/GPU/Pipeline-Parallel-FastAPI/benchmark.py
@@ -30,29 +30,36 @@ def perform_request(session, url, payload, headers):
start_time = time.perf_counter()
with session.post(url, json=payload, headers=headers, stream=True) as response:
response.raise_for_status()

first_token_time = None
last_token_time = 0
first_token_inference_time = None
next_token_inference_time = None
next_token_time = []
i = 0
for line in response.iter_lines():

token_time = time.perf_counter() - start_time
if line:
data = line.decode("utf-8").strip()
i = i + 1
try:
json_data = json.loads(data)
if json_data["message"] is not None:
if first_token_time is None:
first_token_time = token_time
else:
next_token_time.append(token_time - last_token_time)
last_token_time = token_time
except json.JSONDecodeError:
pass
data = line.decode('utf-8').strip()
if data.startswith('data: '):
data = data[len('data: '):]
i = i + 1
try:
json_data = json.loads(data)
if 'choices' in json_data and len(json_data['choices']) > 0:
choice = json_data['choices'][0]
if 'finish_reason' in choice and (choice['finish_reason'] == 'length' or choice['finish_reason'] == 'stop'):
if 'first_token_time' in choice and isinstance(choice['first_token_time'], float):
first_token_inference_time = choice['first_token_time']
if 'rest_token_time' in choice and isinstance(choice['rest_token_time'], float):
next_token_inference_time = choice['rest_token_time']
else:
if first_token_time is None:
first_token_time = token_time
else:
next_token_time.append(token_time - last_token_time)
last_token_time = token_time
except json.JSONDecodeError:
pass
end_time = time.perf_counter()
return (
first_token_time,
@@ -76,11 +83,11 @@ def extend_list_to_length(lst, target_length):
def benchmark(
llm_urls,
prompt,
num_warmup_requests,
num_requests,
max_concurrent_requests,
max_tokens,
prompt_length,
is_warmup=False,
):

headers = {"Content-Type": "application/json"}
@@ -92,6 +99,8 @@ def benchmark(
next_token_inference_times = []
cur_url_index = 0

num_requests = num_requests + num_warmup_requests

with requests.Session() as session:
with ThreadPoolExecutor(max_workers=max_concurrent_requests) as executor:
llm_url = llm_urls[cur_url_index]
@@ -101,8 +110,17 @@ def benchmark(
cur_len = len(cur_llm_urls)

payload = {
"model": "Meta-Llama-3-8B-Instruct",
"prompt": prompt,
"n_predict": max_tokens,
"max_tokens": max_tokens,
"stream": True,
# for vllm openai api server
"ignore_eos": True,
"n": 1,
"best_of": 1,
"use_beam_search": False,
"temperature": 0.0,
"top_p": 1.0,
}
futures = [
executor.submit(
@@ -115,14 +133,13 @@ def benchmark(
for index in range(num_requests)
]

start_time = time.perf_counter()
phase = "Benchmarking"

if is_warmup:
phase = "Warm Up"
else:
phase = "Benchmarking"
with tqdm(total=num_requests, desc=phase, unit="req", ncols=100) as pbar:
cur_index = 0
for future in concurrent.futures.as_completed(futures):
if cur_index == num_warmup_requests:
start_time = time.perf_counter()
try:
(
first_token_latency,
Expand All @@ -131,21 +148,21 @@ def benchmark(
first_token_inference_time,
next_token_inference_time,
) = future.result()
first_token_latencies.append(first_token_latency)
next_token_latencies.append(next_token_latency)
total_responce_times.append(total_responce_time)
if first_token_inference_time:
first_token_inference_times.append(
first_token_inference_time
)
if next_token_inference_time:
next_token_inference_times.append(next_token_inference_time)
cur_index = cur_index + 1
if cur_index > num_warmup_requests:
first_token_latencies.append(first_token_latency)
next_token_latencies.append(next_token_latency)
total_responce_times.append(total_responce_time)
if first_token_inference_time:
first_token_inference_times.append(
first_token_inference_time
)
if next_token_inference_time:
next_token_inference_times.append(next_token_inference_time)
except Exception as e:
print(f"Request failed: {e}")
pbar.update(1)

if is_warmup:
return
total_time = time.perf_counter() - start_time
log_file = f"{max_concurrent_requests}.log"

@@ -174,9 +191,6 @@ def benchmark(
)
p90_first_token_latency = np.percentile(first_token_latencies, 90)
p95_first_token_latency = np.percentile(first_token_latencies, 95)
# average_first_token_inference_latency = np.mean(
# first_token_inference_times
# )
print(
f"Average first token latency: {average_first_token_latency * 1000} milliseconds.",
file=file,
@@ -189,10 +203,6 @@ def benchmark(
f"P95 first token latency: {p95_first_token_latency * 1000} milliseconds.",
file=file,
)
# print(
# f"Average first token inference latency: {average_first_token_inference_latency * 1000} milliseconds.",
# file=file,
# )
print(file=file)

if next_token_latencies:
@@ -201,9 +211,6 @@ def benchmark(
)
p90_next_token_latency = np.percentile(next_token_latencies, 90)
p95_next_token_latency = np.percentile(next_token_latencies, 95)
# average_next_token_inference_latency = np.mean(
# next_token_inference_times
# )
print(
f"Average next token latency: {average_next_token_latency * 1000} milliseconds.",
file=file,
@@ -216,14 +223,10 @@ def benchmark(
f"P95 next token latency: {p95_next_token_latency * 1000} milliseconds.",
file=file,
)
# print(
# f"Average next token inference latency: {average_next_token_inference_latency * 1000} milliseconds.",
# file=file,
# )
print(file=file)


LLM_URLS = [f"http://localhost:{PORT}/generate_stream/" for PORT in [8000]]
LLM_URLS = [f"http://localhost:{PORT}/v1/completions" for PORT in [8000]]

parser = argparse.ArgumentParser(description="Set prompt length.")
parser.add_argument(
@@ -254,17 +257,6 @@ def benchmark(

for MAX_CONCURRENT_REQUESTS in args.max_concurrent_requests:
NUM_WARMUP = 5 * MAX_CONCURRENT_REQUESTS
NUM_REQUESTS = 10 * MAX_CONCURRENT_REQUESTS

# warm up
benchmark(
LLM_URLS,
PROMPT,
NUM_WARMUP,
MAX_CONCURRENT_REQUESTS,
MAX_TOKENS,
PROMPT_LENGTH,
is_warmup=True,
)
NUM_REQUESTS = 30 * MAX_CONCURRENT_REQUESTS

benchmark(LLM_URLS, PROMPT, NUM_REQUESTS, MAX_CONCURRENT_REQUESTS, MAX_TOKENS, PROMPT_LENGTH)
benchmark(LLM_URLS, PROMPT, NUM_WARMUP, NUM_REQUESTS, MAX_CONCURRENT_REQUESTS, MAX_TOKENS, PROMPT_LENGTH)
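The refactored benchmark now targets an OpenAI-compatible completions endpoint: it posts to /v1/completions with "stream": True, strips the data: prefix from each streamed chunk, and reads first_token_time / rest_token_time from the final chunk when the server reports them. For orientation, here is a minimal standalone streaming client in the same spirit. It is a sketch, assuming a server is already listening on localhost:8000, that the model name matches whatever the server loaded, and that chunks carry OpenAI-style choices[0]["text"] deltas (that last field is not shown in the diff above).

```python
# Minimal sketch of a streaming client for the OpenAI-compatible
# /v1/completions endpoint that the refactored benchmark targets.
# Assumptions: a server is already listening on localhost:8000, the model
# name matches what the server loaded, and each streamed chunk carries an
# OpenAI-style choices[0]["text"] delta.
import json
import time

import requests

url = "http://localhost:8000/v1/completions"
payload = {
    "model": "Meta-Llama-3-8B-Instruct",
    "prompt": "What is AI?",
    "max_tokens": 32,
    "stream": True,
}

start = time.perf_counter()
first_token_time = None

with requests.post(url, json=payload, stream=True) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        if not line:
            continue
        data = line.decode("utf-8").strip()
        if data.startswith("data: "):          # server-sent-event framing
            data = data[len("data: "):]
        if data == "[DONE]":                   # some servers end the stream this way
            break
        try:
            chunk = json.loads(data)
        except json.JSONDecodeError:
            continue
        if first_token_time is None:
            first_token_time = time.perf_counter() - start
        choices = chunk.get("choices", [])
        if choices:
            print(choices[0].get("text", ""), end="", flush=True)

if first_token_time is not None:
    print(f"\nfirst token after {first_token_time:.3f} s")
```

The extra payload fields the benchmark sends (ignore_eos, n, best_of, use_beam_search, temperature, top_p) are there for the vLLM OpenAI API server, per the comment in the diff, and are only needed when that backend is used.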
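The warm-up path changed as well: instead of a separate benchmark(..., is_warmup=True) pass, warm-up requests are now submitted to the same executor as the measured ones, the throughput timer starts only once the first num_warmup_requests futures have completed, and earlier results are left out of the latency lists. A toy model of that accounting pattern, with made-up sleep times standing in for HTTP round trips:

```python
# Toy model (not repo code) of the reworked warm-up accounting: every request
# goes through one executor, but the throughput clock only starts once the
# first `num_warmup` futures have completed, and only later results are kept.
import concurrent.futures
import random
import time

def fake_request() -> float:
    t0 = time.perf_counter()
    time.sleep(random.uniform(0.01, 0.05))     # stand-in for the HTTP round trip
    return time.perf_counter() - t0

num_warmup, num_measured = 3, 10
latencies = []
start_time = None

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fake_request) for _ in range(num_warmup + num_measured)]
    done = 0
    for future in concurrent.futures.as_completed(futures):
        if done == num_warmup:
            start_time = time.perf_counter()   # measurement window opens here
        latency = future.result()
        done += 1
        if done > num_warmup:                  # skip warm-up results
            latencies.append(latency)

total_time = time.perf_counter() - start_time
print(f"measured {len(latencies)} requests in {total_time:.3f} s")
```

One consequence of this design is that the request pool is already busy when the measurement window opens, rather than ramping up from idle.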