Pirillo/litgpt nvtx #354

Merged (8 commits) on Feb 21, 2024

Changes from all commits

sample_workloads/lit-gpt-demo/LitGPT.Dockerfile (1 addition, 0 deletions)
@@ -18,6 +18,7 @@ RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.c
&& apt-get update -y && apt-get install google-cloud-cli -y

COPY scripts /workspace/scripts
COPY utilities /workspace/pretrain/utilities
COPY openwebtext_trainer.py /workspace/pretrain/

ENTRYPOINT ["/bin/bash", "/workspace/scripts/litgpt_container_entrypoint.sh"]
sample_workloads/lit-gpt-demo/helm/templates/litgpt.yaml (2 additions, 0 deletions)
@@ -164,6 +164,8 @@ spec:
value: "{{$root.Values.workload.warmupIters}}"
- name: MAX_ITERS
value: "{{$root.Values.workload.maxIters}}"
- name: COLLECT_NSYS_PROFILE
value: "{{$root.Values.workload.collectNsysProfile}}"
- name: CLUSTER_TYPE
value: GKE
volumeMounts:
sample_workloads/lit-gpt-demo/helm/values.yaml (1 addition, 1 deletion)
@@ -19,4 +19,4 @@ workload:
microBatchSize: 6
warmupIters: 10
maxIters: 1000

collectNsysProfile: 'no' # Set to 'yes' for profiles
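
As a usage note (not part of this diff), the flag can also be set at install time rather than by editing values.yaml; the release name and chart path below are assumptions:

# Assumed release name and chart path; the relevant part is the --set override.
helm install litgpt sample_workloads/lit-gpt-demo/helm \
    --set workload.collectNsysProfile=yes
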
sample_workloads/lit-gpt-demo/openwebtext_trainer.py (122 additions, 61 deletions)
@@ -14,10 +14,19 @@
from lightning.pytorch.strategies import FSDPStrategy, XLAStrategy
from torch.utils.data import DataLoader, IterableDataset

import torch.multiprocessing as mp
import nvtx

# support running without installing as a package
wd = Path(__file__).parent.parent.resolve()
sys.path.append(str(wd))

mp.set_start_method("spawn", force=True)
import utilities.monitor_collectives

utilities.monitor_collectives.shunt_torch_communication()


from lit_gpt import Config
from lit_gpt.model import GPT, Block
from lit_gpt.speed_monitor import SpeedMonitorCallback, estimate_flops, measure_flops
@@ -57,6 +66,8 @@ def __init__(self, config: Config) -> None:
self.config = config
self.module: Optional[torch.nn.Module] = None
self.measured_flops: Optional[int] = None
self.nsys_profile_step_multiple = 5
self.backward_nvtx_range = None

def configure_model(self) -> None:
self.module = GPT(self.config)
@@ -66,9 +77,14 @@ def configure_optimizers(self) -> torch.optim.Optimizer:
return torch.optim.AdamW(
self.module.parameters(), lr=learning_rate, weight_decay=weight_decay, betas=(beta1, beta2), foreach=False
)

def on_train_epoch_start(self) -> None:
print("Resetting max memory allocation")
torch.cuda.reset_peak_memory_stats()

def on_fit_start(self) -> None:
trainer = self.trainer

with torch.device("meta"):
meta_model = GPT(self.module.config)
# "estimated" is not as precise as "measured". Estimated is optimistic but widely used in the wild.
@@ -88,14 +104,57 @@ def on_train_batch_start(self, batch: Any, batch_idx: int) -> None:
for optimizer in self.trainer.strategy.optimizers:
for param_group in optimizer.param_groups:
param_group["lr"] = lr

global_batch_idx = batch_idx / gradient_accumulation_steps
if (
global_batch_idx > 0
and global_batch_idx % self.nsys_profile_step_multiple == 0
):
print(f"Starting Nsys profiling")
torch.cuda.cudart().cudaProfilerStart()


def on_train_batch_end(
self, outputs, batch: Any, batch_idx: int, unused: int = 0
) -> None:
global_batch_idx = batch_idx // gradient_accumulation_steps
global_batch_offset = batch_idx % gradient_accumulation_steps
is_last_microbatch = global_batch_offset == gradient_accumulation_steps - 1

if (
global_batch_idx > 1
and global_batch_idx % self.nsys_profile_step_multiple == 0
and is_last_microbatch
):
self.print(f"Stopping Nsys profiling")
torch.cuda.cudart().cudaProfilerStop()
if is_last_microbatch:
self.print(f"HEARTBEAT: {global_batch_idx=}, {batch_idx=}")
self.print(
f"Max memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB"
)
sys.stdout.flush()
sys.stderr.flush()

@nvtx.annotate(color='green')
def training_step(self, batch: Any, batch_idx: int) -> torch.Tensor:
input_ids, targets = batch
logits = self.module(input_ids)
loss = chunked_cross_entropy(logits, targets, chunk_size=0)
self.log("train_loss", loss, on_step=True, on_epoch=False, prog_bar=True)
return loss

def on_before_backward(self, loss):
self.backward_nvtx_range = nvtx.start_range(message="backward", color="red")

def on_after_backward(self):
if self.backward_nvtx_range:
nvtx.end_range(self.backward_nvtx_range)

@nvtx.annotate(color='orange')
def optimizer_step(self, epoch, batch_idx, optimizer, optimizer_closure):
optimizer.step(closure=optimizer_closure)

def validation_step(self, batch: Any, batch_idx: int) -> None:
input_ids, targets = batch
logits = self.module(input_ids)
@@ -104,68 +163,70 @@ def validation_step(self, batch: Any, batch_idx: int) -> None:


def main(devices: int = 1, precision: Optional[str] = None, tpu: bool = False) -> None:
precision = precision or get_default_supported_precision(training=True, tpu=tpu)

if devices > 1:
if tpu:
# For multi-host TPU training, the device count for Fabric is limited to the count on a single host.
devices = "auto"
strategy = XLAStrategy(sync_module_states=False)
cm = torch.autograd.profiler.emit_nvtx()
with cm:
precision = precision or get_default_supported_precision(training=True, tpu=tpu)

if devices > 1:
if tpu:
# For multi-host TPU training, the device count for Fabric is limited to the count on a single host.
devices = "auto"
strategy = XLAStrategy(sync_module_states=False)
else:
strategy = FSDPStrategy(
auto_wrap_policy={Block},
activation_checkpointing_policy={Block},
# the argument is not available in the Trainer strategy, but it's the default anyways
# state_dict_type="full",
limit_all_gathers=True,
cpu_offload=False,
)
else:
strategy = FSDPStrategy(
auto_wrap_policy={Block},
activation_checkpointing_policy={Block},
# the argument is not available in the Trainer strategy, but it's the default anyways
# state_dict_type="full",
limit_all_gathers=True,
cpu_offload=False,
)
else:
strategy = "auto"

logger = step_csv_logger(out_dir, name, cls=CSVLogger, flush_logs_every_n_steps=log_interval)
speed_monitor = SpeedMonitorCallback(
length_fn=lambda batch: batch[0].size(1), batch_size=micro_batch_size, window_size=10, time_unit="seconds"
)
model_checkpoint = ModelCheckpoint(dirpath=out_dir, every_n_train_steps=save_interval, save_last=True, verbose=True)
trainer = L.Trainer(
devices=devices,
strategy=strategy,
precision=precision,
logger=logger,
callbacks=[speed_monitor, model_checkpoint],
max_steps=max_iters,
max_epochs=1,
limit_val_batches=eval_iters,
accumulate_grad_batches=gradient_accumulation_steps,
log_every_n_steps=log_interval,
val_check_interval=eval_interval,
num_nodes=num_nodes
)

L.seed_everything(1337, workers=True) # same seed for every process to init model (FSDP)

trainer.print(hparams)

if trainer.global_rank == 0:
out_dir.mkdir(parents=True, exist_ok=True)

config = Config.from_name(model_name)
trainer.print(f"Loading model with {config.__dict__}")
t0 = time.perf_counter()
model = LightningGPTModule(config)
trainer.print(f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.")

train_data = Dataset(str(data_dir / "train.bin"), config.block_size)
val_data = Dataset(str(data_dir / "val.bin"), config.block_size)
train_dataloader = DataLoader(train_data, batch_size=micro_batch_size, num_workers=2)
val_dataloader = DataLoader(val_data, batch_size=micro_batch_size, num_workers=2)

t0 = time.perf_counter()
trainer.fit(model, train_dataloader, val_dataloader, ckpt_path="last")
trainer.print(f"Training time: {(time.perf_counter()-t0):.2f}s")
if trainer.strategy.root_device.type == "cuda":
trainer.print(f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB")
strategy = "auto"

logger = step_csv_logger(out_dir, name, cls=CSVLogger, flush_logs_every_n_steps=log_interval)
speed_monitor = SpeedMonitorCallback(
length_fn=lambda batch: batch[0].size(1), batch_size=micro_batch_size, window_size=10, time_unit="seconds"
)
model_checkpoint = ModelCheckpoint(dirpath=out_dir, every_n_train_steps=save_interval, save_last=True, verbose=True)
trainer = L.Trainer(
devices=devices,
strategy=strategy,
precision=precision,
logger=logger,
callbacks=[speed_monitor, model_checkpoint],
max_steps=max_iters,
max_epochs=1,
limit_val_batches=eval_iters,
accumulate_grad_batches=gradient_accumulation_steps,
log_every_n_steps=log_interval,
val_check_interval=eval_interval,
num_nodes=num_nodes
)

L.seed_everything(1337, workers=True) # same seed for every process to init model (FSDP)

trainer.print(hparams)

if trainer.global_rank == 0:
out_dir.mkdir(parents=True, exist_ok=True)

config = Config.from_name(model_name)
trainer.print(f"Loading model with {config.__dict__}")
t0 = time.perf_counter()
model = LightningGPTModule(config)
trainer.print(f"Time to instantiate model: {time.perf_counter() - t0:.02f} seconds.")

train_data = Dataset(str(data_dir / "train.bin"), config.block_size)
val_data = Dataset(str(data_dir / "val.bin"), config.block_size)
train_dataloader = DataLoader(train_data, batch_size=micro_batch_size, num_workers=2)
val_dataloader = DataLoader(val_data, batch_size=micro_batch_size, num_workers=2)

t0 = time.perf_counter()
trainer.fit(model, train_dataloader, val_dataloader, ckpt_path="last")
trainer.print(f"Training time: {(time.perf_counter()-t0):.2f}s")
if trainer.strategy.root_device.type == "cuda":
trainer.print(f"Memory used: {torch.cuda.max_memory_allocated() / 1e9:.02f} GB")


class Dataset(IterableDataset):
sample_workloads/lit-gpt-demo/scripts/litgpt_container_entrypoint.sh
@@ -12,6 +12,7 @@ set -o pipefail
: "${GCS_DATA_BUCKET:?Must set GCS_DATA_BUCKET}"
: "${DATA_DIR:?Must set DATA_DIR}"
: "${CLUSTER_TYPE:='GKE'}"
: "${COLLECT_NSYS_PROFILE:='no'}"

export EXPERIMENT_LOCAL_DIR=/experiment/${EXPERIMENT_ROOT_DIR}

@@ -33,6 +34,9 @@ export WORLD_SIZE=$((NNODES * GPUS_PER_NODE))
LOG_DIR=$EXPERIMENT_LOCAL_DIR/training_logs
mkdir -p $LOG_DIR

PROFILING_DIR=$EXPERIMENT_LOCAL_DIR/nsys_profiles
mkdir -p $PROFILING_DIR

OUT_DIR=$EXPERIMENT_LOCAL_DIR/out
mkdir -p $OUT_DIR

@@ -145,7 +149,6 @@ fi

PIDS=()


CPU_SETS=( "0-7,104-111" "8-15,112-119" "16-23,120-127" "24-31,128-135" "52-59,156-163" "60-67,164-171" "68-75,172-179" "76-83,180-187" )

for ((LOCAL_RANK=0; LOCAL_RANK <= $((GPUS_PER_NODE - 1)); LOCAL_RANK++)); do
Expand All @@ -161,6 +164,10 @@ for ((LOCAL_RANK=0; LOCAL_RANK <= $((GPUS_PER_NODE - 1)); LOCAL_RANK++)); do
fi
CMD_PREFIX="numactl --membind=$MEMBIND_NUMA_NODE --physcpubind $CPUS"

if [[ "${COLLECT_NSYS_PROFILE:="no"}" == "yes" ]]; then
echo "Collecting nsys profile"
CMD_PREFIX="${CMD_PREFIX} nsys profile --sample=none --trace=cuda,nvtx -o $PROFILING_DIR/node_${NODE_RANK:?}_local_rank_${LOCAL_RANK} --capture-range=cudaProfilerApi --capture-range-end=repeat:${PROFILE_REPS:=5} --export sqlite "
fi

RANK=$RANK LOCAL_RANK=$LOCAL_RANK \
$CMD_PREFIX \
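
A hedged note on reading the output (not part of this diff): with --capture-range=cudaProfilerApi, nsys only records between the trainer's torch.cuda.cudart().cudaProfilerStart() and cudaProfilerStop() calls, and --capture-range-end=repeat:${PROFILE_REPS:=5} closes each window at cudaProfilerStop() and allows up to five windows per rank by default. Reports land under $EXPERIMENT_LOCAL_DIR/nsys_profiles with the node/rank naming from the -o flag; the concrete file names below are illustrative assumptions.

# Example names only; older Nsight Systems releases write .qdrep instead of .nsys-rep,
# and repeated capture windows may get a numeric suffix.
nsys stats nsys_profiles/node_0_local_rank_0.nsys-rep    # CLI summary tables
nsys-ui    nsys_profiles/node_0_local_rank_0.nsys-rep    # browse the NVTX ranges on the timeline
# --export sqlite also leaves a .sqlite file next to each report for direct querying with sqlite3.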