-
Notifications
You must be signed in to change notification settings - Fork 221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add generation server scripts using HF accelerate and DS-inference #328
Changes from 25 commits
47f3fc0
435af43
d1676fc
eef490c
25d0c70
7be1410
5c31d9a
2905955
12c4cf7
46ade32
c46d957
f3dac05
4425614
c97d6ea
f3385f2
8f25200
b11bb7f
99dedb0
497f00e
aa8c08c
25b5d85
9201770
649d7f8
aa9ebea
997a5fa
85d9fcb
11d50f1
403424b
493b2ee
c84d9b7
81d1469
a85b488
43a844c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
## Inference solutions for BLOOM 176B | ||
We support HuggingFace accelerate and DeepSpeed Inference for generation. | ||
|
||
Required packages: | ||
1. [DeepSpeed](https://github.com/microsoft/DeepSpeed) | ||
1. [DeepSpeed MII](https://github.com/microsoft/DeepSpeed-MII) | ||
1. [HuggingFace accelerate](https://github.com/huggingface/accelerate) | ||
|
||
All the provided scripts are tested on 8 A100 80GB GPUs for BLOOM 176B. These scripts might not work for other models or a different number of GPUs. | ||
DS inference only supports fp16 for cli and server application. However, for benchmarking, it supports both fp16 and bf16. bf16 support will be added once DeepSpeed adds suitable CUDA kernels for these. | ||
|
||
DS inference is deployed using the DeepSpeed MII library which requires the resharded checkpoints for 8 x Tensor Parallel. The HuggingFace checkpoints can be resharded and cached using the following command: | ||
```shell | ||
deepspeed --num_gpus 8 scripts/bloom-inference-server/cache_ds_checkpoints.py --model_name bigscience/bloom --dtype fp16 --save_mp_checkpoint_path <PATH TO DS CACHED MODEL> | ||
``` | ||
Note: Running the above script will consume ~350 GB of disk space and will take some time (~30 minutes), depending on both the speed of your GPUs and storage. | ||
|
||
Note: sometimes GPU memory is not freed when DS inference deployment is shutdown. You can free this memory by running: | ||
```python | ||
import mii | ||
mii.terminate("ds_inference_grpc_server") | ||
``` | ||
or alternatively, just doing a `killall python` in terminal. | ||
|
||
#### BLOOM inference via command-line | ||
This asks for generate_kwargs everytime. | ||
Example: generate_kwargs = | ||
```json | ||
{"min_length": 100, "max_new_tokens": 100, "do_sample": false} | ||
``` | ||
|
||
1. using HF accelerate | ||
```shell | ||
python scripts/bloom-inference-server/cli.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --generate_kwargs '{"min_length": 100, "max_new_tokens": 100, "do_sample": false}' | ||
``` | ||
|
||
2. using DS inference | ||
```shell | ||
python scripts/bloom-inference-server/cli.py --model_name bigscience/bloom --dtype fp16 --deployment_framework ds_inference --save_mp_checkpoint_path <PATH TO DS CACHED MODEL> --generate_kwargs '{"min_length": 100, "max_new_tokens": 100, "do_sample": false}' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd say make and list it as an option below for those who want it - and also how to load it after it was saved of course. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. have you tried running this one? I run into multiple issues here:
if I make the dir:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @stas00 . The caching is not done every time. I think the argument name is misleading. I will change that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For caching, one needs to run cache_ds_checkpoints. Will change argument name here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, the backend for this is running via DeepSpeed MII. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. LOL @stas00 I dropped the caching functionality since I figured that no one would use it. Especially now, when Microsoft has provided the weights themselves. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might need to look back 3-4 commits @pai4451 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, I see, yes, we should leave it in place. Especially since eventually this will be no longer bloom-176b specific, so having the ability to generate small weights locally would be beneficial. apologies if I wasn't clear in the first place, I was suggesting to add the support to the pre-sharded weights from the hub, but not dropping the original way. so if possible please restore those? and of course we want to document that. That's another way to approach it, is to add to README a code example on how to pre-shard and save the weights - instead of having it implemented in the code. Whatever you feel is more intuitive to you, @mayank31398 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@mayank31398 @stas00 Thanks, I can run the ZeroQuant int8 version of bloom now. But the output is repetitive, has it been fixed right now?
My generate_kwargs are
@mayank31398 Any changes did you make to fix this bug? Thanks in advance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, are you on the latest commit (master branch) in DeepSpeed @pai4451 ? |
||
``` | ||
|
||
#### BLOOM server deployment | ||
1. using HF accelerate | ||
```shell | ||
stas00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
python scripts/bloom-inference-server/server.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --host <HOST ADDRESS> --port <PORT> --allowed_max_new_tokens 100 | ||
``` | ||
|
||
2. using DS inference | ||
```shell | ||
python scripts/bloom-inference-server/server.py --model_name bigscience/bloom --dtype fp16 --deployment_framework ds_inference --save_mp_checkpoint_path <PATH TO DS CACHED MODEL> --host <HOST ADDRESS> --port <PORT> --allowed_max_new_tokens 100 | ||
``` | ||
|
||
We provide an example [script](examples/server_request.py) to query the BLOOM server is provided. | ||
|
||
#### Benchmark system for BLOOM inference | ||
1. using HF accelerate | ||
```shell | ||
python scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --benchmark_cycles 5 | ||
``` | ||
|
||
2. using DS inference | ||
```shell | ||
deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype fp16 --deployment_framework ds_inference --save_mp_checkpoint_path <PATH TO DS CACHED MODEL> --benchmark_cycles 5 | ||
``` | ||
|
||
3. using DS ZeRO | ||
```shell | ||
deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework ds_zero --benchmark_cycles 5 | ||
``` | ||
|
||
Alternatively, the following shell script will benchmark different batch sizes for the model. | ||
```shell | ||
mkdir -p logs | ||
|
||
for bs in {1,2,4,8,16,32,64,128} | ||
do | ||
python scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework hf_accelerate --benchmark_cycles 5 --batch_size $bs 2>&1 | tee logs/hf-$bs.log | ||
|
||
deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype fp16 --deployment_framework ds_inference --save_mp_checkpoint_path <PATH TO DS CACHED MODEL> --benchmark_cycles 5 --batch_size $bs 2>&1 | tee logs/ds-$bs.log | ||
|
||
deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype bf16 --deployment_framework ds_zero --benchmark_cycles 5 --batch_size $bs 2>&1 | tee logs/ds-zero-$bs.log | ||
done | ||
``` | ||
|
||
The following will benchmark sequence length for batch size = 1 on DS inference. | ||
```shell | ||
for sq in {1,10,50,100,200,300,400,500,600,700,800,900,1000,1500,2000,2500,3000,3500,4000,4500,5000} | ||
do | ||
deepspeed --num_gpus 8 scripts/bloom-inference-server/benchmark.py --model_name bigscience/bloom --dtype fp16 --batch_size 1 --benchmark_cycles 5 --deployment_framework ds_inference --generate_kwargs '{"do_sample": false, "min_length": '$sq', "max_new_tokens": '$sq'}' 2>&1 | tee logs/ds_$sq.log | ||
done | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
import argparse | ||
import gc | ||
import os | ||
import time | ||
from typing import Any, List, Tuple, Union | ||
|
||
import deepspeed | ||
import torch | ||
|
||
import constants | ||
import utils | ||
from ds_inference import DSInferenceModel | ||
from ds_zero import DSZeROModel | ||
from hf_accelerate import HFAccelerateModel | ||
from utils import ( | ||
Execute, | ||
GenerateRequest, | ||
Model, | ||
get_argument_parser, | ||
get_dummy_batch, | ||
parse_generate_kwargs, | ||
print_rank_n | ||
) | ||
|
||
|
||
def run_and_log_time(execs: Union[List[Execute], Execute]) -> Tuple[Union[List[Any], Any], float]: | ||
""" | ||
runs a list of Execute objects and returns a list of outputs and the time taken | ||
""" | ||
start_time = time.time() | ||
|
||
if (type(execs) == list): | ||
results = [] | ||
for e in execs: | ||
results.append(e()) | ||
else: | ||
results = execs() | ||
|
||
time_elapsed = time.time() - start_time | ||
return results, time_elapsed | ||
|
||
|
||
def benchmark_generation(model: Model, | ||
request: GenerateRequest, | ||
cycles: int = 5): | ||
total_new_tokens_generated = 0 | ||
for _ in range(cycles): | ||
response = model.generate(request) | ||
total_new_tokens_generated += sum( | ||
new_tokens for new_tokens in response.num_generated_tokens) | ||
return total_new_tokens_generated | ||
|
||
|
||
def get_benchmark_results(benchmark_time: float, | ||
initialization_time: float, | ||
total_new_tokens_generated: int, | ||
batch_size: int, | ||
cycles: int) -> str: | ||
throughput = total_new_tokens_generated / benchmark_time | ||
latency = benchmark_time / cycles | ||
return f""" | ||
*** Performance stats: | ||
Throughput (including tokenization) = {throughput:.2f} tokens/sec | ||
Throughput (including tokenization) = {1000 / throughput:.2f} msecs/token | ||
Model loading time = {initialization_time:.2f} secs | ||
Total tokens generated = {total_new_tokens_generated} with batch size = {batch_size} | ||
Latency = {latency:.2f} secs | ||
Model loading time + generation time per batch = {initialization_time + latency:.2f} secs | ||
""" | ||
|
||
|
||
def benchmark_end_to_end(args: argparse.Namespace, | ||
model_class: Model, | ||
zero_activated: bool = False) -> None: | ||
model, initialization_time = run_and_log_time( | ||
Execute(model_class, {"args": args}) | ||
) | ||
|
||
request = parse_generate_kwargs( | ||
get_dummy_batch(args.batch_size), | ||
args.generate_kwargs | ||
) | ||
|
||
print_rank_n(f"generate_kwargs = {request}") | ||
print_rank_n(f"batch_size = {args.batch_size}") | ||
|
||
# warmup is a must if measuring speed as it's when all the optimizations are performed | ||
# e.g. on 8x80 a100 the first pass of 100 tokens takes 23sec, and the next one is 4secs | ||
response = model.generate(request) | ||
|
||
for i, (o, _) in zip(request.text, zip(response.text, response.num_generated_tokens)): | ||
print_rank_n(f"{'-' * 60}\nin = {i}\nout = {o}\n") | ||
|
||
if (args.benchmark_cycles > 0): | ||
print_rank_n(f"*** Running benchmark") | ||
|
||
torch.cuda.empty_cache() | ||
gc.collect() | ||
|
||
# warm up | ||
model.generate(request) | ||
torch.cuda.synchronize() | ||
|
||
# benchmark | ||
total_new_tokens_generated, benchmark_time = run_and_log_time( | ||
Execute( | ||
benchmark_generation, | ||
{ | ||
"model": model, | ||
"request": request, | ||
"cycles": args.benchmark_cycles | ||
} | ||
) | ||
) | ||
|
||
# with ZeRO every GPU is generating batch_size * sequence_length tokens | ||
if (zero_activated): | ||
world_size = int(os.getenv('WORLD_SIZE', '1')) | ||
total_new_tokens_generated *= world_size | ||
|
||
print_rank_n( | ||
get_benchmark_results( | ||
benchmark_time, | ||
initialization_time, | ||
total_new_tokens_generated, | ||
args.batch_size, | ||
args.benchmark_cycles | ||
) | ||
) | ||
|
||
|
||
def get_args() -> argparse.Namespace: | ||
parser = get_argument_parser() | ||
|
||
group = parser.add_argument_group(title="launch config") | ||
group.add_argument( | ||
"--deployment_framework", | ||
type=str, | ||
choices=[ | ||
constants.HF_ACCELERATE, | ||
constants.DS_INFERENCE, | ||
constants.DS_ZERO | ||
], | ||
default=constants.HF_ACCELERATE | ||
) | ||
group.add_argument("--benchmark_cycles", type=int, | ||
default=0, help="additionally run benchmark") | ||
group.add_argument("--local_rank", required=False, | ||
type=int, help="used by dist launchers") | ||
group.add_argument("--batch_size", default=1, type=int, help="batch size") | ||
group.add_argument("--save_mp_checkpoint_path", required=False, | ||
type=str, help="MP checkpoints path for DS inference") | ||
group.add_argument("--cpu_offload", action="store_true", | ||
help="whether to activate CPU offload for DS ZeRO") | ||
|
||
args = utils.get_args(parser) | ||
|
||
launched_with_deepspeed = args.deployment_framework in [ | ||
constants.DS_INFERENCE, constants.DS_ZERO] | ||
|
||
if (not launched_with_deepspeed): | ||
assert args.local_rank == None, "local_rank must be None if not launched with DeepSpeed" | ||
|
||
if (args.save_mp_checkpoint_path): | ||
assert args.deployment_framework == constants.DS_INFERENCE, "save_mp_checkpoint_path only works with DS inference" | ||
|
||
if (args.cpu_offload): | ||
assert args.deployment_framework == constants.DS_ZERO, "cpu_offload only works with DS_ZeRO" | ||
|
||
return args | ||
|
||
|
||
def main() -> None: | ||
args = get_args() | ||
|
||
if (args.deployment_framework == constants.HF_ACCELERATE): | ||
benchmark_end_to_end(args, HFAccelerateModel) | ||
elif (args.deployment_framework == constants.DS_INFERENCE): | ||
benchmark_end_to_end(args, DSInferenceModel) | ||
elif (args.deployment_framework == constants.DS_ZERO): | ||
benchmark_end_to_end(args, DSZeROModel, zero_activated=True) | ||
else: | ||
raise ValueError( | ||
f"Unknown deployment framework {args.deployment_framework}") | ||
|
||
|
||
if (__name__ == "__main__"): | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import argparse | ||
|
||
import utils | ||
from ds_inference import cache_ds_checkpoints | ||
from utils import get_argument_parser | ||
|
||
|
||
def get_args() -> argparse.Namespace: | ||
parser = get_argument_parser() | ||
|
||
group = parser.add_argument_group(title="launch config") | ||
group.add_argument("--local_rank", required=False, | ||
type=int, help="used by dist launchers") | ||
group.add_argument("--save_mp_checkpoint_path", required=True, | ||
type=str, help="MP checkpoints path for DS inference") | ||
|
||
args = utils.get_args(parser) | ||
|
||
return args | ||
|
||
|
||
def main() -> None: | ||
cache_ds_checkpoints(get_args()) | ||
|
||
|
||
if (__name__ == "__main__"): | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import argparse | ||
import json | ||
import sys | ||
|
||
import constants | ||
import utils | ||
from ds_inference import DSInferenceGRPCServer | ||
from hf_accelerate import HFAccelerateModel | ||
from utils import get_argument_parser, parse_generate_kwargs, print_rank_n | ||
|
||
|
||
def get_args() -> argparse.Namespace: | ||
parser = get_argument_parser() | ||
|
||
group = parser.add_argument_group(title="launch config") | ||
group.add_argument( | ||
"--deployment_framework", | ||
type=str, | ||
choices=[ | ||
constants.HF_ACCELERATE, | ||
constants.DS_INFERENCE | ||
], | ||
default=constants.HF_ACCELERATE | ||
) | ||
group.add_argument("--save_mp_checkpoint_path", required=False, | ||
type=str, help="MP checkpoints path for DS inference") | ||
group.add_argument("--shutdown_command", required=False, | ||
type=str, default="__shutdown__", help="This string will exit the script") | ||
|
||
args = utils.get_args(parser) | ||
|
||
if (args.save_mp_checkpoint_path): | ||
assert args.deployment_framework == constants.DS_INFERENCE, "save_mp_checkpoint_path only works with DS inference" | ||
|
||
return args | ||
|
||
|
||
def main() -> None: | ||
args = get_args() | ||
|
||
if (args.deployment_framework == constants.HF_ACCELERATE): | ||
model = HFAccelerateModel(args) | ||
elif (args.deployment_framework == constants.DS_INFERENCE): | ||
model = DSInferenceGRPCServer(args) | ||
else: | ||
raise ValueError( | ||
f"Unknown deployment framework {args.deployment_framework}") | ||
|
||
generate_kwargs = args.generate_kwargs | ||
|
||
while (True): | ||
# currently only 1 process is running so its | ||
# fine but might need to run_rank_n for this | ||
# if running a deployment_framework with | ||
# multiple processes | ||
input_text = input("Input text: ") | ||
|
||
if (input_text == args.shutdown_command): | ||
model.shutdown() | ||
|
||
if (input("change generate_kwargs? [y/n] ") == "y"): | ||
while (True): | ||
try: | ||
generate_kwargs = json.loads(input("Generate kwargs: ")) | ||
break | ||
except Exception as e: | ||
e_type, e_message, _ = sys.exc_info() | ||
print("error =", e_type.__name__) | ||
print("message =", e_message) | ||
continue | ||
|
||
request = parse_generate_kwargs(input_text, generate_kwargs) | ||
response = model.generate(request) | ||
|
||
print_rank_n("Output text:", response.text) | ||
print_rank_n("Generated tokens:", response.num_generated_tokens) | ||
|
||
|
||
if (__name__ == "__main__"): | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
HF_ACCELERATE = "hf_accelerate" | ||
DS_INFERENCE = "ds_inference" | ||
DS_ZERO = "ds_zero" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .cache import cache_ds_checkpoints | ||
from .grpc_server import DSInferenceGRPCServer | ||
from .model import DSInferenceModel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make the easily installable ones into that:
add the minimal versions if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DeepSpeed needs to be at master branch.
pip install is not up-do-date yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's easy,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and you can already make a note of which minimal version will be required - i.e. the next released one - that way you don't have to search for it later. (if it makes sense that is)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so something like: