From 046b9edb7298b06930acd89fb9fdcbb12ac0bf4a Mon Sep 17 00:00:00 2001
From: Jingyue Wu
Date: Tue, 30 Apr 2024 13:01:17 -0700
Subject: [PATCH] Remove NVFUSER_DISTRIBUTED. (#2155)

@xwang233 discussed this in the nvFuser-MultiGPU chatroom. At this moment,
supporting a non-distributed build of PyTorch isn't worth the cost of
additional CI and of knobs like `NVFUSER_DISTRIBUTED` or `USE_DISTRIBUTED`.

Feel free to revert this PR if a non-distributed build becomes important.
---
 CMakeLists.txt                          |   8 --
 csrc/multidevice/c10d_mock.h            | 134 ------------------------
 csrc/multidevice/communication.cpp      |   4 +-
 csrc/multidevice/communication.h        |   4 -
 csrc/multidevice/communicator.cpp       |  10 --
 csrc/multidevice/communicator.h         |  31 +++---
 csrc/multidevice/utils.cpp              |   8 --
 csrc/multidevice/utils.h                |   3 -
 setup.py                                |  15 +--
 tests/cpp/test_multidevice_pipeline.cpp |   3 +-
 tests/cpp/test_resharding.cpp           |   3 -
 tools/gen_nvfuser_version.py            |  20 ++--
 12 files changed, 28 insertions(+), 215 deletions(-)
 delete mode 100644 csrc/multidevice/c10d_mock.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6a8fe98ae80..b906edc5e61 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -15,13 +15,6 @@ set(NVFUSER_THIRD_PARTY_DIR "${NVFUSER_ROOT}/third_party")
 option(NVFUSER_STANDALONE_BUILD_WITH_UCC "" OFF)
 option(NVFUSER_BUILD_WITH_ASAN "Build nvFuser with asan" OFF)
 
-include(CMakeDependentOption)
-cmake_dependent_option(NVFUSER_DISTRIBUTED "" ON "USE_DISTRIBUTED" OFF)
-if (NVFUSER_DISTRIBUTED)
-  add_compile_definitions(NVFUSER_DISTRIBUTED)
-endif()
-message(STATUS "Setting NVFUSER_DISTRIBUTED=${NVFUSER_DISTRIBUTED}")
-
 # We try to update which C++ standard we use together in lockstep across all
 # built libraries, and these variables control which that is. Generally we are
 # on C++20, but we still support a version of CUDA (11) that does not recognize
@@ -769,7 +762,6 @@ message(STATUS "******** Nvfuser configuration summary ********")
 message(STATUS " UCC_FOUND: ${UCC_FOUND}")
 message(STATUS " NVFUSER_STANDALONE_BUILD_WITH_UCC : ${NVFUSER_STANDALONE_BUILD_WITH_UCC}")
 message(STATUS " NVFUSER_BUILD_WITH_ASAN : ${NVFUSER_BUILD_WITH_ASAN}")
-message(STATUS " NVFUSER_DISTRIBUTED : ${NVFUSER_DISTRIBUTED}")
 message(STATUS " NVFUSER_CPP_STANDARD : ${NVFUSER_CPP_STANDARD}")
 
 if(NVFUSER_STANDALONE_BUILD_WITH_UCC)
diff --git a/csrc/multidevice/c10d_mock.h b/csrc/multidevice/c10d_mock.h
deleted file mode 100644
index 58572507216..00000000000
--- a/csrc/multidevice/c10d_mock.h
+++ /dev/null
@@ -1,134 +0,0 @@
-// clang-format off
-/*
- * SPDX-FileCopyrightText: Copyright (c) 2024-present NVIDIA CORPORATION & AFFILIATES.
- * All rights reserved.
- * SPDX-License-Identifier: BSD-3-Clause
- */
-// clang-format on
-#pragma once
-
-#include
-#include
-#include
-
-namespace c10d {
-class Work : public torch::CustomClassHolder {
- public:
-  void wait() {}
-};
-
-struct ReduceOp : torch::CustomClassHolder {
-  enum RedOpType {
-    SUM,
-    AVG,
-    PRODUCT,
-    MIN,
-    MAX,
-    BAND,
-    BOR,
-    BXOR,
-    UNUSED,
-  };
-
-  ReduceOp() = default;
-  ReduceOp(RedOpType op) : op_(op) {}
-
-  RedOpType op_ = UNUSED;
-};
-
-struct ReduceScatterOptions {
-  ReduceOp reduceOp = ReduceOp::UNUSED;
-};
-
-struct ScatterOptions {
-  int64_t rootRank = 0;
-};
-
-struct AllgatherOptions {};
-
-struct GatherOptions {
-  int64_t rootRank = 0;
-};
-
-struct BroadcastOptions {
-  int64_t rootRank = 0;
-};
-
-struct AllreduceOptions {
-  ReduceOp reduceOp = ReduceOp::UNUSED;
-};
-
-struct ReduceOptions {
-  ReduceOp reduceOp = ReduceOp::UNUSED;
-  int64_t rootRank = 0;
-};
-
-class Backend : public torch::CustomClassHolder {
- public:
-  c10::intrusive_ptr<Work> barrier() {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> send(
-      std::vector<at::Tensor>& tensors,
-      int dstRank,
-      int tag) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> recv(
-      std::vector<at::Tensor>& tensors,
-      int srcRank,
-      int tag) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> allgather(
-      std::vector<std::vector<at::Tensor>>& outputTensors,
-      std::vector<at::Tensor>& inputTensors,
-      const AllgatherOptions& opts = AllgatherOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> gather(
-      std::vector<std::vector<at::Tensor>>& outputTensors,
-      std::vector<at::Tensor>& inputTensors,
-      const GatherOptions& opts = GatherOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> reduce_scatter(
-      std::vector<at::Tensor>& outputTensors,
-      std::vector<std::vector<at::Tensor>>& inputTensors,
-      const ReduceScatterOptions& opts = ReduceScatterOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-  c10::intrusive_ptr<Work> scatter(
-      std::vector<at::Tensor>& outputTensors,
-      std::vector<std::vector<at::Tensor>>& inputTensors,
-      const ScatterOptions& opts = ScatterOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> broadcast(
-      std::vector<at::Tensor>& tensors,
-      const BroadcastOptions& opts = BroadcastOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> allreduce(
-      std::vector<at::Tensor>& tensors,
-      const AllreduceOptions& opts = AllreduceOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-
-  c10::intrusive_ptr<Work> reduce(
-      std::vector<at::Tensor>& tensors,
-      const ReduceOptions& opts = ReduceOptions()) {
-    return c10::make_intrusive<Work>();
-  }
-};
-
-class TCPStore : public torch::CustomClassHolder {};
-
-} // namespace c10d
diff --git a/csrc/multidevice/communication.cpp b/csrc/multidevice/communication.cpp
index 4f4db1711ab..d512cd4dcb3 100644
--- a/csrc/multidevice/communication.cpp
+++ b/csrc/multidevice/communication.cpp
@@ -6,7 +6,7 @@
  */
 // clang-format on
 #include
-#if defined(NVFUSER_DISTRIBUTED) && defined(USE_C10D_NCCL)
+#if defined(USE_C10D_NCCL)
 #include
 #endif
 #include
@@ -229,7 +229,7 @@ c10::intrusive_ptr<c10d::Work> Reduce::post(
   c10d::ReduceOptions options = {
       .reduceOp = params_.redOp, .rootRank = root_relative_index_};
   auto team_backend = comm.getBackendForTeam(params_.team, backend);
-#if defined(NVFUSER_DISTRIBUTED) && defined(USE_C10D_NCCL)
+#if defined(USE_C10D_NCCL)
   auto nccl_backend = dynamic_cast<c10d::ProcessGroupNCCL*>(team_backend.get());
   if (nccl_backend) {
 #if NVF_TORCH_VERSION_NO_LESS(2, 3, 0)
diff --git a/csrc/multidevice/communication.h b/csrc/multidevice/communication.h
index 8594c89ff5b..f8b6f558810 100644
--- a/csrc/multidevice/communication.h
+++ b/csrc/multidevice/communication.h
@@ -9,11 +9,7 @@
 
 #include
 #include
-#ifdef NVFUSER_DISTRIBUTED
 #include
-#else
-#include
-#endif
 
 #include
 #include
diff --git a/csrc/multidevice/communicator.cpp b/csrc/multidevice/communicator.cpp
index b113edd61d6..e0162c8bc8e 100644
--- a/csrc/multidevice/communicator.cpp
+++ b/csrc/multidevice/communicator.cpp
@@ -10,7 +10,6 @@
 #include
 #include
-#ifdef NVFUSER_DISTRIBUTED
 #include
 #ifdef USE_C10D_GLOO
 #include
 #endif
@@ -21,7 +20,6 @@
 #if defined(USE_C10D_UCC) && defined(NVFUSER_BUILD_WITH_UCC)
 #include
 #endif
-#endif
 
 namespace nvfuser {
 
@@ -132,7 +130,6 @@ inline std::string getTeamKey(const Team& team, CommunicatorBackend backend) {
   });
 }
 
-#ifdef NVFUSER_DISTRIBUTED
 // creates and return a process group backend
 c10::intrusive_ptr<c10d::Backend> createBackend(
     CommunicatorBackend backend,
@@ -164,7 +161,6 @@ c10::intrusive_ptr<c10d::Backend> createBackend(
 #endif
   NVF_ERROR(false, "no distributed backend available");
 }
-#endif
 } // namespace
 
 Communicator::Communicator(
@@ -187,7 +183,6 @@ Communicator::Communicator(
     return;
   }
 
-#ifdef NVFUSER_DISTRIBUTED
   c10d::TCPStoreOptions store_opts;
   {
     char hostname[HOST_NAME_MAX]; // NOLINT (modernize-avoid-c-arrays)
@@ -203,7 +198,6 @@ Communicator::Communicator(
       c10d::TCPStoreOptions::kDefaultPort; // 29500
   store_opts.port = master_port_ ? master_port_ : comm_master_port_default;
   store_ = c10::make_intrusive<c10d::TCPStore>(master_addr_, store_opts);
-#endif
 
 #if defined(USE_C10D_UCC) && defined(NVFUSER_BUILD_WITH_UCC)
   ucc_available_ = true;
@@ -222,7 +216,6 @@ c10::intrusive_ptr<c10d::Backend> Communicator::getBackendForTeam(
   // check if backend associated with the team is present in the cache
   if (backends_.find(team_key) == backends_.end()) {
     // create the backend and cache it
-#ifdef NVFUSER_DISTRIBUTED
     // check that the caller's rank belongs to the requested team
     auto rank_it = std::find(team.begin(), team.end(), deviceId());
     NVF_ERROR(
@@ -237,9 +230,6 @@ c10::intrusive_ptr<c10d::Backend> Communicator::getBackendForTeam(
         c10::make_intrusive<c10d::PrefixStore>(team_key, store_),
         team_rank,
         static_cast<int64_t>(team.size()));
-#else
-    backends_[team_key] = c10::make_intrusive<c10d::Backend>();
-#endif
   }
   return backends_.at(team_key);
 }
diff --git a/csrc/multidevice/communicator.h b/csrc/multidevice/communicator.h
index 2081d4a2efb..53efe005328 100644
--- a/csrc/multidevice/communicator.h
+++ b/csrc/multidevice/communicator.h
@@ -10,32 +10,25 @@
 
 #include
 #include
 #include
-
-#include
-#include
-#ifdef NVFUSER_DISTRIBUTED
 #include
 #include
 #include
-#else
-#include
-#endif
+
+#include
+#include
 #include
 
 namespace nvfuser {
 
-/*
-  This file implements the class Communicator which sets up the inter-process
-  Backend. This class contains inter-process information, such as the rank, the
-  world size, as well as the Process Group that can be called to perform
-  inter-process communications.
-
-  Each process is associated with a unique deviceId and device. The actual MPI
-  rank remains private to the class and should not be used by the user. The
-  communicator class holds privately the mappings ranks <-> device IDs <->
-  device.
-
-*/
+// This file implements the class Communicator which sets up the inter-process
+// Backend. This class contains inter-process information, such as the rank, the
+// world size, as well as the Process Group that can be called to perform
+// inter-process communications.
+//
+// Each process is associated with a unique deviceId and device. The actual MPI
+// rank remains private to the class and should not be used by the user. The
+// communicator class holds privately the mappings ranks <-> device IDs <->
+// device.
 
 using RankType = DeviceIdxType;
diff --git a/csrc/multidevice/utils.cpp b/csrc/multidevice/utils.cpp
index 651bbd3807f..66482359240 100644
--- a/csrc/multidevice/utils.cpp
+++ b/csrc/multidevice/utils.cpp
@@ -20,14 +20,6 @@
 
 namespace nvfuser {
 
-NVF_API bool distributedEnabled() {
-#ifdef NVFUSER_DISTRIBUTED
-  return true;
-#else
-  return false;
-#endif
-}
-
 namespace {
 
 std::unordered_set<IterDomain*> getShardedIterDomains(TensorView* tv) {
diff --git a/csrc/multidevice/utils.h b/csrc/multidevice/utils.h
index 7e4b1688655..100f948cb7b 100644
--- a/csrc/multidevice/utils.h
+++ b/csrc/multidevice/utils.h
@@ -15,9 +15,6 @@
 
 namespace nvfuser {
 
-// Returns true iff nvFuser was compiled with distributed APIs enabled.
-NVF_API bool distributedEnabled();
-
 // Returns whether a TensorView has a non-reduction axis parallelized Didx
 // Checks that the other non-reduction axis are not parallelized on Didx
 NVF_API bool isSharded(TensorView*);
diff --git a/setup.py b/setup.py
index 5f5e47a8e03..612914e5cc6 100644
--- a/setup.py
+++ b/setup.py
@@ -26,9 +26,6 @@
 # --build-with-ucc
 #     Build nvfuser with UCC support. You may need to specify environment variables of UCC_HOME, UCC_DIR, UCX_HOME, UCX_DIR.
 #
-# --build-without-distributed
-#     Build nvfuser without multidevice support
-#
 # --debug
 #     Building nvfuser in debug mode
 #
@@ -74,7 +71,6 @@
 NO_NINJA = False
 BUILD_WITH_UCC = False
 BUILD_WITH_ASAN = False
-BUILD_WITHOUT_DISTRIBUTED = False
 OVERWRITE_VERSION = False
 VERSION_TAG = None
 BUILD_TYPE = "Release"
@@ -106,9 +102,6 @@
     if arg == "--build-with-asan":
         BUILD_WITH_ASAN = True
         continue
-    if arg == "--build-without-distributed":
-        BUILD_WITHOUT_DISTRIBUTED = True
-        continue
     if arg == "--debug":
         BUILD_TYPE = "Debug"
         continue
@@ -306,7 +299,10 @@ def cmake(install_prefix: str = "./nvfuser"):
 
     logger.setLevel(logger_level)
 
-    pytorch_use_distributed = get_pytorch_use_distributed()
+    if not get_pytorch_use_distributed():
+        raise RuntimeError(
+            "nvFuser requires PyTorch to be built with USE_DISTRIBUTED on."
+        )
 
     # generate cmake directory
     cmd_str = [
@@ -315,7 +311,6 @@ def cmake(install_prefix: str = "./nvfuser"):
         "-DCMAKE_BUILD_TYPE=" + BUILD_TYPE,
         f"-DCMAKE_INSTALL_PREFIX={install_prefix}",
         f"-DNVFUSER_CPP_STANDARD={CPP_STANDARD}",
-        f"-DUSE_DISTRIBUTED={pytorch_use_distributed}",
         "-B",
         cmake_build_dir,
     ]
@@ -333,8 +328,6 @@ def cmake(install_prefix: str = "./nvfuser"):
         cmd_str.append("-DBUILD_NVFUSER_BENCHMARK=ON")
     if BUILD_WITH_ASAN:
         cmd_str.append("-DNVFUSER_BUILD_WITH_ASAN=ON")
-    if BUILD_WITHOUT_DISTRIBUTED:
-        cmd_str.append("-DNVFUSER_DISTRIBUTED=OFF")
     cmd_str.append(".")
 
     print(f"Configuring CMake with {' '.join(cmd_str)}")
diff --git a/tests/cpp/test_multidevice_pipeline.cpp b/tests/cpp/test_multidevice_pipeline.cpp
index 8300b68bae3..85f7f74c257 100644
--- a/tests/cpp/test_multidevice_pipeline.cpp
+++ b/tests/cpp/test_multidevice_pipeline.cpp
@@ -43,8 +43,7 @@
 using namespace torch::jit::fuser::cuda;
 using namespace at::indexing;
 
 // To run the following tests on several devices, pytorch must be installed with
-// the flag USE_DISTRIBUTED=1 and nccl support. With that, nvFuser is built by
-// default with NVFUSER_DISTRIBUTED defined. Then, on a node with at least 6
+// the flag USE_DISTRIBUTED=1 and nccl support. Then, on a node with at least 6
 // GPUs, run the test using mpirun: `mpirun -np 6 build/test_multidevice
 // --gtest_filter=PipelineTestTwoStages*`.
diff --git a/tests/cpp/test_resharding.cpp b/tests/cpp/test_resharding.cpp
index f14f8ab7cfd..0e7ce822e15 100644
--- a/tests/cpp/test_resharding.cpp
+++ b/tests/cpp/test_resharding.cpp
@@ -320,9 +320,6 @@ TEST_F(ReshardingTest, InsertShardedAxisReordering) {
 }
 
 TEST_P(ReshardingTest, Insert) {
-  if (!distributedEnabled()) { // Test only works with distributed
-    GTEST_SKIP() << "Requires distributed API";
-  }
   auto
       [mesh0,
        mesh1,
diff --git a/tools/gen_nvfuser_version.py b/tools/gen_nvfuser_version.py
index 789aa96d37a..c4072939d46 100644
--- a/tools/gen_nvfuser_version.py
+++ b/tools/gen_nvfuser_version.py
@@ -1,6 +1,7 @@
 # SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & AFFILIATES.
 # All rights reserved.
 # SPDX-License-Identifier: BSD-3-Clause
+import ast
 import subprocess
 import sys
 from pathlib import Path
@@ -30,35 +31,32 @@ def get_version() -> str:
 
 
 def get_pytorch_cmake_prefix():
-    from subprocess import Popen, PIPE
-
     # need to do this in a separate process so we are not going to delete nvfuser library while it's loaded by torch
-    process_torch_prefix = Popen(
+    process_torch_prefix = subprocess.Popen(
         [
             sys.executable,
             "-c",
             "import torch.utils; print(torch.utils.cmake_prefix_path)",
         ],
-        stdout=PIPE,
+        stdout=subprocess.PIPE,
    )
     stdout_msg, error_msg = process_torch_prefix.communicate()
     return stdout_msg.decode("utf-8").rstrip("\n")
 
 
-def get_pytorch_use_distributed():
-    from subprocess import Popen, PIPE
-
+def get_pytorch_use_distributed() -> bool:
     # need to do this in a separate process so we are not going to delete nvfuser library while it's loaded by torch
-    process_torch_prefix = Popen(
+    process_torch_prefix = subprocess.Popen(
         [
             sys.executable,
             "-c",
             "import torch; print(torch._C._has_distributed())",
         ],
-        stdout=PIPE,
+        stdout=subprocess.PIPE,
     )
-    stdout_msg, error_msg = process_torch_prefix.communicate()
-    return stdout_msg.decode("utf-8").rstrip("\n")
+    stdout_msg, _ = process_torch_prefix.communicate()
+    stdout_msg = stdout_msg.decode("utf-8").rstrip("\n")
+    return ast.literal_eval(stdout_msg)
 
 
 if __name__ == "__main__":