diff --git a/.gitignore b/.gitignore
index 8877fafbac..cf9589514e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ _build/
 .python-version
 cookbook/release-snacks
 .kube/
+.docker/
diff --git a/Makefile b/Makefile
index cf69331d63..272f7123a3 100644
--- a/Makefile
+++ b/Makefile
@@ -12,7 +12,7 @@ FLYTE_DIR := ~/.flyte
 
 # Module of cookbook examples to register
-EXAMPLES_MODULE := core
+EXAMPLES_MODULE ?= core
 
 define LOG
 echo "$(shell tput bold)$(shell tput setaf 2)$(1)$(shell tput sgr0)"
diff --git a/cookbook/case_studies/ml_training/mnist_classifier/Dockerfile b/cookbook/case_studies/ml_training/mnist_classifier/Dockerfile
index 5fe6cc70ba..bd076c758f 100644
--- a/cookbook/case_studies/ml_training/mnist_classifier/Dockerfile
+++ b/cookbook/case_studies/ml_training/mnist_classifier/Dockerfile
@@ -1,4 +1,4 @@
-FROM nvcr.io/nvidia/pytorch:21.06-py3
+FROM pytorch/pytorch:1.9.0-cuda10.2-cudnn7-runtime
 LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks
 
 WORKDIR /root
@@ -6,15 +6,15 @@ ENV LANG C.UTF-8
 ENV LC_ALL C.UTF-8
 ENV PYTHONPATH /root
 
-# Give your wandb API key. Get it from https://wandb.ai/authorize.
-# ENV WANDB_API_KEY your-api-key
+# Set your wandb API key and user name. Get the API key from https://wandb.ai/authorize.
+# ENV WANDB_API_KEY
+# ENV WANDB_USERNAME
 
 # Install the AWS cli for AWS support
 RUN pip install awscli
 
-ENV VENV /opt/venv
-
 # Virtual environment
+ENV VENV /opt/venv
 RUN python3 -m venv ${VENV}
 ENV PATH="${VENV}/bin:$PATH"
 
@@ -25,6 +25,10 @@ RUN pip install -r /root/requirements.txt
 # Copy the actual code
 COPY mnist_classifier/ /root/mnist_classifier/
 
+# Copy the makefile targets to expose on the container. This makes it easier to register.
+COPY in_container.mk /root/Makefile
+COPY mnist_classifier/sandbox.config /root
+
 # This tag is supplied by the build script and will be used to determine the version
 # when registering tasks, workflows, and launch plans
 ARG tag
diff --git a/cookbook/case_studies/ml_training/mnist_classifier/README.rst b/cookbook/case_studies/ml_training/mnist_classifier/README.rst
index e895fd5710..d086562c13 100644
--- a/cookbook/case_studies/ml_training/mnist_classifier/README.rst
+++ b/cookbook/case_studies/ml_training/mnist_classifier/README.rst
@@ -53,7 +53,7 @@ Weights & Biases Integration
 `Weights & Biases `__, or simply, ``wandb`` helps build better models faster with experiment tracking, dataset versioning, and model management.
 
-We'll use ``wandb`` alongside PyTorch to track our ML experiment and its concerned model parameters.
+We'll use ``wandb`` alongside PyTorch to track our ML experiment and its associated model parameters.
 
 .. note::
   Before running the example, create a ``wandb`` account and log in to access the API.
diff --git a/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_and_gpu.py b/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_and_gpu.py
new file mode 100644
index 0000000000..20349669a9
--- /dev/null
+++ b/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_and_gpu.py
@@ -0,0 +1,350 @@
+"""
+Single Node, Single GPU Training
+--------------------------------
+
+Training a model on a single node with one GPU is as simple as writing any other Flyte task and setting the ``gpu``
+resource request to ``1``. As long as the Docker image is built with the right version of the GPU drivers and the
+Flyte backend is provisioned with GPU machines, Flyte will schedule the task on a node that has a GPU.
+
+Currently, Flyte does not provide a PyTorch-specific task type (though it is entirely possible to add a task type
+that supports *PyTorch Ignite* or *PyTorch Lightning*, this is not critical). You can simply request a GPU by
+setting ``gpu="1"`` in the task's resource requests, and a GPU will be provisioned at runtime.
+
+In this example, we'll see how we can create any PyTorch model and train it using Flyte and a specialized container.
+"""
+
+# %%
+# First, let's import the libraries.
+import json
+import os
+import typing
+from dataclasses import dataclass
+
+import torch
+import torch.nn.functional as F
+import wandb
+from dataclasses_json import dataclass_json
+from flytekit import Resources, task, workflow
+from flytekit.types.file import PythonPickledFile
+from torch import distributed as dist
+from torch import nn, optim
+from torchvision import datasets, transforms
+
+# %%
+# Let's define some variables to be used later. The following variables are specific to ``wandb``:
+#
+# - ``NUM_BATCHES_TO_LOG``: Number of batches to log from the test data for each test step
+# - ``LOG_IMAGES_PER_BATCH``: Number of images to log per test batch
+NUM_BATCHES_TO_LOG = 10
+LOG_IMAGES_PER_BATCH = 32
+
+# %%
+# If running remotely, copy your ``wandb`` API key to the Dockerfile under the environment variable ``WANDB_API_KEY``.
+# This function logs into ``wandb`` and initializes the project. If you built your Docker image with the
+# ``WANDB_USERNAME``, this will work. Otherwise, replace ``my-user-name`` with your ``wandb`` user name.
+#
+# We'll call this function in the ``pytorch_mnist_task`` defined below.
+def wandb_setup():
+    wandb.login()
+    wandb.init(project="mnist-single-node-single-gpu", entity=os.environ.get("WANDB_USERNAME", "my-user-name"))
+
+
+# %%
+# Creating the Network
+# ====================
+#
+# We use a simple PyTorch model with :py:class:`pytorch:torch.nn.Conv2d` and :py:class:`pytorch:torch.nn.Linear` layers.
+# We also use :py:func:`pytorch:torch.nn.functional.relu`, :py:func:`pytorch:torch.nn.functional.max_pool2d`, and
+# :py:func:`pytorch:torch.nn.functional.log_softmax` to define the forward pass.
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 20, 5, 1)
+        self.conv2 = nn.Conv2d(20, 50, 5, 1)
+        self.fc1 = nn.Linear(4 * 4 * 50, 500)
+        self.fc2 = nn.Linear(500, 10)
+
+    def forward(self, x):
+        x = F.relu(self.conv1(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = F.relu(self.conv2(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = x.view(-1, 4 * 4 * 50)
+        x = F.relu(self.fc1(x))
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
+
+
+# %%
+# The Data Loader
+# ===============
+def mnist_dataloader(batch_size, train=True, **kwargs):
+    return torch.utils.data.DataLoader(
+        datasets.MNIST(
+            "../data",
+            train=train,
+            download=True,
+            transform=transforms.Compose(
+                [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
+            ),
+        ),
+        batch_size=batch_size,
+        shuffle=True,
+        **kwargs,
+    )
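+
+
+# %%
+# As a quick sanity check, the data loader can be exercised on its own before wiring it
+# into a Flyte task. This is a hypothetical snippet, not part of the workflow below:
+#
+# .. code-block:: python
+#
+#    loader = mnist_dataloader(batch_size=64, train=True)
+#    images, labels = next(iter(loader))
+#    print(images.shape)  # torch.Size([64, 1, 28, 28]) -- MNIST images are single-channel, 28x28
+#    print(labels.shape)  # torch.Size([64])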
+
+
+# %%
+# Training
+# ========
+#
+# We define a ``train`` function that encloses the training loop for a single epoch, i.e., it is called once per epoch.
+# Additionally, we log the loss and epoch progression, which can later be visualized in a ``wandb`` dashboard.
+def train(model, device, train_loader, optimizer, epoch, log_interval):
+    model.train()
+
+    # hook into the model to collect gradients and the topology
+    wandb.watch(model)
+
+    # loop through the training batches
+    for batch_idx, (data, target) in enumerate(train_loader):
+        data, target = data.to(device), target.to(device)  # device conversion
+        optimizer.zero_grad()  # clear gradients
+        output = model(data)  # forward pass
+        loss = F.nll_loss(output, target)  # compute loss
+        loss.backward()  # propagate the loss backward
+        optimizer.step()  # update the model parameters
+
+        if batch_idx % log_interval == 0:
+            print(
+                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+                    epoch,
+                    batch_idx * len(data),
+                    len(train_loader.dataset),
+                    100.0 * batch_idx / len(train_loader),
+                    loss.item(),
+                )
+            )
+
+            # log epoch and loss
+            wandb.log({"loss": loss, "epoch": epoch})
+
+
+# %%
+# We define a test logger function that is called when we run the model on the test dataset.
+def log_test_predictions(images, labels, outputs, predicted, my_table, log_counter):
+    """
+    Convenience function to log predictions for a batch of test images
+    """
+
+    # obtain confidence scores for all classes
+    scores = F.softmax(outputs.data, dim=1)
+
+    # assign ids based on the order of the images
+    for i, (image, pred, label, score) in enumerate(
+        zip(*[x.cpu().numpy() for x in (images, predicted, labels, scores)])
+    ):
+        # add the required info to the data table: id, image pixels, model's guess, true label, scores for all classes
+        my_table.add_data(f"{i}_{log_counter}", wandb.Image(image), pred, label, *score)
+        if i == LOG_IMAGES_PER_BATCH - 1:  # stop once LOG_IMAGES_PER_BATCH images have been logged
+            break
+
+
+# %%
+# Evaluation
+# ==========
+#
+# We define a ``test`` function to test the model on the test dataset.
+#
+# We log ``accuracy``, ``test_loss``, and a ``wandb`` `table `__.
+# The ``wandb`` table can help in depicting the model's performance in a structured format.
+def test(model, device, test_loader):
+
+    # ``wandb`` tabular columns
+    columns = ["id", "image", "guess", "truth"]
+    for digit in range(10):
+        columns.append("score_" + str(digit))
+    my_table = wandb.Table(columns=columns)
+
+    model.eval()
+
+    # hook into the model to collect gradients and the topology
+    wandb.watch(model)
+
+    test_loss = 0
+    correct = 0
+    log_counter = 0
+
+    # disable gradient computation
+    with torch.no_grad():
+
+        # loop through the test data loader
+        for images, targets in test_loader:
+            images, targets = images.to(device), targets.to(device)  # device conversion
+            outputs = model(images)  # forward pass -- generate predictions
+            test_loss += F.nll_loss(outputs, targets, reduction="sum").item()  # sum up batch loss
+            _, predicted = torch.max(outputs.data, 1)  # get the index of the max log-probability
+            correct += (predicted == targets).sum().item()  # compare predictions to the true labels
+
+            # log predictions to the ``wandb`` table
+            if log_counter < NUM_BATCHES_TO_LOG:
+                log_test_predictions(
+                    images, targets, outputs, predicted, my_table, log_counter
+                )
+                log_counter += 1
+
+    # compute the average loss and the accuracy
+    test_loss /= len(test_loader.dataset)
+    accuracy = float(correct) / len(test_loader.dataset)
+
+    print("\naccuracy={:.4f}\n".format(accuracy))
+
+    # log the average loss, accuracy, and table
+    wandb.log({"test_loss": test_loss, "accuracy": accuracy, "mnist_predictions": my_table})
+
+    return accuracy
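+
+
+# %%
+# A short aside on the loss: since ``Net.forward`` returns log-probabilities (via
+# ``log_softmax``), both the training and test loops pair it with ``nll_loss``. That
+# combination is mathematically equivalent to ``cross_entropy`` on raw logits, as this
+# small standalone sketch illustrates:
+#
+# .. code-block:: python
+#
+#    logits = torch.randn(8, 10)           # a batch of 8 fake predictions over 10 classes
+#    targets = torch.randint(0, 10, (8,))  # 8 fake labels
+#    a = F.nll_loss(F.log_softmax(logits, dim=1), targets)
+#    b = F.cross_entropy(logits, targets)
+#    assert torch.allclose(a, b)           # identical up to floating point error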
+
+
+# %%
+# Hyperparameters
+# ===============
+#
+# We define a few hyperparameters for training our model.
+@dataclass_json
+@dataclass
+class Hyperparameters(object):
+    """
+    Args:
+        backend: pytorch backend to use, e.g. "gloo" or "nccl"
+        sgd_momentum: SGD momentum (default: 0.5)
+        seed: random seed (default: 1)
+        log_interval: how many batches to wait before logging training status
+        batch_size: input batch size for training (default: 64)
+        test_batch_size: input batch size for testing (default: 1000)
+        epochs: number of epochs to train (default: 10)
+        learning_rate: learning rate (default: 0.01)
+    """
+
+    backend: str = dist.Backend.GLOO
+    sgd_momentum: float = 0.5
+    seed: int = 1
+    log_interval: int = 10
+    batch_size: int = 64
+    test_batch_size: int = 1000
+    epochs: int = 10
+    learning_rate: float = 0.01
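+
+
+# %%
+# Because ``Hyperparameters`` is a ``dataclass_json`` dataclass, Flyte can serialize it
+# to JSON when the task runs remotely. A quick sketch of the round trip that
+# ``dataclasses_json`` provides (hypothetical values, shown only for illustration):
+#
+# .. code-block:: python
+#
+#    hp = Hyperparameters(epochs=5, batch_size=32)
+#    payload = hp.to_json()                        # a plain JSON string
+#    restored = Hyperparameters.from_json(payload)
+#    assert restored.epochs == 5 and restored.batch_size == 32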
+
+
+# %%
+# Training and Evaluating
+# =======================
+#
+# We save the output model using :py:func:`pytorch:torch.save`, which persists the ``state_dict`` as described
+# `in pytorch docs `_.
+# A common convention is to use the ``.pt`` extension for the model file.
+#
+# .. note::
+#    Note the usage of ``requests=Resources(gpu="1")``. This forces Flyte to allocate this task onto a machine with
+#    GPU(s). The task will be queued up until a machine with GPU(s) can be procured. Also, for the GPU training to
+#    work, the Dockerfile needs to be built as explained in the :ref:`pytorch-dockerfile` section.
+TrainingOutputs = typing.NamedTuple(
+    "TrainingOutputs",
+    epoch_accuracies=typing.List[float],
+    model_state=PythonPickledFile,
+)
+
+
+@task(
+    retries=2,
+    cache=True,
+    cache_version="1.0",
+    requests=Resources(gpu="1", mem="3Gi", storage="1Gi"),
+    limits=Resources(gpu="1", mem="3Gi", storage="1Gi"),
+)
+def pytorch_mnist_task(hp: Hyperparameters) -> TrainingOutputs:
+    wandb_setup()
+
+    # store the hyperparameters' config in ``wandb``
+    wandb.config.update(json.loads(hp.to_json()))
+    print(wandb.config)
+
+    # set random seed
+    torch.manual_seed(hp.seed)
+
+    # ideally, if GPU training is required and cuda is not available, we could raise an exception;
+    # however, as we want this algorithm to work locally as well (and most users don't have a GPU locally),
+    # we fall back to using the CPU
+    use_cuda = torch.cuda.is_available()
+    print(f"Use cuda {use_cuda}")
+    device = torch.device("cuda" if use_cuda else "cpu")
+
+    # load data
+    kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
+    training_data_loader = mnist_dataloader(hp.batch_size, train=True, **kwargs)
+    test_data_loader = mnist_dataloader(hp.batch_size, train=False, **kwargs)
+
+    # define the model and optimizer
+    model = Net().to(device)
+    optimizer = optim.SGD(
+        model.parameters(), lr=hp.learning_rate, momentum=hp.sgd_momentum
+    )
+
+    # train the model: run multiple epochs and capture the accuracies for each epoch
+    accuracies = []
+    for epoch in range(1, hp.epochs + 1):
+        train(model, device, training_data_loader, optimizer, epoch, hp.log_interval)
+        accuracies.append(test(model, device, test_data_loader))
+
+    # after training the model, we can simply save it to disk and return it from the Flyte task as a
+    # :py:class:`flytekit.types.file.FlyteFile` type -- here, a ``PythonPickledFile``, which is a ``FlyteFile``
+    # whose format annotation records that its contents are pickled
+    model_file = "mnist_cnn.pt"
+    torch.save(model.state_dict(), model_file)
+
+    return TrainingOutputs(
+        epoch_accuracies=accuracies, model_state=PythonPickledFile(model_file)
+    )
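+
+
+# %%
+# The ``PythonPickledFile`` returned by the task can later be rehydrated into a fresh
+# model. A minimal sketch, assuming the file has been downloaded locally as ``mnist_cnn.pt``:
+#
+# .. code-block:: python
+#
+#    model = Net()
+#    model.load_state_dict(torch.load("mnist_cnn.pt"))
+#    model.eval()  # switch to inference mode before generating predictions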
+
+
+# %%
+# Finally, we define a workflow to run the training algorithm. We return the model and the accuracies.
+@workflow
+def pytorch_training_wf(
+    hp: Hyperparameters = Hyperparameters(epochs=10, batch_size=128)
+) -> TrainingOutputs:
+    return pytorch_mnist_task(hp=hp)
+
+
+# %%
+# Running the Model Locally
+# =========================
+#
+# It is possible to run the model locally with almost no modifications (as long as the code handles
+# resolving whether it runs distributed or not). This is how we can do it:
+if __name__ == "__main__":
+    model, accuracies = pytorch_training_wf(hp=Hyperparameters(epochs=10, batch_size=128))
+    print(f"Model: {model}, Accuracies: {accuracies}")
+
+# %%
+# Weights & Biases Report
+# =======================
+#
+# Lastly, let's look at the reports generated by the model.
+#
+# .. figure:: https://raw.githubusercontent.com/flyteorg/flyte/static-resources/img/flytesnacks/pytorch/single-node/wandb_graphs.png
+#    :alt: Wandb Graphs
+#    :class: with-shadow
+#
+#    Wandb Graphs
+#
+# .. figure:: https://raw.githubusercontent.com/flyteorg/flyte/static-resources/img/flytesnacks/pytorch/single-node/wandb_table.png
+#    :alt: Wandb Table
+#    :class: with-shadow
+#
+#    Wandb Table
+#
+# You can refer to the complete ``wandb`` report `here `__.
+#
+# .. tip::
+#    Many more customizations can be made to the report to fit your requirements!
diff --git a/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_multi_gpu.py b/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_multi_gpu.py
new file mode 100644
index 0000000000..7df16cabae
--- /dev/null
+++ b/cookbook/case_studies/ml_training/mnist_classifier/pytorch_single_node_multi_gpu.py
@@ -0,0 +1,405 @@
+"""
+Single Node, Multi GPU Training
+-------------------------------
+
+When you need to scale up model training in PyTorch, you can use :py:class:`pytorch:torch.nn.DataParallel` for
+single-node, multi-gpu/cpu training, or :py:class:`pytorch:torch.nn.parallel.DistributedDataParallel` for multi-node,
+multi-gpu training.
+
+This tutorial covers how to write a simple training script on the MNIST dataset that uses
+:py:class:`pytorch:torch.nn.parallel.DistributedDataParallel`, since this is the `recommended way `__
+of distributing your training workload. For training on a single node and GPU, see
+:ref:`this tutorial `.
+
+For more information on distributed training, check out the
+`pytorch documentation `__.
+"""
+
+# %%
+# Import the required libraries.
+import json
+import os
+import typing
+from dataclasses import dataclass
+
+import torch
+import torch.nn.functional as F
+import wandb
+from dataclasses_json import dataclass_json
+from flytekit import Resources, task, workflow
+from flytekit.types.file import PythonPickledFile
+from torch import distributed as dist
+from torch import nn, multiprocessing as mp, optim
+from torchvision import datasets, transforms
+
+# %%
+# Let's define some variables to be used later.
+#
+# ``WORLD_SIZE`` defines the total number of GPUs we want to use to distribute our training job.
+WORLD_SIZE = 2
+DATA_DIR = "./data"
+
+# %%
+# The following variables are specific to ``wandb``:
+#
+# - ``NUM_BATCHES_TO_LOG``: Number of batches to log from the test data for each test step
+# - ``LOG_IMAGES_PER_BATCH``: Number of images to log per test batch
+NUM_BATCHES_TO_LOG = 10
+LOG_IMAGES_PER_BATCH = 32
+
+# %%
+# If running remotely, copy your ``wandb`` API key to the Dockerfile under the environment variable ``WANDB_API_KEY``.
+# This function logs into ``wandb`` and initializes the project. If you built your Docker image with the
+# ``WANDB_USERNAME``, this will work. Otherwise, replace ``my-user-name`` with your ``wandb`` user name.
+#
+# We'll call this function in the ``pytorch_mnist_task`` defined below.
+def wandb_setup():
+    wandb.login()
+    wandb.init(project="mnist-single-node-multi-gpu", entity=os.environ.get("WANDB_USERNAME", "my-user-name"))
+
+
+# %%
+# Creating the Network
+# ====================
+#
+# We'll use the same neural network architecture as the one defined in the
+# :ref:`single node tutorial `.
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 20, 5, 1)
+        self.conv2 = nn.Conv2d(20, 50, 5, 1)
+        self.fc1 = nn.Linear(4 * 4 * 50, 500)
+        self.fc2 = nn.Linear(500, 10)
+
+    def forward(self, x):
+        x = F.relu(self.conv1(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = F.relu(self.conv2(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = x.view(-1, 4 * 4 * 50)
+        x = F.relu(self.fc1(x))
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
+
+
+# %%
+# Data Downloader
+# ===============
+#
+# We'll use this helper function to download the training and test sets beforehand, to avoid race conditions when
+# initializing the train and test dataloaders during training.
+def download_mnist(data_dir):
+    for train in [True, False]:
+        datasets.MNIST(data_dir, train=train, download=True)
+
+
+# %%
+# The Data Loader
+# ===============
+def mnist_dataloader(data_dir, batch_size, train=True, distributed=False, rank=None, world_size=None, **kwargs):
+    dataset = datasets.MNIST(
+        data_dir,
+        train=train,
+        download=False,
+        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]),
+    )
+    if distributed:
+        assert rank is not None, "rank needs to be specified when doing distributed training."
+        sampler = torch.utils.data.distributed.DistributedSampler(
+            dataset, rank=rank, num_replicas=1 if world_size is None else world_size, shuffle=True
+        )
+    else:
+        sampler = None
+    return torch.utils.data.DataLoader(
+        dataset,
+        batch_size=batch_size,
+        shuffle=False,
+        sampler=sampler,
+        **kwargs,
+    )
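+
+
+# %%
+# With ``distributed=True``, the ``DistributedSampler`` partitions the dataset so that each
+# rank iterates over a disjoint shard of roughly ``len(dataset) / world_size`` samples per
+# epoch. A small sketch of what this means for MNIST's 60,000 training images (it assumes
+# ``download_mnist(DATA_DIR)`` has already been run):
+#
+# .. code-block:: python
+#
+#    loader = mnist_dataloader(DATA_DIR, batch_size=64, train=True, distributed=True, rank=0, world_size=2)
+#    print(len(loader.sampler))  # 30000 -- rank 0 sees half of the 60,000 training images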
+
+
+# %%
+# Training
+# ========
+#
+# We define a ``train`` function that encloses the training loop for a single epoch, i.e., it is called once per epoch.
+# Additionally, we log the loss and epoch progression, which can later be visualized in a ``wandb`` dashboard.
+def train(model, rank, train_loader, optimizer, epoch, log_interval):
+    model.train()
+
+    # hook into the model to collect gradients and the topology
+    if rank == 0:
+        wandb.watch(model)
+
+    # loop through the training batches
+    for batch_idx, (data, target) in enumerate(train_loader):
+        data, target = data.to(rank), target.to(rank)  # device conversion
+        optimizer.zero_grad()  # clear gradients
+        output = model(data)  # forward pass
+        loss = F.nll_loss(output, target)  # compute loss
+        loss.backward()  # propagate the loss backward
+        optimizer.step()  # update the model parameters
+
+        if rank == 0 and batch_idx % log_interval == 0:
+            # log epoch and loss
+            print(
+                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+                    epoch,
+                    batch_idx * len(data),
+                    len(train_loader.dataset),
+                    100.0 * batch_idx / len(train_loader),
+                    loss.item(),
+                )
+            )
+            wandb.log({"loss": loss, "epoch": epoch})
+
+
+# %%
+# We define a test logger function that is called when we run the model on the test dataset.
+def log_test_predictions(images, labels, outputs, predicted, my_table, log_counter):
+    """
+    Convenience function to log predictions for a batch of test images
+    """
+
+    # obtain confidence scores for all classes
+    scores = F.softmax(outputs.data, dim=1)
+
+    # assign ids based on the order of the images
+    for i, (image, pred, label, score) in enumerate(
+        zip(*[x.cpu().numpy() for x in (images, predicted, labels, scores)])
+    ):
+        # add the required info to the data table: id, image pixels, model's guess, true label, scores for all classes
+        my_table.add_data(f"{i}_{log_counter}", wandb.Image(image), pred, label, *score)
+        if i == LOG_IMAGES_PER_BATCH - 1:  # stop once LOG_IMAGES_PER_BATCH images have been logged
+            break
+
+
+# %%
+# Evaluation
+# ==========
+#
+# We define a ``test`` function to test the model on the test dataset.
+#
+# We log ``accuracy``, ``test_loss``, and a ``wandb`` `table `__.
+# The ``wandb`` table can help in depicting the model's performance in a structured format.
+def test(model, rank, test_loader):
+
+    model.eval()
+
+    # define the ``wandb`` tabular columns and hook into the model to collect gradients and the topology
+    columns = ["id", "image", "guess", "truth", *[f"score_{i}" for i in range(10)]]
+    if rank == 0:
+        my_table = wandb.Table(columns=columns)
+        wandb.watch(model)
+
+    test_loss = 0
+    correct = 0
+    log_counter = 0
+
+    # disable gradient computation
+    with torch.no_grad():
+
+        # loop through the test data loader
+        total = 0.0
+        for images, targets in test_loader:
+            total += len(targets)
+            images, targets = images.to(rank), targets.to(rank)  # device conversion
+            outputs = model(images)  # forward pass -- generate predictions
+            test_loss += F.nll_loss(outputs, targets, reduction="sum").item()  # sum up batch loss
+            _, predicted = torch.max(outputs.data, 1)  # get the index of the max log-probability
+            correct += (predicted == targets).sum().item()  # compare predictions to the true labels
+
+            # log predictions to the ``wandb`` table, only in the main process
+            if rank == 0 and log_counter < NUM_BATCHES_TO_LOG:
+                log_test_predictions(images, targets, outputs, predicted, my_table, log_counter)
+                log_counter += 1
+
+    # compute the average loss and the accuracy
+    test_loss /= total
+    accuracy = float(correct) / total
+
+    if rank == 0:
+        print("\ntest_loss={:.4f}\naccuracy={:.4f}\n".format(test_loss, accuracy))
+        # log the average loss, accuracy, and table
+        wandb.log({"test_loss": test_loss, "accuracy": accuracy, "mnist_predictions": my_table})
+
+    return accuracy
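+
+
+# %%
+# Note that the test loader is not sharded, so rank 0 evaluates the full test set by itself.
+# If you did shard evaluation across ranks, the per-rank counts could be combined with
+# ``dist.all_reduce``. A sketch of that pattern, not used in this example:
+#
+# .. code-block:: python
+#
+#    counts = torch.tensor([correct, total], device=rank)
+#    dist.all_reduce(counts, op=dist.ReduceOp.SUM)  # sum the counts across all ranks
+#    accuracy = counts[0].item() / counts[1].item()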
+
+
+# %%
+# Hyperparameters
+# ===============
+#
+# We define a few hyperparameters for training our model.
+@dataclass_json
+@dataclass
+class Hyperparameters(object):
+    """
+    Args:
+        backend: pytorch backend to use, e.g. "gloo" or "nccl"
+        sgd_momentum: SGD momentum (default: 0.5)
+        seed: random seed (default: 1)
+        log_interval: how many batches to wait before logging training status
+        batch_size: input batch size for training (default: 64)
+        test_batch_size: input batch size for testing (default: 1000)
+        epochs: number of epochs to train (default: 10)
+        learning_rate: learning rate (default: 0.01)
+    """
+
+    backend: str = dist.Backend.GLOO
+    sgd_momentum: float = 0.5
+    seed: int = 1
+    log_interval: int = 10
+    batch_size: int = 64
+    test_batch_size: int = 1000
+    epochs: int = 10
+    learning_rate: float = 0.01
+
+
+# %%
+# Training and Evaluating
+# =======================
+TrainingOutputs = typing.NamedTuple(
+    "TrainingOutputs",
+    epoch_accuracies=typing.List[float],
+    model_state=PythonPickledFile,
+)
+
+MODEL_FILE = "./mnist_cnn.pt"
+ACCURACIES_FILE = "./mnist_cnn_accuracies.json"
+
+
+def dist_setup(rank, world_size, backend):
+    os.environ["MASTER_ADDR"] = "localhost"
+    os.environ["MASTER_PORT"] = "8888"
+    dist.init_process_group(backend, rank=rank, world_size=world_size)
+
+
+# each spawned process returns nothing; results are written to disk and collected by the Flyte task below
+def train_mnist(rank: int, world_size: int, hp: Hyperparameters):
+
+    # store the hyperparameters' config in ``wandb``
+    if rank == 0:
+        wandb_setup()
+        wandb.config.update(json.loads(hp.to_json()))
+        print("wandb config:", wandb.config)
+
+    # set random seed
+    torch.manual_seed(hp.seed)
+
+    use_cuda = torch.cuda.is_available()
+    print(f"Using distributed PyTorch with {hp.backend} backend")
+    print(f"Running MNIST training on rank {rank}, world size: {world_size}")
+    print(f"Use cuda: {use_cuda}")
+    dist_setup(rank, world_size, hp.backend)
+    print(f"Rank {rank + 1}/{world_size} process initialized.\n")
+
+    # load data
+    kwargs = {"num_workers": 0, "pin_memory": True} if use_cuda else {}
+    print("Getting data loaders")
+    training_data_loader = mnist_dataloader(
+        DATA_DIR, hp.batch_size, train=True, distributed=use_cuda, rank=rank, world_size=world_size, **kwargs
+    )
+    test_data_loader = mnist_dataloader(DATA_DIR, hp.test_batch_size, train=False, **kwargs)
+
+    # define the distributed model and optimizer
+    print("Defining model")
+    model = Net().cuda(rank)
+    model = nn.parallel.DistributedDataParallel(model, device_ids=[rank])
+
+    optimizer = optim.SGD(model.parameters(), lr=hp.learning_rate, momentum=hp.sgd_momentum)
+
+    # train the model: run multiple epochs and capture the accuracies for each epoch
+    print(f"Training for {hp.epochs} epochs")
+    accuracies = []
+    for epoch in range(1, hp.epochs + 1):
+        train(model, rank, training_data_loader, optimizer, epoch, hp.log_interval)
+
+        # only compute validation metrics in the main process
+        if rank == 0:
+            accuracies.append(test(model, rank, test_data_loader))
+        dist.barrier()  # wait for the main process to complete validation before continuing training
+
+    if rank == 0:
+        wandb.finish()  # this is important to tell wandb that we're done logging metrics
+
+    # after training the model, we can simply save it to disk and return it from the Flyte task as a
+    # :py:class:`flytekit.types.file.FlyteFile` type -- here, a ``PythonPickledFile``, which is a ``FlyteFile``
+    # whose format annotation records that its contents are pickled
+    print("Saving model")
+    torch.save(model.state_dict(), MODEL_FILE)
+
+    # save epoch accuracies
+    print("Saving accuracies")
+    with open(ACCURACIES_FILE, "w") as fp:
+        json.dump(accuracies, fp)
+
+    print(f"Rank {rank + 1}/{world_size} process complete.\n")
+    dist.destroy_process_group()  # clean up
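+
+
+# %%
+# ``mp.spawn``, used in the task below, launches ``nprocs`` copies of the target function in
+# separate processes and prepends the process index to ``args``: each worker is invoked as
+# ``train_mnist(rank, world_size, hp)`` with ``rank`` running from ``0`` to ``world_size - 1``.
+# A minimal standalone sketch of the same pattern:
+#
+# .. code-block:: python
+#
+#    def work(rank: int, msg: str):
+#        print(f"worker {rank} got {msg}")
+#
+#    # runs work(0, "hello") and work(1, "hello") in two separate processes
+#    mp.spawn(work, args=("hello",), nprocs=2, join=True)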
+
+
+# %%
+# The output model is saved using :py:func:`pytorch:torch.save`, which persists the ``state_dict`` as described
+# `in pytorch docs `_.
+# A common convention is to use the ``.pt`` extension for the model file.
+#
+# .. note::
+#    Note the usage of ``requests=Resources(gpu=WORLD_SIZE)``. This forces Flyte to allocate this task onto a
+#    machine with GPU(s), which in our case is 2 GPUs. The task will be queued up until a machine with GPU(s) can be
+#    procured. Also, for the GPU training to work, the Dockerfile needs to be built as explained in the
+#    :ref:`pytorch-dockerfile` section.
+@task(
+    retries=2,
+    cache=True,
+    cache_version="1.2",
+    requests=Resources(gpu=str(WORLD_SIZE), mem="30Gi", storage="20Gi", ephemeral_storage="500Mi"),
+    limits=Resources(gpu=str(WORLD_SIZE), mem="30Gi", storage="20Gi", ephemeral_storage="500Mi"),
+)
+def pytorch_mnist_task(hp: Hyperparameters) -> TrainingOutputs:
+    print("Start MNIST training:")
+
+    world_size = torch.cuda.device_count()
+    print(f"Device count: {world_size}")
+    download_mnist(DATA_DIR)
+    mp.spawn(
+        train_mnist,
+        args=(world_size, hp),
+        nprocs=world_size,
+        join=True,
+    )
+    print("Training Complete")
+    with open(ACCURACIES_FILE) as fp:
+        accuracies = json.load(fp)
+    return TrainingOutputs(
+        epoch_accuracies=accuracies, model_state=PythonPickledFile(MODEL_FILE)
+    )
+
+
+# %%
+# Finally, we define a workflow to run the training algorithm. We return the model and the accuracies.
+@workflow
+def pytorch_training_wf(hp: Hyperparameters = Hyperparameters(epochs=10, batch_size=128)) -> TrainingOutputs:
+    return pytorch_mnist_task(hp=hp)
+
+
+# %%
+# Running the Model Locally
+# =========================
+#
+# It is possible to run the model locally with almost no modifications (as long as the code handles
+# resolving whether it runs distributed or not). This is how we can do it:
+if __name__ == "__main__":
+    model, accuracies = pytorch_training_wf(hp=Hyperparameters(epochs=10, batch_size=128))
+    print(f"Model: {model}, Accuracies: {accuracies}")
+
+# %%
+# Weights & Biases Report
+# =======================
+#
+# You can refer to the complete ``wandb`` report `here `__.
+#
+# .. tip::
+#    Many more customizations can be made to the report to fit your requirements!
diff --git a/cookbook/case_studies/ml_training/mnist_classifier/requirements.txt b/cookbook/case_studies/ml_training/mnist_classifier/requirements.txt
index 1967aea3ea..1e483799d9 100644
--- a/cookbook/case_studies/ml_training/mnist_classifier/requirements.txt
+++ b/cookbook/case_studies/ml_training/mnist_classifier/requirements.txt
@@ -10,7 +10,7 @@ certifi==2021.5.30
     # via
     #   requests
     #   sentry-sdk
-charset-normalizer==2.0.3
+charset-normalizer==2.0.4
     # via requests
 click==7.1.2
     # via
@@ -30,13 +30,15 @@ deprecated==1.2.12
     # via flytekit
 dirhash==0.2.1
     # via flytekit
-docker-image-py==0.1.10
+docker-image-py==0.1.12
     # via flytekit
 docker-pycreds==0.4.0
     # via wandb
-flyteidl==0.19.14
+docstring-parser==0.10
     # via flytekit
-flytekit==0.20.1
+flyteidl==0.19.19
+    # via flytekit
+flytekit==0.21.2
     # via -r ../../../common/requirements-common.in
 gitdb==4.0.7
     # via gitpython
@@ -46,7 +48,7 @@ grpcio==1.39.0
     # via flytekit
 idna==3.2
     # via requests
-importlib-metadata==4.6.1
+importlib-metadata==4.6.3
     # via keyring
 keyring==23.0.1
     # via flytekit
@@ -73,7 +75,7 @@ numpy==1.21.1
     #   pandas
     #   pyarrow
     #   torchvision
-pandas==1.3.0
+pandas==1.3.1
    # via flytekit
 pathspec==0.9.0
    # via scantree
@@ -105,7 +107,7 @@ python-dateutil==2.8.1
    #   matplotlib
    #   pandas
    #   wandb
-python-json-logger==2.0.1
+python-json-logger==2.0.2
    # via flytekit
 pytimeparse==1.1.8
    # via flytekit
@@ -115,20 +117,20 @@ pytz==2018.4
    #   pandas
 pyyaml==5.4.1
    # via wandb
-regex==2021.7.6
+regex==2021.8.3
    # via docker-image-py
 requests==2.26.0
    # via
    #   flytekit
    #   responses
    #   wandb
-responses==0.13.3
+responses==0.13.4
    # via flytekit
 retry==0.9.2
    # via flytekit
 scantree==0.0.1
    # via dirhash
-sentry-sdk==1.3.0
+sentry-sdk==1.3.1
    # via wandb
 shortuuid==1.0.1
    # via wandb
@@ -173,9 +175,9 @@ urllib3==1.26.6
    #   responses
    #   sentry-sdk
    #   wandb
-wandb==0.11.0
+wandb==0.11.2
    # via -r requirements.in
-wheel==0.36.2
+wheel==0.37.0
    # via
    #   -r ../../../common/requirements-common.in
    #   flytekit
diff --git a/cookbook/common/requirements-common.in b/cookbook/common/requirements-common.in
index 82cab9ce21..19326239bf 100644
--- a/cookbook/common/requirements-common.in
+++ b/cookbook/common/requirements-common.in
@@ -1,3 +1,3 @@
-flytekit>=0.20.1
+flytekit>=0.21.2
 wheel
 matplotlib
diff --git a/cookbook/docs/conf.py b/cookbook/docs/conf.py
index e5d84ba78e..0a9ab112a6 100644
--- a/cookbook/docs/conf.py
+++ b/cookbook/docs/conf.py
@@ -130,7 +130,8 @@ class CustomSorter(FileNameSortKey):
         "multiregion_house_price_predictor.py",
         "datacleaning_tasks.py",
         "datacleaning_workflow.py",
-        "single_node.py",
+        "pytorch_single_node_and_gpu.py",
+        "pytorch_single_node_multi_gpu.py",
     ]
     """
    Take a look at the code for the default sorter included in the sphinx_gallery to see how this works.