From 4834bbb0f06dc14c755468a21fea71fce04f538e Mon Sep 17 00:00:00 2001
From: Russell Bryant
Date: Tue, 15 Oct 2024 20:52:06 +0000
Subject: [PATCH 1/5] [Misc] benchmark: Add option to set max concurrency

Add a new flag to `benchmark_serving.py` that allows you to specify the
maximum number of concurrent requests. If not specified, it defaults to
the current behavior of unbounded concurrency.

Signed-off-by: Russell Bryant
---
 benchmarks/benchmark_serving.py | 29 +++++++++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index c1a396c81f666..156a65af91cbb 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -397,6 +397,7 @@ async def benchmark(
     selected_percentile_metrics: List[str],
     selected_percentiles: List[str],
     ignore_eos: bool,
+    max_concurrency: Optional[int],
 ):
     if backend in ASYNC_REQUEST_FUNCS:
         request_func = ASYNC_REQUEST_FUNCS[backend]
@@ -448,6 +449,16 @@ async def benchmark(
 
     pbar = None if disable_tqdm else tqdm(total=len(input_requests))
 
+    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None
+
+    async def limited_request_func(request_func_input, pbar):
+        if semaphore:
+            async with semaphore:
+                return await request_func(
+                    request_func_input=request_func_input, pbar=pbar)
+        return await request_func(request_func_input=request_func_input,
+                                  pbar=pbar)
+
     benchmark_start_time = time.perf_counter()
     tasks: List[asyncio.Task] = []
     async for request in get_request(input_requests, request_rate):
@@ -463,8 +474,8 @@ async def benchmark(
             ignore_eos=ignore_eos)
         tasks.append(
             asyncio.create_task(
-                request_func(request_func_input=request_func_input,
-                             pbar=pbar)))
+                limited_request_func(request_func_input=request_func_input,
+                                     pbar=pbar)))
     outputs: List[RequestFuncOutput] = await asyncio.gather(*tasks)
 
     if profile:
@@ -680,6 +691,7 @@ def main(args: argparse.Namespace):
                 float(p) for p in args.metric_percentiles.split(",")
             ],
             ignore_eos=args.ignore_eos,
+            max_concurrency=args.max_concurrency,
         ))
 
     # Save config and results to json
@@ -766,6 +778,19 @@ def main(args: argparse.Namespace):
         default=None,
         help="Path to the sharegpt/sonnet dataset. "
         "Or the huggingface dataset ID if using HF dataset.")
+    parser.add_argument(
+        "--max-concurrency",
+        type=int,
+        default=None,
+        help="Maximum number of concurrent requests. This can be used "
+        "to help simulate an environment where a higher level component "
+        "is enforcing a maximum number of concurrent requests. While the "
+        "--request-rate argument controls the rate at which requests are "
+        "initiated, this argument will control how many are actually allowed "
+        "to execute at a time. This means that when used in combination, the "
+        "actual request rate may be lower than specified with --request-rate, "
+        "if the server is not processing requests fast enough to keep up.")
+
     parser.add_argument(
         "--model",
         type=str,

From 520b43f364bb79f27780df97fba75675d69e41ca Mon Sep 17 00:00:00 2001
From: Russell Bryant
Date: Wed, 16 Oct 2024 20:11:54 +0000
Subject: [PATCH 2/5] benchmark: Add max_concurrency to logging and result data

Signed-off-by: Russell Bryant
---
 benchmarks/benchmark_serving.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 156a65af91cbb..5e37d22a2e497 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -446,6 +446,7 @@ async def benchmark(
             print("Profiler started")
 
     print(f"Traffic request rate: {request_rate}")
+    print(f"Maximum request concurrency: {max_concurrency}")
 
     pbar = None if disable_tqdm else tqdm(total=len(input_requests))
 
@@ -721,6 +722,7 @@ def main(args: argparse.Namespace):
         # Traffic
         result_json["request_rate"] = (
             args.request_rate if args.request_rate < float("inf") else "inf")
+        result_json["max_concurrency"] = args.max_concurrency
 
         # Merge with benchmark result
         result_json = {**result_json, **benchmark_result}

From 6bb7eff9961407f62ded78a12f2bf852c4c2113c Mon Sep 17 00:00:00 2001
From: Russell Bryant
Date: Wed, 16 Oct 2024 20:21:58 +0000
Subject: [PATCH 3/5] benchmark: Simplify by using nullcontext

Signed-off-by: Russell Bryant
---
 benchmarks/benchmark_serving.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 5e37d22a2e497..82c0b73e16629 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -25,6 +25,7 @@
 import argparse
 import asyncio
 import base64
+import contextlib
 import io
 import json
 import os
@@ -450,15 +451,13 @@ async def benchmark(
 
     pbar = None if disable_tqdm else tqdm(total=len(input_requests))
 
-    semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None
+    semaphore = (asyncio.Semaphore(max_concurrency)
+                 if max_concurrency else contextlib.nullcontext())
 
     async def limited_request_func(request_func_input, pbar):
-        if semaphore:
-            async with semaphore:
-                return await request_func(
-                    request_func_input=request_func_input, pbar=pbar)
-        return await request_func(request_func_input=request_func_input,
-                                  pbar=pbar)
+        async with semaphore:
+            return await request_func(request_func_input=request_func_input,
+                                      pbar=pbar)
 
     benchmark_start_time = time.perf_counter()
     tasks: List[asyncio.Task] = []

From 6d8fb553ce82929057c6ea489e5d2b6864aa5280 Mon Sep 17 00:00:00 2001
From: Russell Bryant
Date: Wed, 16 Oct 2024 21:21:44 +0000
Subject: [PATCH 4/5] benchmarking: Add concurrency to default filename

Signed-off-by: Russell Bryant
---
 benchmarks/benchmark_serving.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 82c0b73e16629..0068a878f07fb 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -728,7 +728,9 @@ def main(args: argparse.Namespace):
 
         # Save to file
         base_model_id = model_id.split("/")[-1]
-        file_name = f"{backend}-{args.request_rate}qps-{base_model_id}-{current_dt}.json"  #noqa
+        max_concurrency_str = (f"-concurrency{args.max_concurrency}"
+                               if args.max_concurrency is not None else "")
+        file_name = f"{backend}-{args.request_rate}qps{max_concurrency_str}-{base_model_id}-{current_dt}.json"  #noqa
         if args.result_filename:
             file_name = args.result_filename
         if args.result_dir:

From a37c18f37eb40dc8c3ec759147e277bc5cb17547 Mon Sep 17 00:00:00 2001
From: Russell Bryant
Date: Thu, 17 Oct 2024 15:14:11 +0000
Subject: [PATCH 5/5] benchmarks: fix compat with Python 3.9

Signed-off-by: Russell Bryant
---
 benchmarks/benchmark_serving.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 0068a878f07fb..4580729fa4767 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -25,7 +25,6 @@
 import argparse
 import asyncio
 import base64
-import contextlib
 import io
 import json
 import os
@@ -451,10 +450,17 @@ async def benchmark(
 
     pbar = None if disable_tqdm else tqdm(total=len(input_requests))
 
+    # This can be used once the minimum Python version is 3.10 or higher,
+    # and it will simplify the code in limited_request_func.
+    # semaphore = (asyncio.Semaphore(max_concurrency)
+    #              if max_concurrency else contextlib.nullcontext())
     semaphore = (asyncio.Semaphore(max_concurrency)
-                 if max_concurrency else contextlib.nullcontext())
+                 if max_concurrency else None)
 
     async def limited_request_func(request_func_input, pbar):
+        if semaphore is None:
+            return await request_func(request_func_input=request_func_input,
+                                      pbar=pbar)
         async with semaphore:
             return await request_func(request_func_input=request_func_input,
                                       pbar=pbar)
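
A note for readers of this series (illustration only, not part of any patch):
the concurrency cap added in PATCH 1/5 and reworked in PATCH 5/5 amounts to
gating each request coroutine on an optional asyncio.Semaphore. The following
minimal, self-contained sketch shows that pattern on Python 3.9+; the names
fake_request and run_all are invented for the example and do not exist in
benchmark_serving.py.

    import asyncio
    from typing import List, Optional


    async def fake_request(i: int) -> int:
        # Stand-in for the benchmark's real per-request coroutine.
        await asyncio.sleep(0.1)
        return i


    async def run_all(n: int, max_concurrency: Optional[int]) -> List[int]:
        # None means unbounded concurrency, mirroring the flag's default.
        semaphore = (asyncio.Semaphore(max_concurrency)
                     if max_concurrency else None)

        async def limited(i: int) -> int:
            if semaphore is None:
                return await fake_request(i)
            async with semaphore:
                return await fake_request(i)

        # All coroutines are created up front; the semaphore only bounds how
        # many are inside fake_request at the same time.
        return await asyncio.gather(*(limited(i) for i in range(n)))


    if __name__ == "__main__":
        # With max_concurrency=2, at most two requests are in flight at once.
        print(asyncio.run(run_all(8, max_concurrency=2)))

The two code paths (plain call vs. `async with semaphore`) exist only because
the unbounded case has no semaphore to acquire; PATCH 3/5 and PATCH 5/5 are
about whether that special case can be folded into a null context manager.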
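
Also for illustration (not part of the patches): the simpler single-path form
that the comment added in PATCH 5/5 defers until the minimum supported Python
version reaches 3.10. contextlib.nullcontext() gained `async with` support in
Python 3.10, which is why PATCH 5/5 drops the nullcontext approach from
PATCH 3/5 and restores an explicit None check for Python 3.9. The names
make_limiter and demo below are invented for the example.

    import asyncio
    import contextlib
    from typing import Optional


    def make_limiter(max_concurrency: Optional[int]):
        # Python 3.10+ only: contextlib.nullcontext() can be used with
        # `async with`, so there is no separate unbounded code path.
        return (asyncio.Semaphore(max_concurrency)
                if max_concurrency else contextlib.nullcontext())


    async def demo() -> None:
        limiter = make_limiter(None)  # None selects the nullcontext branch
        async with limiter:           # a no-op for the nullcontext case
            await asyncio.sleep(0)


    if __name__ == "__main__":
        asyncio.run(demo())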