diff --git a/.gitignore b/.gitignore index 8da0cf85..b3cf5aaa 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ !**/samples/**/terraform.tfvars **/tfplan **/backend.tf +sample_workloads/lit_gpt_demo/lit-gpt/** diff --git a/a3/terraform/modules/cluster/gke/main.tf b/a3/terraform/modules/cluster/gke/main.tf index 0ac888ab..ad5b1c9c 100644 --- a/a3/terraform/modules/cluster/gke/main.tf +++ b/a3/terraform/modules/cluster/gke/main.tf @@ -79,10 +79,9 @@ resource "google_container_cluster" "cluster" { # pool defined. So we create the smallest possible default node pool and # immediately delete it. This is a best-practice suggested in the Terraform # documentation for the container_cluster resource. - remove_default_node_pool = true - initial_node_count = 1 - min_master_version = local.gke_master_version - deletion_protection = false + initial_node_count = 1 + min_master_version = local.gke_master_version + deletion_protection = false network = module.network.network_self_links[0] subnetwork = module.network.subnetwork_self_links[0] diff --git a/sample_workloads/lit-gpt-demo/Dockerfile b/sample_workloads/lit-gpt-demo/Dockerfile index 9a5a34b1..7cbe6284 100644 --- a/sample_workloads/lit-gpt-demo/Dockerfile +++ b/sample_workloads/lit-gpt-demo/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:experimental -FROM nvcr.io/nvidia/pytorch:23.09-py3 +FROM nvcr.io/nvidia/pytorch:24.01-py3 # Ensure apt-get won't prompt for selecting options @@ -18,10 +18,11 @@ RUN apt-get update && \ WORKDIR /workspace/ -COPY requirements.txt requirements.txt +COPY requirements-all.txt . +COPY requirements.txt . -RUN MAX_JOBS=4 pip install 'flash-attn==2.0.4' --no-build-isolation \ - && pip install -r requirements.txt tokenizers sentencepiece ujson +RUN pip install -r requirements-all.txt tokenizers sentencepiece ujson +RUN pip install --upgrade torchvision RUN pip install nvidia-dlprof-pytorch-nvtx nvidia-pyindex nvidia-dlprof @@ -30,6 +31,4 @@ COPY . . # Check install RUN python -c "from lit_gpt.model import GPT, Block, Config" && \ python -c "import lightning as L" && \ - python -c "from lightning.fabric.strategies import FSDPStrategy" - - + python -c "from lightning.fabric.strategies import FSDPStrategy" \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/LitGPT.Dockerfile b/sample_workloads/lit-gpt-demo/LitGPT.Dockerfile index 06a2c656..c48100b8 100644 --- a/sample_workloads/lit-gpt-demo/LitGPT.Dockerfile +++ b/sample_workloads/lit-gpt-demo/LitGPT.Dockerfile @@ -18,7 +18,8 @@ RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.c && apt-get update -y && apt-get install google-cloud-cli -y COPY scripts /workspace/scripts -COPY openwebtext_trainer.py /workspace/pretrain/ +COPY utilities /workspace/pretrain/utilities +COPY openwebtext.py /workspace/pretrain/ ENTRYPOINT ["/bin/bash", "/workspace/scripts/litgpt_container_entrypoint.sh"] diff --git a/sample_workloads/lit-gpt-demo/build_and_push_litgpt.sh b/sample_workloads/lit-gpt-demo/build_and_push_litgpt.sh old mode 100644 new mode 100755 index de2742f6..346c3c97 --- a/sample_workloads/lit-gpt-demo/build_and_push_litgpt.sh +++ b/sample_workloads/lit-gpt-demo/build_and_push_litgpt.sh @@ -13,11 +13,11 @@ FULL_IMAGE=${FULL_IMAGE:="$ARTIFACT_REGISTRY/litgpt-full"} # Clone LitGPT and checkout a flash-attn enabled commit if [ ! -d $LITGPT_PATH ]; then git clone https://github.com/Lightning-AI/lit-gpt.git - cd lit-gpt - git checkout d5d371417ecb3d3b6c4f30837d8bb7cf2b5310ae - cd .. 
LITGPT_PATH=lit-gpt fi +cd lit-gpt +git checkout 44c3c58b759fa0903ab31ed8863a66c157d5ccd9 +cd .. cp Dockerfile $LITGPT_PATH/Dockerfile diff --git a/sample_workloads/lit-gpt-demo/helm/templates/litgpt.yaml b/sample_workloads/lit-gpt-demo/helm/templates/litgpt.yaml index 49b5dcbd..e0de7714 100644 --- a/sample_workloads/lit-gpt-demo/helm/templates/litgpt.yaml +++ b/sample_workloads/lit-gpt-demo/helm/templates/litgpt.yaml @@ -1,9 +1,8 @@ {{- $requiredVar := .Values.cluster.nNodes | required ".Values.cluster.nNodes is required" -}} {{- $requiredVar := .Values.cluster.nodePool | required ".Values.cluster.nodePool is required" -}} {{- $requiredVar := .Values.network.ncclIfnames | required ".Values.ncclIfnames is required" -}} -{{- $requiredVar := .Values.workload.jobTimestamp | required ".Values.jobTimestamp is required" -}} -{{- $requiredVar := .Values.workload.gcsExperimentBucket | required ".Values.gcsExperimentBucket is required" -}} -{{- $requiredVar := .Values.workload.experimentDir | required ".Values.experimentDir is required" -}} +{{- $requiredVar := .Values.logging.jobTimestamp | required ".Values.jobTimestamp is required" -}} +{{- $requiredVar := .Values.logging.experimentDir | required ".Values.experimentDir is required" -}} {{- $requiredVar := .Values.workload.gcsDataBucket | required ".Values.gcsDataBucket is required" -}} {{- $requiredVar := .Values.workload.dataDir| required ".Values.dataDir is required" -}} {{- $requiredVar := .Values.workload.image | required ".Values.image is required" -}} @@ -51,6 +50,8 @@ spec: tolerations: - operator: "Exists" key: nvidia.com/gpu + - operator: "Exists" + key: cloud.google.com/impending-node-termination volumes: - name: nvidia-install-dir-host hostPath: @@ -66,6 +67,9 @@ spec: emptyDir: {} - name: tcpx-nccl-plugin-volume emptyDir: {} + - name: data-volume + hostPath: + path: /home/data {{if eq $root.Values.network.useTcpx "yes"}} initContainers: - name: tcpx-nccl-plugin-installer @@ -127,9 +131,9 @@ spec: fieldRef: fieldPath: status.hostIP - name: LD_LIBRARY_PATH - value: "/usr/local/nvidia/lib64" + value: "/usr/lib/x86_64-linux-gnu:/usr/local/nvidia/lib64" - name: JOB_TIMESTAMP - value: "{{$root.Values.workload.jobTimestamp}}" + value: "{{$root.Values.logging.jobTimestamp}}" - name: MASTER_ADDR value: "pytorch-leader-{{$.Release.Name}}" - name: NCCL_SOCKET_IFNAME @@ -147,9 +151,9 @@ spec: - name: CPU_PINNING_MODE value: "{{$root.Values.network.cpuPinningMode}}" - name: GCS_EXPERIMENT_BUCKET - value: "{{$root.Values.workload.gcsExperimentBucket}}" + value: "{{$root.Values.logging.gcsExperimentBucket}}" - name: EXPERIMENT_ROOT_DIR - value: "{{$root.Values.workload.experimentDir}}" + value: "{{$root.Values.logging.experimentDir}}" - name: GCS_DATA_BUCKET value: "{{$root.Values.workload.gcsDataBucket}}" - name: DATA_DIR @@ -162,10 +166,18 @@ spec: value: "{{$root.Values.workload.modelName}}" - name: WARMUP_ITERS value: "{{$root.Values.workload.warmupIters}}" - - name: MAX_ITERS - value: "{{$root.Values.workload.maxIters}}" + - name: COLLECT_NSYS_PROFILE + value: "{{$root.Values.logging.collectNsysProfile}}" - name: CLUSTER_TYPE value: GKE + - name: NCCL_NVLS_ENABLE + value: '0' + - name: NCCL_DEBUG + value: "{{$root.Values.logging.ncclDebugLevel}}" + - name: NUMBER_OF_EPOCHS + value: "{{$root.Values.workload.numberOfEpochs}}" + - name: STEPS_PER_EPOCH + value: "{{$root.Values.workload.stepsPerEpoch}}" volumeMounts: - name: nvidia-install-dir-host mountPath: /usr/local/nvidia/lib64 @@ -177,6 +189,8 @@ spec: mountPath: /dev/shm - name: 
workload-terminated-volume mountPath: /usr/share/litgpt + - name: data-volume + mountPath: /data resources: limits: nvidia.com/gpu: !!int 8 diff --git a/sample_workloads/lit-gpt-demo/helm/values.yaml b/sample_workloads/lit-gpt-demo/helm/values.yaml index 24638314..1d0a3119 100644 --- a/sample_workloads/lit-gpt-demo/helm/values.yaml +++ b/sample_workloads/lit-gpt-demo/helm/values.yaml @@ -1,22 +1,25 @@ cluster: - nNodes: 8 + nNodes: 4 nodePool: np-1 network: useTcpx: "yes" ncclIfnames: 'eth0' - ncclPlugin: us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/nccl-plugin-gpudirecttcpx-dev:v3.1.6_2023_10_06 - rxdmContainer: us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/tcpgpudmarxd-dev:v2.0.9 + ncclPlugin: us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/nccl-plugin-gpudirecttcpx-dev:v3.1.7 + rxdmContainer: us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/tcpgpudmarxd-dev:v2.0.11 disablePmtu: "yes" -workload: - jobTimestamp: # Must be defined - gcsExperimentBucket: # Must be defined +logging: + collectNsysProfile: 'no' # Set to 'yes' for profiles + ncclDebugLevel: WARN + gcsExperimentBucket: '' # Set to a writable GCS bucket to upload logs and Nsys Profiles + jobTimestamp: 1 experimentDir: llama2-70b +workload: gcsDataBucket: litgpt-public-bucket dataDir: openwebtext_dataset - image: us-docker.pkg.dev/gce-ai-infra/litgpt-full/litgpt + image: us-docker.pkg.dev/gce-ai-infra/litgpt-full/litgpt/litgpt-full:latest modelName: Llama-2-70b-hf batchSize: 6 microBatchSize: 6 warmupIters: 10 - maxIters: 1000 - \ No newline at end of file + numberOfEpochs: 1 + stepsPerEpoch: 30 \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/openwebtext.py b/sample_workloads/lit-gpt-demo/openwebtext.py new file mode 100644 index 00000000..09010a3c --- /dev/null +++ b/sample_workloads/lit-gpt-demo/openwebtext.py @@ -0,0 +1,339 @@ +# Copyright Lightning AI. Licensed under the Apache License 2.0, see LICENSE file. 
+ +import math +import os +import sys +import time +from pathlib import Path +from typing import Optional, Tuple, Union + +import lightning as L +import numpy as np +import nvtx +import torch +from lightning.fabric.loggers import CSVLogger +from lightning.fabric.strategies import FSDPStrategy +from lightning.fabric.utilities import ThroughputMonitor, measure_flops +from torch.utils.data import DataLoader, IterableDataset + +# support running without installing as a package +wd = Path(__file__).parent.parent.resolve() +sys.path.append(str(wd)) + +from lit_gpt import Config +from lit_gpt.args import EvalArgs, IOArgs, TrainArgs +from lit_gpt.model import GPT, Block +from lit_gpt.utils import chunked_cross_entropy, estimate_flops, get_default_supported_precision, num_parameters + +from utilities.nsight_callbacks import NsightCallback + +use_nsight = os.getenv("COLLECT_NSYS_PROFILE") == "yes" +if (use_nsight): + import utilities.monitor_collectives + utilities.monitor_collectives.shunt_torch_communication() + print("Enabling nsight profiling.") + + +def setup( + model_name: str = os.getenv("MODEL_NAME", "Llama-2-70b-hf"), + data_dir: Path = Path("/data"), + out_dir: Path = Path(os.getenv("EXPERIMENT_LOCAL_DIR", "")) / "out", + precision: Optional[str] = None, + resume: Union[bool, Path] = False, + eval_interval: int = 1000, + save_interval: int = 1000, + eval_iters: int = 100, + log_interval: int = 1, + devices: int = 4, + learning_rate: float = 6e-4, + weight_decay: float = 1e-1, + beta1: float = 0.9, + beta2: float = 0.95, + lr_warmup_steps: int = 100, + min_lr: float = 6e-5, + global_batch_size: int = (int(os.getenv("NNODES", "1")) * 8 * int(os.getenv("BATCH_SIZE", "6"))), + micro_batch_size: int = int(os.getenv("MICRO_BATCH_SIZE", "6")), + max_norm: float = 1.0, + epochs: int = int(os.getenv("NUMBER_OF_EPOCHS", "2")), + train_epoch_size: int = 8 * int(os.getenv("MICRO_BATCH_SIZE", "6")) * int(os.getenv("STEPS_PER_EPOCH", "30")), +) -> None: + print(locals()) + precision = precision or get_default_supported_precision(training=True) + + if devices > 1: + strategy = FSDPStrategy( + auto_wrap_policy={Block}, + activation_checkpointing_policy={Block}, + state_dict_type="full", + limit_all_gathers=True, + cpu_offload=False, + ) + else: + strategy = "auto" + + logger = CSVLogger(out_dir.parent, out_dir.name, flush_logs_every_n_steps=log_interval) + callbacks = [] + if use_nsight: + callbacks.append(NsightCallback()) + fabric = L.Fabric( + devices=devices, strategy=strategy, precision=precision, + loggers=logger, callbacks=callbacks, num_nodes=int(os.getenv("NNODES", "1"))) + + fabric.launch( + main, + devices, + resume, + Config.from_name(name=model_name), + IOArgs(train_data_dir=data_dir, val_data_dir=data_dir, out_dir=out_dir), + TrainArgs( + save_interval=save_interval, + log_interval=log_interval, + global_batch_size=global_batch_size, + micro_batch_size=micro_batch_size, + lr_warmup_steps=lr_warmup_steps, + epochs=epochs, + epoch_size=train_epoch_size, + learning_rate=learning_rate, + weight_decay=weight_decay, + beta1=beta1, + beta2=beta2, + max_norm=max_norm, + min_lr=min_lr, + ), + EvalArgs(interval=eval_interval, max_iters=eval_iters), + ) + + +def main( + fabric: L.Fabric, + devices: int, + resume: Union[bool, Path], + config: Config, + io_args: IOArgs, + train_args: TrainArgs, + eval_args: EvalArgs, +) -> None: + validate_args(io_args, train_args, eval_args) + + if fabric.global_rank == 0: + io_args.out_dir.mkdir(parents=True, exist_ok=True) + + fabric.seed_everything(1337, 
workers=True) # same seed for every process to init model (FSDP) + + fabric.print(f"Loading model with {config.__dict__}") + t0 = time.perf_counter() + with fabric.init_module(empty_init=(fabric.world_size > 1)): + model = GPT(config) + model.apply(model._init_weights) + + fabric.print(f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.") + fabric.print(f"Total parameters {num_parameters(model):,}") + + model = fabric.setup(model) + optimizer = torch.optim.AdamW( + model.parameters(), + lr=train_args.learning_rate, + weight_decay=train_args.weight_decay, + betas=(train_args.beta1, train_args.beta2), + foreach=False, + ) + optimizer = fabric.setup_optimizers(optimizer) + + train_data, val_data = load_datasets(io_args, max_seq_length=model.max_seq_length) + train_dataloader = DataLoader(train_data, batch_size=train_args.micro_batch_size, num_workers=2) + val_dataloader = DataLoader(val_data, batch_size=train_args.micro_batch_size, num_workers=2) + train_dataloader, val_dataloader = fabric.setup_dataloaders(train_dataloader, val_dataloader) + + state = {"model": model, "optimizer": optimizer, "iter_num": 0, "step_count": 0} + + if resume is True: + resume = max(io_args.out_dir.glob("*.pth"), key=lambda p: int(p.name.split("-")[1])) + if resume: + fabric.print(f"Resuming training from {resume}") + fabric.load(resume, state) + + train_time = time.perf_counter() + train(fabric, devices, state, train_dataloader, val_dataloader, io_args, train_args, eval_args) + fabric.print(f"Training time: {(time.perf_counter()-train_time):.2f}s") + if fabric.device.type == "cuda": + fabric.print(f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB") + + +def train( + fabric: L.Fabric, + devices: int, + state: dict, + train_dataloader: DataLoader, + val_dataloader: DataLoader, + io_args: IOArgs, + train_args: TrainArgs, + eval_args: EvalArgs, +) -> None: + model = state["model"] + optimizer = state["optimizer"] + + validate(fabric, model, val_dataloader, max_iters=2) # sanity check + + with torch.device("meta"): + meta_model = GPT(model.config) + # "estimated" is not as precise as "measured". Estimated is optimistic but widely used in the wild. 
+ # When comparing MFU or FLOP numbers with other projects that use estimated FLOPs, + # consider passing `flops_per_batch=estimated_flops` instead + estimated_flops = estimate_flops(meta_model, training=True) * train_args.micro_batch_size + fabric.print(f"Estimated TFLOPs: {estimated_flops * fabric.world_size / 1e12:.2f}") + x = torch.randint(0, 1, (train_args.micro_batch_size, model.max_seq_length)) + forward_fn = lambda: meta_model(x) + loss_fn = lambda y: chunked_cross_entropy(y, x, chunk_size=0) + measured_flops = measure_flops(meta_model, forward_fn, loss_fn) + fabric.print(f"Measured TFLOPs: {measured_flops * fabric.world_size / 1e12:.2f}") + del meta_model, x + + throughput = ThroughputMonitor(fabric, window_size=int(os.getenv("WINDOW_SIZE", "10"))) + total_t0 = time.perf_counter() + + train_iter = iter(train_dataloader) + + fabric.call("on_train_epoch_start") + + lr_warmup_iters = train_args.lr_warmup_steps * train_args.gradient_accumulation_iters(devices) + for state["iter_num"] in range(state["iter_num"], train_args.max_iters(devices)): + # determine and set the learning rate for this iteration + lr = get_lr( + train_args.learning_rate, + state["iter_num"], + lr_warmup_iters, + train_args.max_iters(devices), + min_lr=train_args.min_lr, + ) + for param_group in optimizer.param_groups: + param_group["lr"] = lr + + iter_num = state["iter_num"] + 1 + iter_t0 = time.perf_counter() + + input_ids, targets = next(train_iter) + + fabric.call("on_train_batch_start", iter_num, train_args.gradient_accumulation_iters(devices)) + + is_accumulating = iter_num % train_args.gradient_accumulation_iters(devices) != 0 + with fabric.no_backward_sync(model, enabled=is_accumulating): + # Forward pass + logits = model(input_ids) + loss = chunked_cross_entropy(logits, targets, chunk_size=0) + + # Backward pass + fabric.call("on_before_backward") + fabric.backward(loss / train_args.gradient_accumulation_iters(devices)) + fabric.call("on_after_backward") + + if not is_accumulating: + fabric.clip_gradients(model, optimizer, max_norm=train_args.max_norm) + with nvtx.annotate(color="orange"): + optimizer.step() + optimizer.zero_grad() + state["step_count"] += 1 + + if iter_num % train_args.log_interval == 0: + loss_item = loss.item() # expensive device-to-host synchronization + t1 = time.perf_counter() + throughput.update( + time=t1 - total_t0, + batches=iter_num, + samples=iter_num * train_args.micro_batch_size, + lengths=iter_num * train_args.micro_batch_size * model.max_seq_length, + flops=measured_flops * train_args.log_interval, + ) + throughput.compute_and_log(step=iter_num) + fabric.print( + f"iter {iter_num} step {state['step_count']}: loss {loss_item:.4f}, iter time:" + f" {(t1 - iter_t0) * 1000:.2f}ms{' (optimizer.step)' if not is_accumulating else ''}" + ) + + if not is_accumulating and state["step_count"] % eval_args.interval == 0: + t0 = time.perf_counter() + val_loss = validate(fabric, model, val_dataloader, max_iters=eval_args.max_iters) + t1 = time.perf_counter() - t0 + fabric.print(f"step {iter_num}: val loss {val_loss.item():.4f}, val time: {t1 * 1000:.2f}ms") + fabric.barrier() + + fabric.call("on_train_batch_end", iter_num, train_args.gradient_accumulation_iters(devices)) + + if not is_accumulating and state["step_count"] % train_args.save_interval == 0: + checkpoint_path = io_args.out_dir / f"iter-{iter_num:06d}-ckpt.pth" + fabric.print(f"Saving checkpoint to {str(checkpoint_path)!r}") + fabric.save(checkpoint_path, state) + + +# FSDP has issues with `inference_mode` +@torch.no_grad() 
+def validate(fabric: L.Fabric, model: torch.nn.Module, val_dataloader: DataLoader, max_iters: int) -> torch.Tensor: + fabric.print("Validating ...") + model.eval() + val_iter = iter(val_dataloader) + + losses = torch.zeros(max_iters, device=fabric.device) + for k in range(max_iters): + input_ids, targets = next(val_iter) + logits = model(input_ids) + losses[k] = chunked_cross_entropy(logits, targets, chunk_size=0) + out = losses.mean() + + model.train() + return out + + +def load_datasets(io_args: IOArgs, max_seq_length: int) -> Tuple["Dataset", "Dataset"]: + train_data = Dataset(io_args.train_data_dir / "train.bin", max_seq_length) + val_data = Dataset(io_args.val_data_dir / "val.bin", max_seq_length) + return train_data, val_data + + +class Dataset(IterableDataset): + def __init__(self, data_file: Path, max_seq_length: int): + super().__init__() + self.data_file = data_file + self.max_seq_length = max_seq_length + + def __iter__(self): + data = np.memmap(self.data_file, dtype=np.uint16, mode="r") + while True: + i = torch.randint(len(data) - self.max_seq_length, (1,)).item() + x = torch.from_numpy((data[i : i + self.max_seq_length]).astype(np.int64)) + y = torch.from_numpy((data[i + 1 : i + 1 + self.max_seq_length]).astype(np.int64)) + yield x, y + + +# learning rate decay scheduler (cosine with linear warmup) +def get_lr(learning_rate: float, it: int, warmup_iters: int, max_iters: int, min_lr: float) -> float: + # 1) linear warmup for warmup_iters steps + if it < warmup_iters: + return learning_rate * it / warmup_iters + # 2) if it > max_iters, return min learning rate + if it > max_iters: + return min_lr + # 3) in between, use cosine decay down to min learning rate + decay_ratio = (it - warmup_iters) / (max_iters - warmup_iters) + assert 0 <= decay_ratio <= 1 + coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio)) # coeff ranges 0..1 + return min_lr + coeff * (learning_rate - min_lr) + + +def validate_args(io_args: IOArgs, train_args: TrainArgs, eval_args: EvalArgs) -> None: + unsupported = [(io_args, ["checkpoint_dir"]), (train_args, ["max_tokens"]), (eval_args, ["max_new_tokens"])] + for args, names in unsupported: + for name in names: + if getattr(args, name) is not None: + raise ValueError(f"{__file__} doesn't support the {name!r} argument. This is set in {args}") + required = [(io_args, ["train_data_dir", "val_data_dir"]), (train_args, ["epoch_size", "epochs", "max_norm"])] + for args, names in required: + for name in names: + if getattr(args, name) is None: + raise ValueError(f"{__file__} requires the {name!r} argument. 
This is set in {args}") + + +if __name__ == "__main__": + torch.set_float32_matmul_precision("high") + + from jsonargparse import CLI + + CLI(setup) \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/openwebtext_trainer.py b/sample_workloads/lit-gpt-demo/openwebtext_trainer.py deleted file mode 100644 index def3f415..00000000 --- a/sample_workloads/lit-gpt-demo/openwebtext_trainer.py +++ /dev/null @@ -1,208 +0,0 @@ -# Modified from https://github.com/Lightning-AI/lit-gpt/blob/d5d371417ecb3d3b6c4f30837d8bb7cf2b5310ae/pretrain/openwebtext_trainer.py -import math -import sys -import time -from pathlib import Path -from typing import Any, Optional - -import lightning as L -import numpy as np -import torch -import os -from lightning.pytorch.callbacks import ModelCheckpoint -from lightning.pytorch.loggers import CSVLogger -from lightning.pytorch.strategies import FSDPStrategy, XLAStrategy -from torch.utils.data import DataLoader, IterableDataset - -# support running without installing as a package -wd = Path(__file__).parent.parent.resolve() -sys.path.append(str(wd)) - -from lit_gpt import Config -from lit_gpt.model import GPT, Block -from lit_gpt.speed_monitor import SpeedMonitorCallback, estimate_flops, measure_flops -from lit_gpt.utils import chunked_cross_entropy, get_default_supported_precision, step_csv_logger - -model_name = os.getenv("MODEL_NAME", "Llama-2-70b-hf") -name = "openwebtext" -out_dir = Path(os.getenv("EXPERIMENT_LOCAL_DIR", "")) / "out" -data_dir = Path("/data") -save_interval = 1000 -eval_interval = 1000 -eval_iters = 100 -log_interval = 1 -num_nodes = int(os.getenv("NNODES", "1")) - -# Hyperparameters -learning_rate = 6e-4 -batch_size = int(os.getenv("BATCH_SIZE", "6")) -micro_batch_size = int(os.getenv("MICRO_BATCH_SIZE", "6")) -gradient_accumulation_steps = batch_size // micro_batch_size -assert gradient_accumulation_steps > 0 -max_iters = int(os.getenv("MAX_ITERS", "1000")) # num_epochs * (epoch_size // micro_batch_size) // devices -weight_decay = 1e-1 -beta1 = 0.9 -beta2 = 0.95 -decay_lr = True -warmup_iters = int(os.getenv("WARMUP_ITERS", "10")) -lr_decay_iters = max_iters -min_lr = 6e-5 - -hparams = {k: v for k, v in locals().items() if isinstance(v, (int, float, str)) and not k.startswith("_")} - - -class LightningGPTModule(L.LightningModule): - def __init__(self, config: Config) -> None: - super().__init__() - self.config = config - self.module: Optional[torch.nn.Module] = None - self.measured_flops: Optional[int] = None - - def configure_model(self) -> None: - self.module = GPT(self.config) - self.module.apply(self.module._init_weights) - - def configure_optimizers(self) -> torch.optim.Optimizer: - return torch.optim.AdamW( - self.module.parameters(), lr=learning_rate, weight_decay=weight_decay, betas=(beta1, beta2), foreach=False - ) - - def on_fit_start(self) -> None: - trainer = self.trainer - with torch.device("meta"): - meta_model = GPT(self.module.config) - # "estimated" is not as precise as "measured". Estimated is optimistic but widely used in the wild. 
- # When comparing MFU or FLOP numbers with other projects that use estimated FLOPs, - # consider setting `self.measured_flops = estimated_flops` instead - estimated_flops = estimate_flops(meta_model) * micro_batch_size - self.print(f"Estimated TFLOPs: {estimated_flops * trainer.world_size / 1e12:.2f}") - x = torch.randint(0, 1, (micro_batch_size, meta_model.config.block_size)) - self.measured_flops = measure_flops(meta_model, x) - self.print(f"Measured TFLOPs: {self.measured_flops * trainer.world_size / 1e12:.2f}") - - def on_train_batch_start(self, batch: Any, batch_idx: int) -> None: - if not decay_lr: - return - # determine and set the learning rate for this iteration - lr = get_lr(self.trainer.fit_loop.total_batch_idx) - for optimizer in self.trainer.strategy.optimizers: - for param_group in optimizer.param_groups: - param_group["lr"] = lr - - def training_step(self, batch: Any, batch_idx: int) -> torch.Tensor: - input_ids, targets = batch - logits = self.module(input_ids) - loss = chunked_cross_entropy(logits, targets, chunk_size=0) - self.log("train_loss", loss, on_step=True, on_epoch=False, prog_bar=True) - return loss - - def validation_step(self, batch: Any, batch_idx: int) -> None: - input_ids, targets = batch - logits = self.module(input_ids) - loss = chunked_cross_entropy(logits, targets, chunk_size=0) - self.log("val_loss", loss, on_step=False, on_epoch=True, prog_bar=True) - - -def main(devices: int = 1, precision: Optional[str] = None, tpu: bool = False) -> None: - precision = precision or get_default_supported_precision(training=True, tpu=tpu) - - if devices > 1: - if tpu: - # For multi-host TPU training, the device count for Fabric is limited to the count on a single host. - devices = "auto" - strategy = XLAStrategy(sync_module_states=False) - else: - strategy = FSDPStrategy( - auto_wrap_policy={Block}, - activation_checkpointing_policy={Block}, - # the argument is not available in the Trainer strategy, but it's the default anyways - # state_dict_type="full", - limit_all_gathers=True, - cpu_offload=False, - ) - else: - strategy = "auto" - - logger = step_csv_logger(out_dir, name, cls=CSVLogger, flush_logs_every_n_steps=log_interval) - speed_monitor = SpeedMonitorCallback( - length_fn=lambda batch: batch[0].size(1), batch_size=micro_batch_size, window_size=10, time_unit="seconds" - ) - model_checkpoint = ModelCheckpoint(dirpath=out_dir, every_n_train_steps=save_interval, save_last=True, verbose=True) - trainer = L.Trainer( - devices=devices, - strategy=strategy, - precision=precision, - logger=logger, - callbacks=[speed_monitor, model_checkpoint], - max_steps=max_iters, - max_epochs=1, - limit_val_batches=eval_iters, - accumulate_grad_batches=gradient_accumulation_steps, - log_every_n_steps=log_interval, - val_check_interval=eval_interval, - num_nodes=num_nodes - ) - - L.seed_everything(1337, workers=True) # same seed for every process to init model (FSDP) - - trainer.print(hparams) - - if trainer.global_rank == 0: - out_dir.mkdir(parents=True, exist_ok=True) - - config = Config.from_name(model_name) - trainer.print(f"Loading model with {config.__dict__}") - t0 = time.perf_counter() - model = LightningGPTModule(config) - trainer.print(f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.") - - train_data = Dataset(str(data_dir / "train.bin"), config.block_size) - val_data = Dataset(str(data_dir / "val.bin"), config.block_size) - train_dataloader = DataLoader(train_data, batch_size=micro_batch_size, num_workers=2) - val_dataloader = DataLoader(val_data, 
batch_size=micro_batch_size, num_workers=2) - - t0 = time.perf_counter() - trainer.fit(model, train_dataloader, val_dataloader, ckpt_path="last") - trainer.print(f"Training time: {(time.perf_counter()-t0):.2f}s") - if trainer.strategy.root_device.type == "cuda": - trainer.print(f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB") - - -class Dataset(IterableDataset): - def __init__(self, data_file: Path, block_size: int): - super().__init__() - self.data_file = data_file - self.block_size = block_size - - def __iter__(self): - data = np.memmap(self.data_file, dtype=np.uint16, mode="r") - while True: - i = torch.randint(len(data) - self.block_size, (1,)).item() - x = torch.from_numpy((data[i : i + self.block_size]).astype(np.int64)) - y = torch.from_numpy((data[i + 1 : i + 1 + self.block_size]).astype(np.int64)) - yield x, y - - -# learning rate decay scheduler (cosine with warmup) -def get_lr(it): - # 1) linear warmup for warmup_iters steps - if it < warmup_iters: - return learning_rate * it / warmup_iters - # 2) if it > lr_decay_iters, return min learning rate - if it > lr_decay_iters: - return min_lr - # 3) in between, use cosine decay down to min learning rate - decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters) - assert 0 <= decay_ratio <= 1 - coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio)) # coeff ranges 0..1 - return min_lr + coeff * (learning_rate - min_lr) - - -if __name__ == "__main__": - # Uncomment this line if you see an error: "Expected is_sm80 to be true, but got false" - # torch.backends.cuda.enable_flash_sdp(False) - torch.set_float32_matmul_precision("high") - - from jsonargparse import CLI - - CLI(main) diff --git a/sample_workloads/lit-gpt-demo/scripts/litgpt_container_entrypoint.sh b/sample_workloads/lit-gpt-demo/scripts/litgpt_container_entrypoint.sh index 1999904f..7c43916d 100644 --- a/sample_workloads/lit-gpt-demo/scripts/litgpt_container_entrypoint.sh +++ b/sample_workloads/lit-gpt-demo/scripts/litgpt_container_entrypoint.sh @@ -7,11 +7,13 @@ set -o pipefail : "${NODE_RANK:?Must set NODE_RANK}" : "${JOB_TIMESTAMP:?Must set JOB_TIMESTAMP}" : "${NNODES:?Must set NNODES}" -: "${GCS_EXPERIMENT_BUCKET:?Must set GCS_EXPERIMENT_BUCKET}" : "${EXPERIMENT_ROOT_DIR:?Must set EXPERIMENT_ROOT_DIR}" : "${GCS_DATA_BUCKET:?Must set GCS_DATA_BUCKET}" : "${DATA_DIR:?Must set DATA_DIR}" +: "${GCS_EXPERIMENT_BUCKET:=''}" : "${CLUSTER_TYPE:='GKE'}" +: "${COLLECT_NSYS_PROFILE:='no'}" +: "${NCCL_DEBUG:='INFO'}" export EXPERIMENT_LOCAL_DIR=/experiment/${EXPERIMENT_ROOT_DIR} @@ -20,7 +22,11 @@ mkdir -p $EXPERIMENT_LOCAL_DIR echo $EXPERIMENT_ROOT_DIR echo $EXPERIMENT_LOCAL_DIR -gsutil rsync -r gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/ ${EXPERIMENT_LOCAL_DIR}/ +if [[ ${#GCS_EXPERIMENT_BUCKET} -le 2 ]]; then + echo "Disabling gsutil calls. Not syncing experiment dir." 
+else + gsutil -m rsync -r gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/ ${EXPERIMENT_LOCAL_DIR}/ +fi LOCAL_DATA_DIR=/data mkdir -p $LOCAL_DATA_DIR @@ -33,6 +39,9 @@ export WORLD_SIZE=$((NNODES * GPUS_PER_NODE)) LOG_DIR=$EXPERIMENT_LOCAL_DIR/training_logs mkdir -p $LOG_DIR +PROFILING_DIR=$EXPERIMENT_LOCAL_DIR/nsys_profiles +mkdir -p $PROFILING_DIR + OUT_DIR=$EXPERIMENT_LOCAL_DIR/out mkdir -p $OUT_DIR @@ -53,7 +62,7 @@ set_nccl_specific_configuration() { export NCCL_NET_GDR_LEVEL=PIX export NCCL_P2P_PXN_LEVEL=0 export NCCL_DEBUG_SUBSYS=INIT,GRAPH,ENV,TUNING,NET,VERSION - export NCCL_DEBUG=INFO + export NCCL_DEBUG=${NCCL_DEBUG} export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:/usr/local/tcpx/lib64" export NCCL_GPUDIRECTTCPX_FORCE_ACK=1 export NCCL_GPUDIRECTTCPX_TX_COMPLETION_NANOSLEEP=1000 @@ -122,11 +131,24 @@ non_blocking_wait() { } function on_script_completion { - # semaphore to cleanly exit hardware utilization monitor - touch /usr/share/litgpt/workload_terminated + if [[ ${#GCS_EXPERIMENT_BUCKET} -le 2 ]]; then + echo "Disabling gsutil. Not uploading logs." + else + echo "Uploading ${EXPERIMENT_LOCAL_DIR} to gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/" + gsutil rsync -r ${EXPERIMENT_LOCAL_DIR}/ gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/ + fi - echo "Uploading ${EXPERIMENT_LOCAL_DIR} to gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/" - gsutil rsync -r ${EXPERIMENT_LOCAL_DIR}/ gs://${GCS_EXPERIMENT_BUCKET}/${EXPERIMENT_ROOT_DIR}/ + # semaphore to cleanly exit hardware utilization monitor + echo "Writing semaphore to exit sidecar container to /usr/share/litgpt/workload_terminated" + touch /usr/share/litgpt/workload_terminated + + METRICS_FILE=$EXPERIMENT_LOCAL_DIR/out/version_0/metrics.csv + if test -f $METRICS_FILE; then + echo "Printing out metrics.csv results from $METRICS_FILE" + cat $EXPERIMENT_LOCAL_DIR/out/version_0/metrics.csv + else + echo "Metrics.csv not located at $METRICS_FILE" + fi } @@ -145,7 +167,6 @@ fi PIDS=() - CPU_SETS=( "0-7,104-111" "8-15,112-119" "16-23,120-127" "24-31,128-135" "52-59,156-163" "60-67,164-171" "68-75,172-179" "76-83,180-187" ) for ((LOCAL_RANK=0; LOCAL_RANK <= $((GPUS_PER_NODE - 1)); LOCAL_RANK++)); do @@ -161,15 +182,19 @@ for ((LOCAL_RANK=0; LOCAL_RANK <= $((GPUS_PER_NODE - 1)); LOCAL_RANK++)); do fi CMD_PREFIX="numactl --membind=$MEMBIND_NUMA_NODE --physcpubind $CPUS" + if [[ "${COLLECT_NSYS_PROFILE:="no"}" == "yes" ]]; then + echo "Collecting nsys profile" + CMD_PREFIX="${CMD_PREFIX} nsys profile --sample=none --trace=cuda,nvtx -o $PROFILING_DIR/node_${NODE_RANK:?}_local_rank_${LOCAL_RANK} --capture-range=cudaProfilerApi --capture-range-end=repeat:${PROFILE_REPS:=5} --export sqlite " + fi RANK=$RANK LOCAL_RANK=$LOCAL_RANK \ $CMD_PREFIX \ - python /workspace/pretrain/openwebtext_trainer.py \ - --devices=$GPUS_PER_NODE --precision="bf16-true" > >(tee "$LOG_DIR/pretrain_gpt_rank$RANK.log") 2>&1 & + python /workspace/pretrain/openwebtext.py \ + --devices=$GPUS_PER_NODE --precision="bf16-true" --model_name="$MODEL_NAME" > >(tee "$LOG_DIR/pretrain_gpt_rank$RANK.log") 2>&1 & PID=$! 
PIDS+=($PID) - echo "Launched openwebtext_trainer.py for rank $RANK with PID $PID" + echo "Launched openwebtext.py for rank $RANK with PID $PID" done wait_all_success_or_exit "${PIDS[@]}" \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/slurm/litgpt_container.sh b/sample_workloads/lit-gpt-demo/slurm/litgpt_container.sh index e0ff6fa1..10680f13 100644 --- a/sample_workloads/lit-gpt-demo/slurm/litgpt_container.sh +++ b/sample_workloads/lit-gpt-demo/slurm/litgpt_container.sh @@ -1,3 +1,19 @@ +# Check for required environment variables +if [ -z "$MODEL_NAME" ]; then + echo "Error: MODEL_NAME environment variable is not set. Please set it before running the script." + exit 1 +fi + +if [ -z "$GCS_EXPERIMENT_BUCKET" ]; then + echo "Error: GCS_EXPERIMENT_BUCKET environment variable is not set. Please set it before running the script." + exit 1 +fi + +if [ -z "$EXPERIMENT_ROOT_DIR" ]; then + echo "Error: EXPERIMENT_ROOT_DIR environment variable is not set. Please set it before running the script." + exit 1 +fi + MASTER_ADDR=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1) # Start the Lit-GPT training container @@ -11,16 +27,16 @@ docker run \ -e JOB_TIMESTAMP=$(date +%s) \ -e NNODES=$SLURM_NNODES \ -e NODE_RANK=$SLURM_NODEID \ - -e MODEL_NAME='Llama-2-70b-hf' \ - -e GCS_EXPERIMENT_BUCKET=litgpt-public-bucket \ + -e MODEL_NAME=${MODEL_NAME} \ + -e GCS_EXPERIMENT_BUCKET=${GCS_EXPERIMENT_BUCKET} \ -e GCS_DATA_BUCKET=litgpt-public-bucket \ -e USE_TCPX=yes \ -e CLUSTER_TYPE=SLURM \ - -e EXPERIMENT_ROOT_DIR=llama-70b/training_logs \ + -e EXPERIMENT_ROOT_DIR=${EXPERIMENT_ROOT_DIR} \ -e DATA_DIR=openwebtext_dataset \ -e MASTER_ADDR=$MASTER_ADDR \ -e MASTER_PORT=20120 \ -e NCCL_GPUDIRECTTCPX_UNIX_CLIENT_PREFIX=${UDS_PATH} \ -e WARMUP_ITERS=10 \ -e MAX_ITERS=1000 \ - us-docker.pkg.dev/gce-ai-infra/litgpt-full/litgpt:slurm \ No newline at end of file + us-docker.pkg.dev/gce-ai-infra/litgpt-full/litgpt:slurm diff --git a/sample_workloads/lit-gpt-demo/slurm/setup_and_launch_training.sh b/sample_workloads/lit-gpt-demo/slurm/setup_and_launch_training.sh index 7319c934..cd4e4f53 100644 --- a/sample_workloads/lit-gpt-demo/slurm/setup_and_launch_training.sh +++ b/sample_workloads/lit-gpt-demo/slurm/setup_and_launch_training.sh @@ -5,49 +5,15 @@ #SBATCH --gpus-per-node 8 #SBATCH --nodes 4 -export UDS_PATH="/run/tcpx-${SLURM_JOB_ID}" -GPU_NIC_TOPOLOGY=/opt/tcpdirect_benchmark/gpu_rxq_configuration.textproto -GPU_NIC_TOPOLOGY_DIR=`dirname ${GPU_NIC_TOPOLOGY}` - -if [ ! -f "${GPU_NIC_TOPOLOGY}" ]; then - echo "GPU_NIC_TOPOLOGY file ${GPU_NIC_TOPOLOGY} must exist!" 
- exit 1 -fi +export MODEL_NAME= #'Llama-2-70b-hf' +export GCS_EXPERIMENT_BUCKET= # myBucket +export EXPERIMENT_ROOT_DIR= # llama-2/training_logs -# Install NCCL plugin -srun --ntasks-per-node=1 \ - docker run --rm -v /var/lib:/var/lib \ - us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/nccl-plugin-gpudirecttcpx-dev:v3.1.6_2023_10_06 install --install-nccl +export UDS_PATH="/run/tcpx-${SLURM_JOB_ID}" # Configure Docker srun --ntasks-per-node=1 \ gcloud auth configure-docker us-central1-docker.pkg.dev -# Start rxdm container -srun --ntasks-per-node=1 \ - docker run \ - --pull=always \ - --detach \ - --rm \ - --name receive-datapath-manager-${SLURM_JOB_ID} \ - --privileged \ - --cap-add=NET_ADMIN \ - --network=host \ - --gpus all \ - --volume /var/lib/nvidia/lib64:/usr/local/nvidia/lib64 \ - --volume ${GPU_NIC_TOPOLOGY_DIR}:${GPU_NIC_TOPOLOGY_DIR} \ - --volume ${UDS_PATH}:${UDS_PATH} \ - --env LD_LIBRARY_PATH=/usr/local/nvidia/lib64:${UDS_PATH}:/usr/lib/lib32:/usr/lib/x86_64-linux-gnu/ \ - --entrypoint /tcpgpudmarxd/build/app/tcpgpudmarxd \ - us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/tcpgpudmarxd-dev:v2.0.9 \ - --setup_param "--verbose 128 2 0" \ - --gpu_nic_preset manual \ - --gpu_nic_topology ${GPU_NIC_TOPOLOGY} \ - --gpu_shmem_type fd \ - --uds_path ${UDS_PATH} - # Launch the litgpt script srun -l --ntasks-per-node=1 bash litgpt_container.sh - -# Stop rxdm container -srun --ntasks-per-node=1 docker container stop receive-datapath-manager-${SLURM_JOB_ID} \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/utilities/monitor_collectives.py b/sample_workloads/lit-gpt-demo/utilities/monitor_collectives.py new file mode 100644 index 00000000..b24e645d --- /dev/null +++ b/sample_workloads/lit-gpt-demo/utilities/monitor_collectives.py @@ -0,0 +1,594 @@ +"""A utility to trace torch.distributed calls. + +Traces torch.distributed collectives before dispatch. In particular, logs the +collective kind (all_reduce, all_to_all, ..), message size (10 MB), and which +GPU devices are participating ([0, 1, 6, 7]). These are logged as NVTX markers +by NVIDIA Nsight, as well as printed to stdout. By default, we only log +cross-node collective communications. + +To assist with computing the effective bandwidth of a collective, a nominal +expression is provided in the doc string of each 'traced_'. This +also requires extracting the timings of the corresponding NCCL kernels. + +Typical usage example: + + import utilities.monitor_collectives + utilities.monitor_collectives.shunt_torch_communication() + +When running a workload, also define TORCH_DISTRIBUTED_TRACING to be one of +'ALL' or 'CROSSNODE'. See `should_rank_record_comm` for added details. +""" + + +import functools +import inspect +import io +import json +import os +import pickle +import sys +from datetime import datetime +import calendar +import uuid + +import nvtx +import torch.cuda +import torch.distributed + + +_TRACE_MODE = None + + +# Note: By default, we only target tracing *cross-node* communications. 
+# See 'should_rank_record_comm' +def shunt_torch_communication(): + _identify_trace_mode() + if _TRACE_MODE == 'none': + if int(os.environ.get("RANK", "0")) == 0: + print('Tracing torch.distributed collectives disabled.', flush=True) + return + + _shunt_torch_communication_objects() + _shunt_torch_communication_calls() + + if int(os.environ.get("RANK", "0")) == 0: + print('NVTX and print tracing of torch.distributed collectives enabled.', + flush=True) + print(f"{_GPU_SERIAL=}, {_VM_ID=}") + + if not _SHOULD_PRINT: + print('Collectives are traced but will not be printed to stdout', flush=True) + + +def _identify_trace_mode(): + global _TRACE_MODE + _TRACE_MODE = os.environ.get('TORCH_DISTRIBUTED_TRACING', 'CROSSNODE') + _TRACE_MODE = _TRACE_MODE.lower() + + global _SHOULD_PRINT + _SHOULD_PRINT = os.environ.get('TORCH_DISTRIBUTED_TRACING_PRINT', 'False') + _SHOULD_PRINT = _SHOULD_PRINT.lower() in ['true', '1', 't', 'y', 'yes'] + + global _GPU_SERIAL + _GPU_SERIAL = os.environ.get("GPU_SERIAL", "unknown") + global _VM_ID + _VM_ID = os.environ.get("VM_ID", "unknown") + + +# Each wrapper should match format 'traced_' +def _shunt_torch_communication_calls(): + """Replaces torch.distributed. with a traced version. + """ + target_collectives = [ + 'barrier', + 'broadcast_object_list', + 'broadcast', + 'gather', + 'scatter', + 'reduce', + 'reduce_scatter', + 'reduce_scatter_tensor', + 'all_reduce', + 'all_gather', + 'all_gather_into_tensor', + 'all_to_all', + 'all_to_all_single', + 'batch_isend_irecv', + 'isend', + 'irecv', + 'send', + 'recv', + ] + + this_module = sys.modules[__name__] + for collective in target_collectives: + original_fn = getattr(torch.distributed, collective) + replaced_fn = getattr(this_module, 'traced_' + collective) + setattr(torch.distributed, 'untraced_' + collective, original_fn) + setattr(torch.distributed, collective, replaced_fn) + + +def _shunt_torch_communication_objects(): + original_p2p = torch.distributed.P2POp + setattr(torch.distributed, 'UntracedP2POp', original_p2p) + setattr(torch.distributed, 'P2POp', _TracedP2POp) + + +# Each 'traced_' defines a 'message_size' to compute B/W. +# Ref https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_barrier(group=None, async_op=False, device_ids=None): + """Intercepts invocations of torch.distributed.barrier. + """ + if _should_rank_record_comm(group): + _emit_call_description('barrier', message_size=1, group=group) + + return torch.distributed.untraced_barrier(group, async_op, device_ids) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_broadcast_object_list(object_list, src=0, group=None, device=None): + """Intercepts invocations of torch.distributed.broadcast_object_list. + + Converts objects to tensor data using the pickle library. Then conducts a + torch.distributed.broadcast call. + """ + + if _should_rank_record_comm(group, root_rank=src): + message_size = 0 + for obj in object_list: + # Note: This computation is sadly redundant with underlying call :( + # For now we don't expect this invocation to be in critical path. 
+ buf = io.BytesIO() + pickle.Pickler(buf).dump(obj) + message_size += buf.getbuffer().nbytes + _emit_call_description( + 'broadcast_object_list', message_size, group, root_rank=src) + + return torch.distributed.untraced_broadcast_object_list( + object_list, src, group, device) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_broadcast(tensor, src, group=None, async_op=False): + """Intercepts invocations of torch.distributed.broadcast. + + Calculate [Ring-B/W] = [Message Size]/[Kernel Time] for large [Message Size] + + https://images.nvidia.com/events/sc15/pdfs/NCCL-Woolley.pdf + """ + if _should_rank_record_comm(group, root_rank=src): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('broadcast', message_size, group, root_rank=src) + + return torch.distributed.untraced_broadcast( + tensor, src, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_gather( + tensor, gather_list=None, dst=0, group=None, async_op=False): + """Intercepts invocations of torch.distributed.gather. + + Let T := sum([Receive Kernel Time from Rank i] for i != dst) + Calculate [P2P-B/W] = [Message Size]/T + + Each of (n-1) ranks sends a message to the root. + + Note that any correction factors for the bus bandwidth (e.g. [n-1]/n) depend + on the *definition* of 'Message Size'. In some cases, such as for 'gather', we + define 'Message Size' so as to omit the size of data that is already local + to the destination GPU for the 'gather' operation. In this case, no correction + factor is needed. In NCCL tests, they assume all ranks send equal sized + messages and include this size of data already resident on the destination + GPU. Thus, in there case you see a (n-1)/n correction factor on calculating + the bus bandwidth. In general, the goal of computing the bus bandwidth is + to compare data transfer rates on the bus relative to peak bus bandwidth. + See https://github.com/NVIDIA/nccl-tests/blob/master/doc/PERFORMANCE.md. + + https://github.com/NVIDIA/nccl-tests/blob/1a5f551ffd6e/src/gather.cu#L54 + https://github.com/pytorch/pytorch/blob/bfd995f0d6bf/torch/csrc/cuda/nccl.cpp#L1040 + """ + if _should_rank_record_comm(group, root_rank=dst, is_ring=False): + message_size = functools.reduce( + lambda sz, x: sz + x.nelement() * x.element_size(), gather_list, 0) + message_size -= tensor.nelement() * tensor.element_size() + + _emit_call_description('gather', message_size, group, root_rank=dst) + + return torch.distributed.untraced_gather( + tensor, gather_list, dst, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_scatter( + tensor, scatter_list=None, src=0, group=None, async_op=False): + """Intercepts invocations of torch.distributed.scatter. + + Let T := sum([Send Kernel Time from Rank i] for i != src) + Calculate [P2P-B/W] = [Message Size]/T + + Each of (n-1) ranks receives a message from the root. + There is no (n-1)/n factor as we factor it in [Message Size]. 
+ + https://github.com/NVIDIA/nccl-tests/blob/1a5f551ffd6e/src/scatter.cu#L50 + https://github.com/pytorch/pytorch/blob/bfd995f0d6bf/torch/csrc/cuda/nccl.cpp#L1089 + """ + if _should_rank_record_comm(group, root_rank=src, is_ring=False): + message_size = functools.reduce( + lambda sz, x: sz + x.nelement() * x.element_size(), scatter_list, 0) + message_size -= tensor.nelement() * tensor.element_size() + + _emit_call_description('scatter', message_size, group, root_rank=src) + + return torch.distributed.untraced_scatter( + tensor, scatter_list, src, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_reduce( + tensor, dst, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False): + """Intercepts invocations of torch.distributed.reduce. + + Calculate [Ring-B/W] = [Message Size]/[Kernel Time] for large [Message Size] + Also see 'traced_broadcast' + """ + if _should_rank_record_comm(group, root_rank=dst): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('reduce', message_size, group, root_rank=dst) + + return torch.distributed.untraced_reduce(tensor, dst, op, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_reduce_scatter( + output, + input_list, + op=torch.distributed.ReduceOp.SUM, + group=None, + async_op=False): + """Intercepts invocations of torch.distributed.reduce_scatter. + + Let n := [Group Size]. + Calculate [Ring-B/W] = (n-1)/n * [Message Size]/[Kernel Time] + Assumes equal tensor sizes. It's the same as first half of ring All-Reduce. + """ + if _should_rank_record_comm(group): + message_size = output.nelement() * output.element_size() + _emit_call_description('reduce_scatter', message_size, group) + + return torch.distributed.untraced_reduce_scatter( + output, input_list, op, group, async_op) + + +# pylint: disable=redefined-builtin,g-doc-args,g-doc-return-or-yield +def traced_reduce_scatter_tensor( + output, + input, + op=torch.distributed.ReduceOp.SUM, + group=None, + async_op=False): + """Intercepts invocations of torch.distributed.reduce_scatter_tensor. + + Similar to 'traced_reduce_scatter' + """ + + if _should_rank_record_comm(group): + message_size = output.nelement() * output.element_size() + _emit_call_description('reduce_scatter', message_size, group) + + return torch.distributed.untraced_reduce_scatter_tensor( + output, input, op, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_all_reduce( + tensor, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False): + """Intercepts invocations of torch.distributed.all_reduce. + + Let n := [Group Size] + Calculate [Ring-B/W] = 2(n-1)/n * [Message Size] / [Kernel Time] + + https://images.nvidia.com/events/sc15/pdfs/NCCL-Woolley.pdf + """ + if _should_rank_record_comm(group): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('all_reduce', message_size, group) + + return torch.distributed.untraced_all_reduce( + tensor, op, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_all_gather(tensor_list, tensor, group=None, async_op=False): + """Intercepts invocations of torch.distributed.all_gather. + + Let n := [Group Size] + Calculate [Ring-B/W] = (n-1)/n * [Message Size] / [Kernel Time] + Assuming equal tensor sizes. 
+ """ + if _should_rank_record_comm(group): + message_size = functools.reduce( + lambda size, x: size + x.nelement() * x.element_size(), tensor_list, 0) + _emit_call_description('all_gather', message_size, group) + + return torch.distributed.untraced_all_gather( + tensor_list, tensor, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_all_gather_into_tensor( + output_tensor, input_tensor, group=None, async_op=False): + """Intercepts invocations of torch.distributed.all_gather_into_tensor. + + Similar 'traced_all_gather' + """ + if _should_rank_record_comm(group): + message_size = output_tensor.nelement() * output_tensor.element_size() + _emit_call_description('all_gather', message_size, group) + + return torch.distributed.untraced_all_gather_into_tensor( + output_tensor, input_tensor, group, async_op) + + +# Note: The TCP Direct team intends to implement a custom version of AllToAll. +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_all_to_all( + output_tensor_list, input_tensor_list, group=None, async_op=False): + """Intercepts invocations of torch.distributed.all_to_all. + + Let S := sum([Message Size on Rank i] for i = 1..n) where n := [Group Size] + Let T := [End of last Receive last rank] - [Start of first Send first rank] + Calculate [Algo B/W] = S / T. + + There is no n/(n-1) correction factor as we factor it in [Message Size]. + + https://github.com/NVIDIA/nccl-tests/blob/1a5f551ffd6e/src/alltoall.cu#L57 + https://github.com/pytorch/pytorch/blob/bfd995f0d6bf/torch/csrc/cuda/nccl.cpp#L911 + """ + if _should_rank_record_comm(group): + message_size = functools.reduce( + lambda s, x: s + x.nelement() * x.element_size(), input_tensor_list, 0) + + # Omit bytes corresponding to send and receive on the same rank + self_tensor = input_tensor_list[torch.distributed.get_rank(group)] + message_size -= self_tensor.nelement() * self_tensor.element_size() + + _emit_call_description('all_to_all', message_size, group) + + return torch.distributed.untraced_all_to_all( + output_tensor_list, input_tensor_list, group, async_op) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield,redefined-builtin +def traced_all_to_all_single( + output, + input, + output_split_sizes=None, + input_split_sizes=None, + group=None, + async_op=False): + """Intercepts invocations of torch.distributed.all_to_all_single. + + Similar to 'traced_all_to_all' + """ + if _should_rank_record_comm(group): + self_rank = torch.distributed.get_rank(group) + + if input_split_sizes is not None: + self_slice = input_split_sizes[self_rank] + else: + self_slice = input.size(dim=0) / torch.distributed.get_world_size(group) + + slice_nelement = input.nelement() / input.size(dim=0) + message_size = input.nelement() * input.element_size() + message_size -= self_slice * slice_nelement * input.element_size() + + _emit_call_description('all_to_all_single', message_size, group) + + return torch.distributed.untraced_all_to_all_single( + output, input, output_split_sizes, input_split_sizes, group, async_op) + + +# Note: Each send and receive occurs on indepenent CUDA streams +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_batch_isend_irecv(p2p_op_list): + """Intercepts invocations of torch.distributed.batch_isend_irecv. + + Calculate [P2P-B/W] = [Message Size]/[Kernel Time] for each send and recv. 
+ """ + correlation_id = str(uuid.uuid4()) + for p2p in p2p_op_list: + if _SHOULD_PRINT: + print(f"Num p2p ops in batch: {len(p2p_op_list)}") + if _should_rank_record_comm(p2p.group, peer_rank=p2p.peer, is_ring=False): + api = 'send' if p2p.op == torch.distributed.untraced_isend else 'recv' + + message_size = p2p.tensor.nelement() * p2p.tensor.element_size() + _emit_call_description(api, message_size, group=p2p.group, peer_rank=p2p.peer, correlation_id=correlation_id) + + return torch.distributed.untraced_batch_isend_irecv(p2p_op_list) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_isend(tensor, dst, group=None, tag=0): + """Intercepts invocations of torch.distributed.isend. + + Calculate [P2P-B/W] = [Message Size]/[Kernel Time] + """ + if _should_rank_record_comm(group, peer_rank=dst, is_ring=False): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('send', message_size, group, dst) + + return torch.distributed.untraced_isend(tensor, dst, group, tag) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_irecv(tensor, src=None, group=None, tag=0): + """Intercepts invocations of torch.distributed.irecv. + """ + if _should_rank_record_comm(group, peer_rank=src, is_ring=False): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('recv', message_size, group, src) + + return torch.distributed.untraced_irecv(tensor, src, group, tag) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_send(tensor, dst, group=None, tag=0): + """Intercepts invocations of torch.distributed.send. + """ + if _should_rank_record_comm(group, peer_rank=dst, is_ring=False): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('send', message_size, group, dst) + + return torch.distributed.untraced_send(tensor, dst, group, tag) + + +# pylint: disable=g-doc-args,g-doc-return-or-yield +def traced_recv(tensor, src=None, group=None, tag=0): + """Intercepts invocations of torch.distributed.recv. + """ + if _should_rank_record_comm(group, peer_rank=src, is_ring=False): + message_size = tensor.nelement() * tensor.element_size() + _emit_call_description('recv', message_size, group, src) + + return torch.distributed.untraced_recv(tensor, src, group, tag) + + +@functools.lru_cache(maxsize=None) +def _should_rank_record_comm( + group=None, peer_rank=None, root_rank=None, is_ring=True): + """Decides whether a given torch.distributed collective should be recorded. + + Args: + group: The torch process group (i.e. participating GPUs) in this collective. + peer_rank: In direct peer to peer operations, the global rank of the peer. + root_rank: The global rank of the root GPU, for collectives with a root. + as_ring: Whether the default NCCL implementation uses a ring algorithm. + Specifying 'peer_rank' and 'is_ring=True' are incompatible. + + Returns: + Whether to record a descriptive NVTX marker, and possibly print a log trace. + """ + if not _is_current_process_in_group(group): + return False + if _TRACE_MODE == 'crossnode' and not _is_crossnode_comm(group, peer_rank): + return False + if not is_ring and root_rank is not None: + return torch.distributed.get_rank() == root_rank + + return True + + +def _is_current_process_in_group(group=None): + return torch.distributed.get_rank(group) >= 0 + + +@functools.lru_cache(maxsize=None) +def _is_crossnode_comm(group=None, peer_rank=None): + """Whether this collective involves communication across nodes. + + Args: + group: The torch process group (i.e. 
participating GPUs) in this collective. + peer: In direct peer to peer operations, the global rank of the peer. + + Returns: + Whether this collective involves communications across nodes. + """ + count_per_node = torch.cuda.device_count() + + if peer_rank is not None: + this_node = int(torch.distributed.get_rank() / count_per_node) + peer_node = int(peer_rank / count_per_node) + return this_node != peer_node + else: + if group is not None: + ranks = torch.distributed.get_process_group_ranks(group=group) + else: + ranks = [*range(torch.distributed.get_world_size())] + + nodes = list(map(lambda rank: int(rank / count_per_node), ranks)) + return any([node != nodes[0] for node in nodes]) + + +def _emit_call_description( + name, message_size, group=None, peer_rank=None, root_rank=None, correlation_id=None): + call_description = _TorchDistributedCallDescriptor( + name, message_size, group, peer_rank, root_rank, correlation_id).to_json() + + nvtx.mark(call_description) + if _should_rank_print(group, peer_rank, root_rank): + print(call_description) + + +class _TorchDistributedCallDescriptor: + """Description of a torch.distributed comm call to be stored as NVTX marker. + """ + + def __init__( + self, name, message_size, group=None, peer_rank=None, root_rank=None, correlation_id=None): + self.name = name + self.rank = torch.distributed.get_rank() + self.source_line = _get_call_source_line() + self.message_size = message_size + self.device = torch.cuda.current_device() + self.timestamp = calendar.timegm(datetime.utcnow().utctimetuple()) + self.gpu_serial = _GPU_SERIAL + self.vm_id = _VM_ID + if group is not None: + self.group_ranks = torch.distributed.get_process_group_ranks(group=group) + if peer_rank is not None: + self.peer_rank = peer_rank + if root_rank is not None: + self.root_rank = root_rank + if correlation_id is not None: + self.correlation_id = correlation_id + + def to_json(self): + return json.dumps(self, default=lambda o: o.__dict__) + + +def _should_rank_print(group=None, peer_rank=None, root_rank=None): + if not _SHOULD_PRINT: + return False + if root_rank is not None: + leader = root_rank + elif group is not None: + leader = torch.distributed.get_global_rank(group, 0) + else: + leader = 0 + + return (peer_rank is not None) or torch.distributed.get_rank() == leader + + +# A fixed depth works for all cases here +def _get_call_source_line(depth=4): + caller = inspect.getframeinfo(inspect.stack()[depth][0]) + return '{}:{}'.format(caller.filename, caller.lineno) + + +# We need to un-hide the original type for 'batch_isend_irecv' due to type +# checks performed by torch.distributed. This is not an issue as by then we +# have already recorded the call. +class _TracedP2POp(torch.distributed.P2POp): + """Used to redirect torch.distributed.i{send,recv} on 'batch_isend_irecv'. 
+ """ + + def __init__(self, op, tensor, peer, group=None, tag=0): + original_op = _get_original_p2p_op(op) + torch.distributed.UntracedP2POp.__init__( + self, original_op, tensor, peer, group, tag) + + def __new__(cls, op, tensor, peer, group=None, tag=0): + original_op = _get_original_p2p_op(op) + return torch.distributed.UntracedP2POp.__new__( + cls, original_op, tensor, peer, group, tag) + + +def _get_original_p2p_op(op): + if op == torch.distributed.isend: + return torch.distributed.untraced_isend + elif op == torch.distributed.irecv: + return torch.distributed.untraced_irecv \ No newline at end of file diff --git a/sample_workloads/lit-gpt-demo/utilities/nsight_callbacks.py b/sample_workloads/lit-gpt-demo/utilities/nsight_callbacks.py new file mode 100644 index 00000000..079eef92 --- /dev/null +++ b/sample_workloads/lit-gpt-demo/utilities/nsight_callbacks.py @@ -0,0 +1,52 @@ +import torch +import sys +from typing import Any +import nvtx + +class NsightCallback: + def __init__(self): + self.nsys_profile_step_multiple = 5 + self.backward_nvtx_range = None + + def on_train_batch_start(self, batch_idx: int, gradient_accumulation_steps: int) -> None: + global_batch_idx = batch_idx / gradient_accumulation_steps + if ( + global_batch_idx > 0 + and global_batch_idx % self.nsys_profile_step_multiple == 0 + ): + print(f"Starting Nsys profiling") + torch.cuda.cudart().cudaProfilerStart() + + def on_train_batch_end( + self, batch_idx: int, gradient_accumulation_steps: int + ) -> None: + global_batch_idx = batch_idx // gradient_accumulation_steps + global_batch_offset = batch_idx % gradient_accumulation_steps + is_last_microbatch = global_batch_offset == gradient_accumulation_steps - 1 + + if ( + global_batch_idx > 1 + and global_batch_idx % self.nsys_profile_step_multiple == 0 + and is_last_microbatch + ): + print(f"Stopping Nsys profiling") + torch.cuda.cudart().cudaProfilerStop() + if is_last_microbatch: + print(f"HEARTBEAT: {global_batch_idx=}, {batch_idx=}") + print( + f"Max memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB" + ) + sys.stdout.flush() + sys.stderr.flush() + + + def on_before_backward(self): + self.backward_nvtx_range = nvtx.start_range(message="backward", color="red") + + def on_after_backward(self): + if self.backward_nvtx_range: + nvtx.end_range(self.backward_nvtx_range) + + def on_train_epoch_start(self) -> None: + print("Resetting max memory allocation") + torch.cuda.reset_peak_memory_stats() \ No newline at end of file diff --git a/sample_workloads/nccltest/README.md b/sample_workloads/nccltest/README.md index 1096affe..cbd911f1 100644 --- a/sample_workloads/nccltest/README.md +++ b/sample_workloads/nccltest/README.md @@ -99,6 +99,8 @@ message sizes to sweep over are shared across all benchmarks.** Supported benchmarks are `all_gather_perf`, `all_reduce_perf`, `reduce_scatter_perf`, `broadcast_perf`, `reduce_perf`, `sendrecv_perf`, `scatter_perf`, `gather_perf`, `alltoall_perf`, and `hypercube_perf`. +*Note: If you want to use orchestrators relying on SSH to launch processes (e.g. MPI) to run communication patterns doing send-recvs between many GPU pairs (e.g. all-to-all), be sure to set `ulimit -n 1048576` for every process you start. To do this, you would need `CAP_SYS_RESOURCE` capability in your workload container, or make it privileged. If you are unsure whether your job orchestrator uses SSH, we recommend doing this out of caution.* + For each benchmark, you must supply a mask. 
The benchmark does a bitwise AND between the rank and the mask to get a color, and ranks with the same color goes in the same NCCL communicator. Examples: @@ -147,4 +149,4 @@ from superblock `cluster.startSuperblock`. **This guarantees closer affinity between the job nodes and should be enabled for performance benchmarking.** *Note that this feature is based on a `superblock` label in the Kubernetes -cluster and would not work if that label is missing. For example, Superblock 1 should be labeled with `superblock`: 1 * \ No newline at end of file +cluster and would not work if that label is missing. For example, Superblock 1 should be labeled with `superblock`: 1 * diff --git a/sample_workloads/nccltest/gke/values.yaml b/sample_workloads/nccltest/gke/values.yaml index bbb66df3..88f749c8 100644 --- a/sample_workloads/nccltest/gke/values.yaml +++ b/sample_workloads/nccltest/gke/values.yaml @@ -36,7 +36,7 @@ ncclPlugin: unreservedCores: "0-7,104-111,52-59,156-163" envs: NCCL_GPUDIRECTTCPX_FORCE_ACK: "0" - NCCL_SOCKET_IFRAME: "eth0" + NCCL_SOCKET_IFNAME: "eth0" NCCL_DYNAMIC_CHUNK_SIZE: 524288 NCCL_P2P_NET_CHUNKSIZE: 524288 NCCL_P2P_PCI_CHUNKSIZE: 524288
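
A note on the mask semantics described in the nccltest README hunk above: the color for each rank is computed as `rank & mask`, and ranks that share a color end up in the same NCCL communicator. The short standalone Python sketch below (not part of the benchmark code; names are illustrative) shows how a mask partitions ranks, assuming 8 GPUs per node as configured elsewhere in this change (`nvidia.com/gpu: 8`):

    # Illustrative sketch of the mask -> communicator grouping described in the
    # nccltest README: color = rank & mask; ranks with equal colors share a communicator.
    from collections import defaultdict

    def split_by_mask(world_size: int, mask: int) -> dict:
        groups = defaultdict(list)
        for rank in range(world_size):
            groups[rank & mask].append(rank)
        return dict(groups)

    if __name__ == "__main__":
        # With 16 ranks (2 nodes x 8 GPUs), mask 0x7 keys ranks by their local index,
        # so rank i on node 0 is grouped with rank i on node 1 (cross-node pairs).
        print(split_by_mask(16, 0x7))
        # Mask 0x0 gives every rank color 0, i.e. one communicator spanning all ranks.
        print(split_by_mask(16, 0x0))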
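
The docstrings added in utilities/monitor_collectives.py quote per-collective bus-bandwidth expressions (for example, all_reduce uses 2(n-1)/n x [Message Size]/[Kernel Time]; all_gather and reduce_scatter use (n-1)/n; gather, scatter, and all_to_all fold the correction into the recorded message size). The following hedged helper is not part of that module; it only restates those quoted factors so that the message sizes emitted in the NVTX markers can be combined with NCCL kernel timings taken from an Nsight trace:

    # Hypothetical post-processing helper; correction factors mirror the expressions
    # quoted in the monitor_collectives.py docstrings above. Collectives whose
    # recorded message size already excludes rank-local data (gather, scatter,
    # all_to_all) need no extra factor and fall through to 1.0.
    def bus_bandwidth_gbps(collective: str, message_bytes: int,
                           kernel_time_s: float, group_size: int) -> float:
        n = group_size
        factors = {
            "all_reduce": 2 * (n - 1) / n,   # ring all-reduce
            "all_gather": (n - 1) / n,
            "reduce_scatter": (n - 1) / n,
        }
        algo_bw = message_bytes / kernel_time_s              # bytes per second
        return factors.get(collective, 1.0) * algo_bw / 1e9  # GB/s

    # Example: a 1 GiB all_reduce across 16 ranks with 25 ms of total kernel time.
    print(bus_bandwidth_gbps("all_reduce", 1 << 30, 0.025, 16))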
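
For reference, the defaults wired through helm/values.yaml and the setup() signature in openwebtext.py resolve to the effective sizes below. This sketch only reproduces the arithmetic visible in setup(); how lit-gpt's TrainArgs turns these into max_iters and gradient-accumulation counts lives in lit_gpt.args and is not restated here.

    # Arithmetic from openwebtext.py's setup() defaults with the values.yaml settings
    # in this change (nNodes: 4, batchSize: 6, microBatchSize: 6, stepsPerEpoch: 30).
    nnodes, gpus_per_node = 4, 8
    batch_size, micro_batch_size, steps_per_epoch, epochs = 6, 6, 30, 1

    global_batch_size = nnodes * gpus_per_node * batch_size                 # 192 samples per optimizer step
    train_epoch_size = gpus_per_node * micro_batch_size * steps_per_epoch   # 1440 samples per epoch
    world_size = nnodes * gpus_per_node                                     # 32 ranks

    print(global_batch_size, train_epoch_size, world_size)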