Skip to content

Commit

Permalink
fix: issue of multiprocessing hangs
Browse files (browse the repository at this point in the history)
  • Loading branch information
penglei0 committed Oct 19, 2024
1 parent 43789ff commit a9b97d6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
3 changes: 2 additions & 1 deletion src/core/network_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ def start_networks(self, top_config: ITopology):
for i in range(self.num_of_networks):
if not self.networks[i].is_started():
self.networks[i].start()
logging.info("########## Oasis start the network %s.", i)
else:
# reloading an already-started network instance saves time
self.networks[i].reload(top_config)
logging.info("########## Oasis start the network %s.", i)
logging.info("########## Oasis reload the network %s.", i)

def stop_networks(self):
# Stop all networks
Expand Down
36 changes: 19 additions & 17 deletions src/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import platform
import multiprocessing
from multiprocessing import Manager
import yaml

# from mininet.cli import CLI
Expand Down Expand Up @@ -420,10 +421,6 @@ def perform_test_in_process(network, test_name, id, result_dict):
class ContainerTestRunner:
def __init__(self, test_yml_config, config_path):
self.test_yml_config = test_yml_config
self.process_manager = multiprocessing.Manager()
self.processes = []
self.shared_dict = self.process_manager.dict()
self.merged_results = {}
self.config_mapped_prefix = config_path
self.target_protocols = None
self._load_protocols()
Expand Down Expand Up @@ -457,26 +454,27 @@ def setup_tests(self, networks):
return False
return True

def execute_tests(self, networks):
def execute_tests(self, networks, shared_dict):
if self.target_protocols is None:
logging.error("Error: no target protocols.")
return False
net_num = len(networks)
if net_num == 0:
logging.error("Error: no networks.")
return False
processes = []
test_name = self.test_yml_config['name']
# 4. perform the test for each target protocol in parallel with different processes
for i in range(net_num):
p = multiprocessing.Process(target=perform_test_in_process,
args=(networks[i],
test_name,
i, self.shared_dict))
self.processes.append(p)
i, shared_dict))
processes.append(p)
p.start()

# 4.1 Wait for all processes to complete
for i, p in enumerate(self.processes):
for i, p in enumerate(processes):
p.join(timeout=600)
if p.is_alive():
logging.error(f"Process %s for test %s is stuck.",
Expand All @@ -490,16 +488,15 @@ def execute_tests(self, networks):
i, test_name)
return True

def handle_test_results(self, networks, top_index, top_description):
def handle_test_results(self, net_num, top_index, top_description, shared_dict):
# 5. merge multiple test results into one dictionary
net_num = len(networks)
test_name = self.test_yml_config['name']
merged_results = {}
for i in range(net_num):
if i not in self.shared_dict:
if i not in shared_dict:
logging.error(f"No results found for process %s.", i)
self._handle_failure()
for shared_test_type, shared_test_result in self.shared_dict[i].items():
for shared_test_type, shared_test_result in shared_dict[i].items():
if shared_test_type not in merged_results:
merged_results[shared_test_type] = {
'results': [],
Expand Down Expand Up @@ -539,7 +536,6 @@ def handle_test_results(self, networks, top_index, top_description):

def cleanup(self):
# 6. reset the network then go to the next test case
self.process_manager.shutdown()
return True

def _load_protocols(self):
Expand Down Expand Up @@ -656,13 +652,16 @@ def _handle_success(self):
# and each target protocol will be tested on each network instance.
required_network_ins = test_runner.target_protocol_num()
# 1.2 Build multiple network instances.
logging.info("cur_top_ins %s", cur_top_ins)
res = network_manager.build_networks(cur_node_config,
cur_top_ins,
required_network_ins,
test['route'])
if res is False:
continue
network_manager.start_networks(cur_top_ins)
all_networks = network_manager.get_networks()
all_networks_num = len(all_networks)
description = network_manager.get_top_description()
logging.info(
"######################################################")
Expand All @@ -678,21 +677,24 @@ def _handle_success(self):
network_manager.stop_networks()
sys.exit(1)
# 1.4 Execute the test on all network instances
res = test_runner.execute_tests(all_networks)
process_manager = Manager()
process_shared_dict = process_manager.dict()
res = test_runner.execute_tests(
all_networks, process_shared_dict)
if res is False:
test_runner.cleanup()
network_manager.stop_networks()
sys.exit(1)
# 1.5 Collect the test results, then analyze and diagnose them.
res = test_runner.handle_test_results(
all_networks, index, description)
all_networks_num, index, description, process_shared_dict)
if res is False:
test_runner.cleanup()
network_manager.stop_networks()
sys.exit(1)

test_runner.cleanup()
process_manager.shutdown()
network_manager.reset_networks()
test_runner.cleanup()
# > for cur_top_index, cur_top_ins in enumerate(cur_topology):
# > for test in all_tests
handle_test_success()
Expand Down

0 comments on commit a9b97d6

Please sign in to comment.