diff --git a/tests/shared/command.py b/tests/shared/command.py
index 1a82f77dd69..b7272887011 100644
--- a/tests/shared/command.py
+++ b/tests/shared/command.py
@@ -49,7 +49,7 @@ def kill_process_tree(self, pid):
         except OSError as err:
             print(err)
 
-    def run(self, timeout=3600, assert_returncode_zero=True):
+    def run(self, timeout=3600, assert_returncode_zero=True, retries_on_segfault: int = 0):
         print(f"Running command: {self.cmd}")
 
         def target():
@@ -82,26 +82,32 @@ def target():
             except Exception as e:  # pylint:disable=broad-except
                 self.thread_exc = e
 
-        thread = threading.Thread(target=target)
-        thread.start()
+        for i in range(retries_on_segfault + 1):
+            thread = threading.Thread(target=target)
+            thread.start()
+
+            thread.join(timeout)
+
+            if self.thread_exc is not None:
+                raise self.thread_exc
+
+            if thread.is_alive():
+                try:
+                    print("Error: process taking too long to complete--terminating" + ", [ " + self.cmd + " ]")
+                    self.kill_process_tree(self.process.pid)
+                    self.exec_time = timeout
+                    self.timeout = True
+                    thread.join()
+                except OSError as e:
+                    print(self.process.pid, "Exception when try to kill task by PID, " + e.strerror)
+                    raise
+            returncode = self.process.wait()
+            print("Process returncode = " + str(returncode))
+            if returncode not in (-11, 139):  # return codes for SIGSEGV
+                break
+            if retries_on_segfault > 0 and i < retries_on_segfault:
+                print(f"Process ended with a segfault, retrying - attempt {i + 1} of {retries_on_segfault}")
 
-        thread.join(timeout)
-
-        if self.thread_exc is not None:
-            raise self.thread_exc
-
-        if thread.is_alive():
-            try:
-                print("Error: process taking too long to complete--terminating" + ", [ " + self.cmd + " ]")
-                self.kill_process_tree(self.process.pid)
-                self.exec_time = timeout
-                self.timeout = True
-                thread.join()
-            except OSError as e:
-                print(self.process.pid, "Exception when try to kill task by PID, " + e.strerror)
-                raise
-        returncode = self.process.wait()
-        print("Process returncode = " + str(returncode))
         if assert_returncode_zero:
             assert returncode == 0, "Process exited with a non-zero exit code {}; output:{}".format(
                 returncode, "".join(self.output)
diff --git a/tests/torch/helpers.py b/tests/torch/helpers.py
index e89a524bfd4..9846327abba 100644
--- a/tests/torch/helpers.py
+++ b/tests/torch/helpers.py
@@ -436,10 +436,10 @@ def resolve_constant_node_inputs_to_values(
 
 
 class Command(BaseCommand):
-    def run(self, timeout=3600, assert_returncode_zero=True):
+    def run(self, timeout=3600, assert_returncode_zero=True, retries_on_segfault: int = 0):
         if torch.cuda.is_available():
             torch.cuda.empty_cache()  # See runs_subprocess_in_precommit for more info on why this is needed
-        return super().run(timeout, assert_returncode_zero)
+        return super().run(timeout, assert_returncode_zero, retries_on_segfault)
 
 
 def module_scope_from_node_name(name):
diff --git a/tests/torch/test_sanity_sample.py b/tests/torch/test_sanity_sample.py
index 9e206c0b127..17d8d4ea47b 100644
--- a/tests/torch/test_sanity_sample.py
+++ b/tests/torch/test_sanity_sample.py
@@ -106,7 +106,7 @@
     for tpl in list(zip(CONFIGS[sample_type_], DATASETS[sample_type_], BATCHSIZE_PER_GPU[sample_type_])):
         CONFIG_PARAMS.append((sample_type_,) + tpl)
 
-
+RETRIES_ON_SEGFAULT = 3
 
 def _get_test_case_id(p) -> str:
     return "-".join([p[0], p[1].name, p[2], str(p[3])])
@@ -142,6 +142,125 @@ def update_compression_algo_dict_with_legr_save_load_params(nncf_config, tmp_pat
         algo_dict["params"]["load_ranking_coeffs_path"] = os.path.join(tmp_path, "ranking_coeffs.json")
     return nncf_config
 
 
+def _get_test_case_id(p) -> str:
+    return "-".join([p[0], p[1].name, p[2], str(p[3])])
+
+
+@pytest.fixture(params=CONFIG_PARAMS, name="config", ids=[_get_test_case_id(p) for p in CONFIG_PARAMS])
+def fixture_config(request, dataset_dir):
+    sample_type, config_path, dataset_name, batch_size = request.param
+    dataset_path = DATASET_PATHS[sample_type][dataset_name](dataset_dir)
+
+    with config_path.open() as f:
+        jconfig = json.load(f)
+
+    if "checkpoint_save_dir" in jconfig.keys():
+        del jconfig["checkpoint_save_dir"]
+
+    # Use a reduced number of BN adaptation samples for speed
+    if "compression" in jconfig:
+        if isinstance(jconfig["compression"], list):
+            algos_list = jconfig["compression"]
+            for algo_dict in algos_list:
+                update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict)
+        else:
+            algo_dict = jconfig["compression"]
+            update_compression_algo_dict_with_reduced_bn_adapt_params(algo_dict)
+    jconfig["dataset"] = dataset_name
+
+    return {
+        "sample_type": sample_type,
+        "sample_config": jconfig,
+        "model_name": jconfig["model"],
+        "dataset_path": dataset_path,
+        "batch_size": batch_size,
+        "test_case_id": _get_test_case_id(request.param),
+    }
+
+
+@pytest.fixture(scope="module", name="case_common_dirs")
+def fixture_case_common_dirs(tmp_path_factory):
+    return {
+        "checkpoint_save_dir": str(tmp_path_factory.mktemp("models")),
+        "save_coeffs_path": str(tmp_path_factory.mktemp("ranking_coeffs")),
+    }
+
+
+@pytest.mark.parametrize("multiprocessing_distributed", (True, False), ids=["distributed", "dataparallel"])
+def test_pretrained_model_eval(config, tmp_path, multiprocessing_distributed, case_common_dirs):
+    if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]):
+        pytest.skip(
+            f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest "
+            f"torchvision.\nThe signature of the function is not compatible with the corresponding signature "
+            f"from the current torchvision version : {torchvision.__version__}"
+        )
+    config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json")
+    config_factory.config = update_compression_algo_dict_with_legr_save_load_params(
+        config_factory.config, case_common_dirs["save_coeffs_path"]
+    )
+    args = {
+        "--mode": "test",
+        "--data": config["dataset_path"],
+        "--config": config_factory.serialize(),
+        "--log-dir": tmp_path,
+        "--batch-size": config["batch_size"] * NUM_DEVICES,
+        "--workers": 0,  # Workaround for the PyTorch MultiProcessingDataLoader issue
+        "--dist-url": "tcp://127.0.0.1:8987",
+    }
+
+    if not torch.cuda.is_available():
+        args["--cpu-only"] = True
+    elif multiprocessing_distributed:
+        args["--multiprocessing-distributed"] = True
+
+    runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV)
+    runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT)
+
+
+@pytest.mark.dependency()
+@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"])
+def test_pretrained_model_train(config, tmp_path, multiprocessing_distributed, case_common_dirs):
+    checkpoint_save_dir = os.path.join(
+        case_common_dirs["checkpoint_save_dir"], "distributed" if multiprocessing_distributed else "data_parallel"
+    )
+    config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json")
+    config_factory.config = update_compression_algo_dict_with_legr_save_load_params(
+        config_factory.config, case_common_dirs["save_coeffs_path"]
+    )
+
+    args = {
+        "--mode": "train",
+        "--data": config["dataset_path"],
"--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 2, + "--checkpoint-save-dir": checkpoint_save_dir, + "--dist-url": "tcp://127.0.0.1:8989", + } + + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + elif config["sample_config"]["model"] == "inception_v3": + pytest.skip( + "InceptionV3 may not be trained in DataParallel " + "because it outputs namedtuple, which DP seems to be unable " + "to support even still." + ) + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT) + last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") + assert os.path.exists(last_checkpoint_path) + if "compression" in config["sample_config"]: + allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) + else: + allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) + compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) + assert compression_stage in allowed_compression_stages def extract_compression_stage_from_checkpoint(last_checkpoint_path: str) -> CompressionStage: compression_state = torch.load(last_checkpoint_path)[COMPRESSION_STATE_ATTR] @@ -150,12 +269,52 @@ def extract_compression_stage_from_checkpoint(last_checkpoint_path: str) -> Comp return compression_stage + def depends_on_pretrained_train(request, test_case_id: str, current_multiprocessing_distributed: bool): full_test_case_id = test_case_id + ("-distributed" if current_multiprocessing_distributed else "-dataparallel") primary_test_case_name = f"TestSanitySample::test_pretrained_model_train[{full_test_case_id}]" depends(request, [primary_test_case_name]) +@pytest.mark.dependency() +@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"]) +def test_trained_model_eval(request, config, tmp_path, multiprocessing_distributed, case_common_dirs): + if version.parse(torchvision.__version__) < version.parse("0.13") and "voc" in str(config["dataset_path"]): + pytest.skip( + f"Test calls sample that uses `datasets.VOCDetection.parse_voc_xml` function from latest " + f"torchvision.\nThe signature of the function is not compatible with the corresponding signature " + f"from the current torchvision version : {torchvision.__version__}" + ) + depends_on_pretrained_train(request, config["test_case_id"], multiprocessing_distributed) + config_factory = ConfigFactory(config["sample_config"], tmp_path / "config.json") + config_factory.config = update_compression_algo_dict_with_legr_save_load_params( + config_factory.config, case_common_dirs["save_coeffs_path"] + ) + + ckpt_path = os.path.join( + case_common_dirs["checkpoint_save_dir"], + "distributed" if multiprocessing_distributed else "data_parallel", + get_run_name(config_factory.config) + "_last.pth", + ) + args = { + "--mode": "test", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--weights": ckpt_path, + "--dist-url": "tcp://127.0.0.1:8987", + } + + if not torch.cuda.is_available(): + 
args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT) + def get_resuming_checkpoint_path(config_factory, multiprocessing_distributed, checkpoint_save_dir): return os.path.join( checkpoint_save_dir, @@ -173,6 +332,40 @@ def set_num_threads_locally(n=1): finally: torch.set_num_threads(old_n) + ckpt_path = get_resuming_checkpoint_path( + config_factory, multiprocessing_distributed, case_common_dirs["checkpoint_save_dir"] + ) + if "max_iter" in config_factory.config: + config_factory.config["max_iter"] += 2 + args = { + "--mode": "train", + "--data": config["dataset_path"], + "--config": config_factory.serialize(), + "--log-dir": tmp_path, + "--batch-size": config["batch_size"] * NUM_DEVICES, + "--workers": 0, # Workaround for the PyTorch MultiProcessingDataLoader issue + "--epochs": 3, + "--checkpoint-save-dir": checkpoint_save_dir, + "--resume": ckpt_path, + "--dist-url": "tcp://127.0.0.1:8986", + } + + if not torch.cuda.is_available(): + args["--cpu-only"] = True + elif multiprocessing_distributed: + args["--multiprocessing-distributed"] = True + + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT) + last_checkpoint_path = os.path.join(checkpoint_save_dir, get_run_name(config_factory.config) + "_last.pth") + assert os.path.exists(last_checkpoint_path) + if "compression" in config["sample_config"]: + allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED) + else: + allowed_compression_stages = (CompressionStage.UNCOMPRESSED,) + compression_stage = extract_compression_stage_from_checkpoint(last_checkpoint_path) + assert compression_stage in allowed_compression_stages + @pytest.mark.nightly class TestSanitySample: @@ -244,8 +437,9 @@ def test_pretrained_model_eval(config, tmp_path, multiprocessing_distributed, ca elif multiprocessing_distributed: args["--multiprocessing-distributed"] = True - runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) - runner.run() + runner = Command(create_command_line(args, config["sample_type"]), env=ROOT_PYTHONPATH_ENV) + runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT) + assert os.path.exists(onnx_path) @staticmethod @pytest.mark.dependency() @@ -454,6 +648,112 @@ def test_cpu_only_mode_produces_cpu_only_model(config, tmp_path, mocker): "--cpu-only": True, } + model_to_be_trained = staged_worker.train_epoch_staged.call_args[0][2] # model + else: + model_to_be_trained = sample.train_epoch.call_args[0][1] # model + elif config["sample_type"] == "semantic_segmentation": + model_to_be_trained = examples.torch.semantic_segmentation.train.Train.__init__.call_args[0][1] # model + elif config["sample_type"] == "object_detection": + model_to_be_trained = sample.train.call_args[0][0] # net + + for p in model_to_be_trained.parameters(): + assert not p.is_cuda + + +@pytest.mark.parametrize("target_device", [x.value for x in HWConfigType]) +def test_sample_propagates_target_device_cl_param_to_nncf_config(mocker, tmp_path, target_device): + config_dict = { + "input_info": { + "sample_size": [1, 1, 32, 32], + }, + "compression": {"algorithm": "quantization"}, + } + config_factory = ConfigFactory(config_dict, tmp_path / "config.json") + args = { + "--data": str(tmp_path), + "--config": 
+        "--log-dir": tmp_path,
+        "--batch-size": 1,
+        "--target-device": target_device,
+    }
+    if not torch.cuda.is_available():
+        args["--cpu-only"] = True
+
+    arg_list = arg_list_from_arg_dict(args)
+    import examples.torch.classification.main as sample
+
+    start_worker_mock = mocker.patch("examples.torch.classification.main.start_worker")
+    sample.main(arg_list)
+
+    config = start_worker_mock.call_args[0][1].nncf_config
+    assert config["target_device"] == target_device
+
+
+@pytest.fixture(
+    name="accuracy_aware_config",
+    params=[
+        TEST_ROOT / "torch" / "data" / "configs" / "resnet18_pruning_accuracy_aware.json",
+        TEST_ROOT / "torch" / "data" / "configs" / "resnet18_int8_accuracy_aware.json",
+    ],
+)
+def fixture_accuracy_aware_config(request):
+    config_path = request.param
+    with config_path.open() as f:
+        jconfig = json.load(f)
+
+    dataset_name = "mock_32x32"
+    dataset_path = os.path.join("/tmp", "mock_32x32")
+    sample_type = "classification"
+
+    jconfig["dataset"] = dataset_name
+
+    return {
+        "sample_type": sample_type,
+        "sample_config": jconfig,
+        "model_name": jconfig["model"],
+        "dataset_path": dataset_path,
+        "batch_size": 12,
+    }
+
+
+@pytest.mark.dependency()
+@pytest.mark.parametrize("multiprocessing_distributed", [True, False], ids=["distributed", "dataparallel"])
+def test_accuracy_aware_training_pipeline(accuracy_aware_config, tmp_path, multiprocessing_distributed):
+    config_factory = ConfigFactory(accuracy_aware_config["sample_config"], tmp_path / "config.json")
+    log_dir = tmp_path / "accuracy_aware"
+    log_dir = log_dir / "distributed" if multiprocessing_distributed else log_dir / "dataparallel"
+
+    args = {
+        "--mode": "train",
+        "--data": accuracy_aware_config["dataset_path"],
+        "--config": config_factory.serialize(),
+        "--log-dir": log_dir,
+        "--batch-size": accuracy_aware_config["batch_size"] * NUM_DEVICES,
+        "--workers": 0,  # Workaround for the PyTorch MultiProcessingDataLoader issue
+        "--epochs": 2,
+        "--dist-url": "tcp://127.0.0.1:8989",
+    }
+
+    if not torch.cuda.is_available():
+        args["--cpu-only"] = True
+    elif multiprocessing_distributed:
+        args["--multiprocessing-distributed"] = True
+
+    runner = Command(create_command_line(args, accuracy_aware_config["sample_type"]), env=ROOT_PYTHONPATH_ENV)
+    runner.run(retries_on_segfault=RETRIES_ON_SEGFAULT)
+
+    checkpoint_save_dir = log_dir / get_run_name(config_factory.config)
+    aa_checkpoint_path = get_accuracy_aware_checkpoint_dir_path(checkpoint_save_dir)
+    last_checkpoint_path = aa_checkpoint_path / "acc_aware_checkpoint_last.pth"
+
+    assert last_checkpoint_path.exists()
+    if "compression" in accuracy_aware_config["sample_config"]:
+        allowed_compression_stages = (CompressionStage.FULLY_COMPRESSED, CompressionStage.PARTIALLY_COMPRESSED)
+    else:
+        allowed_compression_stages = (CompressionStage.UNCOMPRESSED,)
+    compression_stage = extract_compression_stage_from_checkpoint(str(last_checkpoint_path))
+    assert compression_stage in allowed_compression_stages
+
     # to prevent starting a not closed mlflow session due to memory leak of config and SafeMLFLow happens with a
     # mocked train function
     mocker.patch("examples.torch.common.utils.SafeMLFLow")
@@ -529,6 +829,7 @@ def test_sample_propagates_target_device_cl_param_to_nncf_config(mocker, tmp_pat
 
     start_worker_mock = mocker.patch("examples.torch.classification.main.start_worker")
     sample.main(arg_list)
+
    config = start_worker_mock.call_args[0][1].nncf_config
    assert config["target_device"] == target_device
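
Below is a minimal standalone sketch of the retry-on-segfault pattern that this patch adds to BaseCommand.run. It uses plain subprocess instead of the repository's threaded Command wrapper, and the helper name run_with_segfault_retries is illustrative, not part of the patch:

import subprocess
import sys

# Return codes that indicate the child died from SIGSEGV: subprocess reports -11
# (negative signal number) for a directly spawned child, while a shell reports 128 + 11 = 139.
SEGFAULT_RETURN_CODES = (-11, 139)


def run_with_segfault_retries(cmd: str, retries_on_segfault: int = 0, timeout: int = 3600) -> int:
    """Run `cmd` in a shell, retrying up to `retries_on_segfault` times if it segfaults."""
    returncode = None
    for attempt in range(retries_on_segfault + 1):
        returncode = subprocess.run(cmd, shell=True, timeout=timeout).returncode
        print(f"Process returncode = {returncode}")
        if returncode not in SEGFAULT_RETURN_CODES:
            break
        if attempt < retries_on_segfault:
            print(f"Process ended with a segfault, retrying - attempt {attempt + 1} of {retries_on_segfault}")
    return returncode


if __name__ == "__main__":
    # A trivial command that exits cleanly, so no retries are triggered.
    assert run_with_segfault_retries(f'"{sys.executable}" -c "pass"', retries_on_segfault=3) == 0

As in the patch, any non-segfault return code (including an ordinary non-zero failure) breaks out of the loop immediately; only the SIGSEGV codes trigger another attempt, and the final return code is still what the caller asserts on.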