Switch env variable use in test harnesses to the non-deprecated names to fix warnings (pytorch#114880)

Previously:

```
[W Utils.hpp:133] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt)
[W Utils.hpp:133] Warning: Environment variable NCCL_ASYNC_ERROR_HANDLING is deprecated; use TORCH_NCCL_ASYNC_ERROR_HANDLING instead (function getCvarInt)
```
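These warnings come from the c10d environment-variable lookup introduced in pytorch#114077, where each setting is read from a list of names whose first entry is the preferred one and later entries are deprecated aliases. The authoritative implementation is the C++ `getCvarInt` in `Utils.hpp`; the snippet below is only a minimal Python sketch of that first-name-wins, warn-on-alias pattern, with the precedence order assumed rather than taken from the source:

```
import os
import warnings

def get_cvar_int(names, default):
    # names[0] is the preferred variable; later entries are deprecated
    # aliases, mirroring the TORCH_NCCL_* / NCCL_* pairs in this diff.
    value = default
    # Walk the list from the end so the preferred name wins if both are set
    # (assumed ordering; the authoritative logic lives in Utils.hpp).
    for i in range(len(names) - 1, -1, -1):
        raw = os.environ.get(names[i])
        if raw is None:
            continue
        if i > 0:
            warnings.warn(
                f"Environment variable {names[i]} is deprecated; "
                f"use {names[0]} instead"
            )
        value = int(raw)
    return value

# Setting only the old name still works, but triggers the warning above.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
get_cvar_int(["TORCH_NCCL_ASYNC_ERROR_HANDLING", "NCCL_ASYNC_ERROR_HANDLING"], 0)
```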

With this PR, those warnings disappear. The warnings were introduced in pytorch#114077.

This change was generated with the sed script below, applied with `sed -i -f /tmp/x **/*.{py,hpp,cpp,cc}` and hand-inspected.

```
s/\bNCCL_BLOCKING_WAIT\b/TORCH_NCCL_BLOCKING_WAIT/g
s/\bNCCL_ENABLE_TIMING\b/TORCH_NCCL_ENABLE_TIMING/g
s/\bNCCL_DESYNC_DEBUG\b/TORCH_NCCL_DESYNC_DEBUG/g
s/\bNCCL_ASYNC_ERROR_HANDLING\b/TORCH_NCCL_ASYNC_ERROR_HANDLING/g
s/\bENABLE_NCCL_HEALTH_CHECK\b/TORCH_ENABLE_NCCL_HEALTH_CHECK/g
s/\bNCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK\b/TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK/g
```
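The rules above map each deprecated name to its TORCH_-prefixed replacement. As a rough sanity check that no stale uses remain after the rename, a hypothetical helper like the one below could be run over the same file set the sed command touched (this script is illustrative only and not part of the PR):

```
import pathlib
import re

# Deprecated name -> replacement, copied from the sed rules above.
RENAMES = {
    "NCCL_BLOCKING_WAIT": "TORCH_NCCL_BLOCKING_WAIT",
    "NCCL_ENABLE_TIMING": "TORCH_NCCL_ENABLE_TIMING",
    "NCCL_DESYNC_DEBUG": "TORCH_NCCL_DESYNC_DEBUG",
    "NCCL_ASYNC_ERROR_HANDLING": "TORCH_NCCL_ASYNC_ERROR_HANDLING",
    "ENABLE_NCCL_HEALTH_CHECK": "TORCH_ENABLE_NCCL_HEALTH_CHECK",
    "NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK": "TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK",
}

# \b gives the same word boundaries the sed script relied on, so the new
# TORCH_-prefixed names themselves are not reported as stale.
STALE = re.compile(r"\b(" + "|".join(RENAMES) + r")\b")

def find_stale_names(root="."):
    """Yield (path, line_no, deprecated_name) for any old name still present."""
    for path in pathlib.Path(root).rglob("*"):
        if not path.is_file() or path.suffix not in {".py", ".hpp", ".cpp", ".cc"}:
            continue
        for line_no, line in enumerate(path.read_text(errors="ignore").splitlines(), 1):
            match = STALE.search(line)
            if match:
                yield path, line_no, match.group(1)

if __name__ == "__main__":
    for path, line_no, name in find_stale_names():
        print(f"{path}:{line_no}: {name} -> {RENAMES[name]}")
```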

Pull Request resolved: pytorch#114880
Approved by: https://github.com/kwen2501
chipturner authored and dmenig committed Dec 21, 2023
1 parent 4455e8e commit 79e388f
Showing 12 changed files with 85 additions and 85 deletions.
2 changes: 1 addition & 1 deletion test/cpp/c10d/ProcessGroupNCCLTest.cpp
@@ -767,7 +767,7 @@ class ProcessGroupNCCLTest : public ::testing::Test {
}

void TearDown() override {
// Reset NCCL_BLOCKING_WAIT environment variable after each run.
// Reset TORCH_NCCL_BLOCKING_WAIT environment variable after each run.
ASSERT_TRUE(setenv(c10d::TORCH_NCCL_BLOCKING_WAIT[0].c_str(), "0", 1) == 0);
}

@@ -181,7 +181,7 @@ def _check_env_function():
"TORCHELASTIC_MAX_RESTARTS",
"TORCHELASTIC_RUN_ID",
"TORCHELASTIC_USE_AGENT_STORE",
"NCCL_ASYNC_ERROR_HANDLING",
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
]
for var in env_vars:
_ = os.environ[var]
@@ -515,13 +515,13 @@ def run_check_env_function(self):
self.assertFalse(res.is_failed())

def run_check_nccl_async_error_handling_env(self):
# make sure NCCL_ASYNC_ERROR_HANDLING set in os.environ is honored
with patch.dict(os.environ, {"NCCL_ASYNC_ERROR_HANDLING": "0"}):
# make sure TORCH_NCCL_ASYNC_ERROR_HANDLING set in os.environ is honored
with patch.dict(os.environ, {"TORCH_NCCL_ASYNC_ERROR_HANDLING": "0"}):
res = self.run_agent(
Conf(
entrypoint=_check_env_value,
local_world_size=1,
args=("NCCL_ASYNC_ERROR_HANDLING", "0"),
args=("TORCH_NCCL_ASYNC_ERROR_HANDLING", "0"),
)
)
self.assertFalse(res.is_failed())
@@ -532,7 +532,7 @@ def run_check_nccl_async_error_handling_env_default(self):
Conf(
entrypoint=_check_env_value,
local_world_size=1,
args=("NCCL_ASYNC_ERROR_HANDLING", "1"),
args=("TORCH_NCCL_ASYNC_ERROR_HANDLING", "1"),
)
)
self.assertFalse(res.is_failed())
66 changes: 33 additions & 33 deletions test/distributed/test_c10d_nccl.py
@@ -227,9 +227,9 @@ def opts(self, high_priority_stream=False):

def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
# self.num_gpus = torch.cuda.device_count()
self._spawn_processes()

@@ -1139,7 +1139,7 @@ def test_nccl_dist_backend_error(self):
def test_abort_pg(self):
# Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
# abort the process group.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"

store = c10d.FileStore(self.file_name, self.world_size)
self._create_process_group_nccl(store, self.opts())
@@ -1178,7 +1178,7 @@ def abortpg():
def test_close_pg(self):
# Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
# abort the process group.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"

store = c10d.FileStore(self.file_name, self.world_size)
pg = self._create_process_group_nccl(store, self.opts())
@@ -1252,7 +1252,7 @@ def _check_nccl_timeout(expected_timeout):
@requires_nccl()
@skip_but_pass_in_sandcastle_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
def test_tensor_register_hook(self):
os.environ["NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"] = "1"
os.environ["TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"] = "1"

store = c10d.FileStore(self.file_name, self.world_size)
pg = self._create_process_group_nccl(store, self.opts())
@@ -1298,9 +1298,9 @@ class DistributedDataParallelTest(
):
def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
self._spawn_processes()

def _get_process_group(self):
@@ -1319,10 +1319,10 @@ def _test_nccl_backend(
@requires_nccl()
@skip_if_lt_x_gpu(2)
def test_nccl_propagate_error_reason(self):
# Need to use NCCL_BLOCKING_WAIT and not ASYNC_ERROR_HANDLING,
# Need to use TORCH_NCCL_BLOCKING_WAIT and not ASYNC_ERROR_HANDLING,
# otherwise process will be taken down and we can't check for errors.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
os.environ["NCCL_BLOCKING_WAIT"] = "1"
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
os.environ["TORCH_NCCL_BLOCKING_WAIT"] = "1"
# TODO: smaller timeout can fail since PG NCCl does health check in
# constructor. Look into reducing this test's runtime.
store = c10d.FileStore(self.file_name, self.world_size)
@@ -2614,14 +2614,14 @@ def world_size(self):

def setUp(self):
super().setUp()
# set NCCL_ENABLE_TIMING to enable timing for CUDAEvents
# set TORCH_NCCL_ENABLE_TIMING to enable timing for CUDAEvents
# in ProcessGroup Work
os.environ["NCCL_ENABLE_TIMING"] = "1"
os.environ["TORCH_NCCL_ENABLE_TIMING"] = "1"
self._spawn_processes()

def tearDown(self):
super().tearDown()
del os.environ["NCCL_ENABLE_TIMING"]
del os.environ["TORCH_NCCL_ENABLE_TIMING"]
try:
os.remove(self.file_name)
except OSError:
@@ -2805,9 +2805,9 @@ def setUp(self):
self.test_nccl_errors_blocking_sigterm.__wrapped__,
self.test_nccl_errors_blocking_nonzero_exit.__wrapped__,
]
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
self._spawn_processes()

def tearDown(self):
@@ -2838,13 +2838,13 @@ def _run_all_reduce(self, pg):
@skip_if_rocm
@skip_but_pass_in_sandcastle("Test does not pass when run locally")
def test_nccl_errors_nonblocking(self):
# Note: we unset and restore NCCL_ASYNC_ERROR_HANDLING for this test
# Note: we unset and restore TORCH_NCCL_ASYNC_ERROR_HANDLING for this test
# since test_c10d_common runs with async error handling by default, but this
# tests behavior when it is not enabled.
prev_nccl_async_error_handling = os.environ.get(
"NCCL_ASYNC_ERROR_HANDLING", None
"TORCH_NCCL_ASYNC_ERROR_HANDLING", None
)
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "0"
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
store = c10d.FileStore(self.file_name, self.world_size)
process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
process_group.allreduce(torch.rand(10).cuda(self.rank))
@@ -2864,7 +2864,7 @@ def test_nccl_errors_nonblocking(self):
self.assertTrue(t.is_alive())

if prev_nccl_async_error_handling is not None:
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = prev_nccl_async_error_handling
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = prev_nccl_async_error_handling

def _test_nccl_errors_blocking(self, func):
store = c10d.FileStore(self.file_name, self.world_size)
@@ -2951,7 +2951,7 @@ def test_nccl_blocking_wait_with_barrier(self):
process_group.barrier().wait(timeout=timedelta(seconds=self.op_timeout_sec))

def _run_invalid_nccl_blocking_wait_env(self, val):
os.environ["NCCL_BLOCKING_WAIT"] = val
os.environ["TORCH_NCCL_BLOCKING_WAIT"] = val
store = c10d.FileStore(self.file_name, self.world_size)
with self.assertRaises(RuntimeError):
process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
@@ -3004,9 +3004,9 @@ def device(self):

def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
self._spawn_processes()

def tearDown(self):
@@ -3183,7 +3183,7 @@ def test_nccl_barrier(self):
@requires_nccl()
@skip_if_lt_x_gpu(4)
def test_nccl_barrier_timeout(self):
os.environ["ENABLE_NCCL_HEALTH_CHECK"] = "1"
os.environ["TORCH_ENABLE_NCCL_HEALTH_CHECK"] = "1"
store = c10d.FileStore(self.file_name, self.world_size)
if self.rank == 0:
with self.assertRaisesRegex(
@@ -3396,9 +3396,9 @@ def test_allgather_base(self):
class LargeCommTest(test_c10d_common.AbstractLargeCommTest, MultiProcessTestCase):
def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
self._spawn_processes()

def tearDown(self):
@@ -3435,9 +3435,9 @@ def world_size(self):

def setUp(self):
super().setUp()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
# self.num_gpus = torch.cuda.device_count()
self._spawn_processes()

6 changes: 3 additions & 3 deletions test/distributed/test_pg_wrapper.py
@@ -209,9 +209,9 @@ class ProcessGroupNCCLWrapperTest(AbstractProcessGroupWrapperTest):
def setUp(self):
super(AbstractProcessGroupWrapperTest, self).setUp()
self._spawn_processes()
# NCCL_BLOCKING_WAIT overrides NCCL_ASYNC_ERROR_HANDLING hence tests
# that use NCCL_BLOCKING_WAIT will test it as expected.
os.environ["NCCL_ASYNC_ERROR_HANDLING"] = "1"
# TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
# that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"

@property
def world_size(self) -> int:
12 changes: 6 additions & 6 deletions torch/csrc/distributed/c10d/ProcessGroupNCCL.hpp
@@ -30,19 +30,19 @@ namespace c10d {
// which ensures communicators are healthy at the beginning of init.
static std::vector<std::string> TORCH_ENABLE_NCCL_HEALTH_CHECK = {
"TORCH_ENABLE_NCCL_HEALTH_CHECK",
"ENABLE_NCCL_HEALTH_CHECK"};
"TORCH_ENABLE_NCCL_HEALTH_CHECK"};

// Environment variable which controls whether or not wait() is blocking or
// non-blocking.
static std::vector<std::string> TORCH_NCCL_BLOCKING_WAIT = {
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_BLOCKING_WAIT"};
"TORCH_NCCL_BLOCKING_WAIT"};

// Environment variable which controls whether or not we perform Async Error
// Handling with NCCL.
static std::vector<std::string> TORCH_NCCL_ASYNC_ERROR_HANDLING = {
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
"NCCL_ASYNC_ERROR_HANDLING"};
"TORCH_NCCL_ASYNC_ERROR_HANDLING"};

// Environment Variable to control whether dumping debug info on watchdog
// timeout is enabled. This variable must be set together with
@@ -54,11 +54,11 @@ static std::vector<std::string> TORCH_NCCL_DUMP_ON_TIMEOUT = {
// This variable must be set together with TORCH_NCCL_ASYNC_ERROR_HANDLING.
static std::vector<std::string> TORCH_NCCL_DESYNC_DEBUG = {
"TORCH_NCCL_DESYNC_DEBUG",
"NCCL_DESYNC_DEBUG"};
"TORCH_NCCL_DESYNC_DEBUG"};

static std::vector<std::string> TORCH_NCCL_ENABLE_TIMING = {
"TORCH_NCCL_ENABLE_TIMING",
"NCCL_ENABLE_TIMING"};
"TORCH_NCCL_ENABLE_TIMING"};

static std::vector<std::string> TORCH_NCCL_ENABLE_MONITORING = {
"TORCH_NCCL_ENABLE_MONITORING"};
@@ -106,7 +106,7 @@ static std::vector<std::string> TORCH_NCCL_AVOID_RECORD_STREAMS = {
// can register/deregister the tensor on all available NCCL communicators.
static std::vector<std::string> TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK =
{"TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK",
"NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"};
"TORCH_NCCL_USE_TENSOR_REGISTER_ALLOCATOR_HOOK"};

// ProcessGroupNCCL implements NCCL bindings for c10d.
//
4 changes: 2 additions & 2 deletions torch/csrc/distributed/c10d/logger.cpp
@@ -15,10 +15,10 @@ namespace c10d {

static std::vector<std::string> TORCH_NCCL_BLOCKING_WAIT = {
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_BLOCKING_WAIT"};
"TORCH_NCCL_BLOCKING_WAIT"};
static std::vector<std::string> TORCH_NCCL_ASYNC_ERROR_HANDLING = {
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
"NCCL_ASYNC_ERROR_HANDLING"};
"TORCH_NCCL_ASYNC_ERROR_HANDLING"};

// Logs runtime stats to configured destination. Note that since data collection
// only runs every ddp_runtime_logging_sample_rate iterations, the actual
2 changes: 1 addition & 1 deletion torch/distributed/constants.py
@@ -12,7 +12,7 @@
# Separate timeout for PGNCCL mainly because it's always been that way in the C++ layer, but until recently
# there was one default that applied across all backends in the python layer.
# Later, we could consider merging them back together at the c++ layer if we can align on a same value.
# (only if NCCL_BLOCKING_WAIT or NCCL_ASYNC_ERROR_HANDLING is set to 1).
# (only if TORCH_NCCL_BLOCKING_WAIT or TORCH_NCCL_ASYNC_ERROR_HANDLING is set to 1).

try:
from torch._C._distributed_c10d import _DEFAULT_PG_NCCL_TIMEOUT
2 changes: 1 addition & 1 deletion torch/distributed/distributed_c10d.py
@@ -1100,7 +1100,7 @@ def init_process_group(
This is the duration after which collectives will be aborted asynchronously and the process will crash.
This is done since CUDA execution is async and it is no longer safe to continue executing user code since
failed async NCCL operations might result in subsequent CUDA operations running on corrupted data.
When NCCL_BLOCKING_WAIT is set, the process will block and wait for this timeout.
When TORCH_NCCL_BLOCKING_WAIT is set, the process will block and wait for this timeout.
group_name (str, optional, deprecated): Group name. This argument is ignored
pg_options (ProcessGroupOptions, optional): process group options
4 changes: 2 additions & 2 deletions torch/distributed/elastic/agent/server/local_elastic_agent.py
@@ -260,8 +260,8 @@ def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
"TORCHELASTIC_MAX_RESTARTS": str(spec.max_restarts),
"TORCHELASTIC_RUN_ID": spec.rdzv_handler.get_run_id(),
"TORCHELASTIC_USE_AGENT_STORE": str(use_agent_store),
"NCCL_ASYNC_ERROR_HANDLING": os.getenv(
"NCCL_ASYNC_ERROR_HANDLING", str(1)
"TORCH_NCCL_ASYNC_ERROR_HANDLING": os.getenv(
"TORCH_NCCL_ASYNC_ERROR_HANDLING", str(1)
),
}
if "OMP_NUM_THREADS" in os.environ:
4 changes: 2 additions & 2 deletions torch/nn/parallel/distributed.py
@@ -172,7 +172,7 @@ def _dump_DDP_relevant_env_vars():
"GLOO_SOCKET_IFNAME",
"GLOO_DEVICE_TRANSPORT",
"NCCL_SOCKET_IFNAME",
"NCCL_BLOCKING_WAIT",
"TORCH_NCCL_BLOCKING_WAIT",
"NCCL_DEBUG",
"NCCL_DEBUG_SUBSYS",
"NCCL_IB_DISABLE",
@@ -210,7 +210,7 @@ def _dump_DDP_relevant_env_vars():
"NCCL_COLLNET_ENABLE",
"NCCL_TOPO_FILE",
"NCCL_TOPO_DUMP_FILE",
"NCCL_ASYNC_ERROR_HANDLING",
"TORCH_NCCL_ASYNC_ERROR_HANDLING",
]
formatted_output = ""
for var in relevant_env_vars: