From a9b97d62e35ee939bc8243acc2f64b39409061ae Mon Sep 17 00:00:00 2001 From: Peng LEI Date: Sun, 20 Oct 2024 00:48:12 +0800 Subject: [PATCH] fix: issue of multiprocessing hangs --- src/core/network_mgr.py | 3 ++- src/run_test.py | 36 +++++++++++++++++++----------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/core/network_mgr.py b/src/core/network_mgr.py index 4d7de38..82616a9 100644 --- a/src/core/network_mgr.py +++ b/src/core/network_mgr.py @@ -83,10 +83,11 @@ def start_networks(self, top_config: ITopology): for i in range(self.num_of_networks): if not self.networks[i].is_started(): self.networks[i].start() + logging.info("########## Oasis start the network %s.", i) else: # reload the network instances can save time self.networks[i].reload(top_config) - logging.info("########## Oasis start the network %s.", i) + logging.info("########## Oasis reload the network %s.", i) def stop_networks(self): # Stop all networks diff --git a/src/run_test.py b/src/run_test.py index 8f7cad3..d27cae2 100755 --- a/src/run_test.py +++ b/src/run_test.py @@ -3,6 +3,7 @@ import logging import platform import multiprocessing +from multiprocessing import Manager import yaml # from mininet.cli import CLI @@ -420,10 +421,6 @@ def perform_test_in_process(network, test_name, id, result_dict): class ContainerTestRunner: def __init__(self, test_yml_config, config_path): self.test_yml_config = test_yml_config - self.process_manager = multiprocessing.Manager() - self.processes = [] - self.shared_dict = self.process_manager.dict() - self.merged_results = {} self.config_mapped_prefix = config_path self.target_protocols = None self._load_protocols() @@ -457,7 +454,7 @@ def setup_tests(self, networks): return False return True - def execute_tests(self, networks): + def execute_tests(self, networks, shared_dict): if self.target_protocols is None: logging.error("Error: no target protocols.") return False @@ -465,18 +462,19 @@ def execute_tests(self, networks): if net_num == 0: logging.error("Error: no networks.") return False + processes = [] test_name = self.test_yml_config['name'] # 4. perform the test for each target protocol in parallel with different processes for i in range(net_num): p = multiprocessing.Process(target=perform_test_in_process, args=(networks[i], test_name, - i, self.shared_dict)) - self.processes.append(p) + i, shared_dict)) + processes.append(p) p.start() # 4.1 Wait for all processes to complete - for i, p in enumerate(self.processes): + for i, p in enumerate(processes): p.join(timeout=600) if p.is_alive(): logging.error(f"Process %s for test %s is stuck.", @@ -490,16 +488,15 @@ def execute_tests(self, networks): i, test_name) return True - def handle_test_results(self, networks, top_index, top_description): + def handle_test_results(self, net_num, top_index, top_description, shared_dict): # 5. merge multiple test results into one dictionary - net_num = len(networks) test_name = self.test_yml_config['name'] merged_results = {} for i in range(net_num): - if i not in self.shared_dict: + if i not in shared_dict: logging.error(f"No results found for process %s.", i) self._handle_failure() - for shared_test_type, shared_test_result in self.shared_dict[i].items(): + for shared_test_type, shared_test_result in shared_dict[i].items(): if shared_test_type not in merged_results: merged_results[shared_test_type] = { 'results': [], @@ -539,7 +536,6 @@ def handle_test_results(self, networks, top_index, top_description): def cleanup(self): # 6. reset the network then go to the next test case - self.process_manager.shutdown() return True def _load_protocols(self): @@ -656,13 +652,16 @@ def _handle_success(self): # and each target protocol will be tested on each network instance. required_network_ins = test_runner.target_protocol_num() # 1.2 Build multiple network instances. + logging.info("cur_top_ins %s", cur_top_ins) res = network_manager.build_networks(cur_node_config, cur_top_ins, required_network_ins, test['route']) if res is False: continue + network_manager.start_networks(cur_top_ins) all_networks = network_manager.get_networks() + all_networks_num = len(all_networks) description = network_manager.get_top_description() logging.info( "######################################################") @@ -678,21 +677,24 @@ def _handle_success(self): network_manager.stop_networks() sys.exit(1) # 1.4 Execute the test on all network instances - res = test_runner.execute_tests(all_networks) + process_manager = Manager() + process_shared_dict = process_manager.dict() + res = test_runner.execute_tests( + all_networks, process_shared_dict) if res is False: test_runner.cleanup() network_manager.stop_networks() sys.exit(1) # 1.5 Collect the test results, and analyze/diagnostic the results. res = test_runner.handle_test_results( - all_networks, index, description) + all_networks_num, index, description, process_shared_dict) if res is False: test_runner.cleanup() network_manager.stop_networks() sys.exit(1) - - test_runner.cleanup() + process_manager.shutdown() network_manager.reset_networks() + test_runner.cleanup() # > for cur_top_index, cur_top_ins in enumerate(cur_topology): # > for test in all_tests handle_test_success()