From 5042cbb2d0eae40676fd6fe1f88261c78120e5d8 Mon Sep 17 00:00:00 2001 From: gustavogaldinoo Date: Mon, 12 Aug 2024 17:33:04 +0000 Subject: [PATCH] Fixing local experiments due to misusage of multiprocessing queue instead of sync manager queue --- experiment/measurer/measure_manager.py | 118 ++++++++++++------------- 1 file changed, 58 insertions(+), 60 deletions(-) diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index 84934c210..af798c1d6 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -704,12 +704,13 @@ class BaseMeasureManager: def __init__(self, experiment: str, region_coverage: bool): self.region_coverage = region_coverage self.experiment = experiment + self.measurers_cpus = None - def initialize_queues(self): + def initialize_queues(self, manager): """Initialize and returns request and response queues, respectively.""" raise NotImplementedError - def start_workers(self, request_queue, response_queue): + def start_workers(self, request_queue, response_queue, pool): """Initialize measure workers.""" raise NotImplementedError @@ -798,18 +799,23 @@ def measure_manager_loop(self, max_total_time: int): measurements tasks from workers, retrieve measurement results from response queue and writes measured snapshots in database.""" logger.info('Starting measure manager loop.') - with multiprocessing.Pool() as pool: + if not self.measurers_cpus: + self.measurers_cpus = multiprocessing.cpu_count() + logger.info( + 'Number of measurer CPUs not passed as argument. 
using %d', + self.measurers_cpus) + with multiprocessing.Pool(processes=self.measurers_cpus) as pool, multiprocessing.Manager() as manager: # pylint: disable=line-too-long set_up_coverage_binaries(pool, self.experiment) - (request_queue, response_queue) = self.initialize_queues() - self.start_workers(request_queue, response_queue) - max_cycle = _time_to_cycle(max_total_time) - queued_snapshots = set() - while not scheduler.all_trials_ended(self.experiment): - continue_inner_loop = self.measure_manager_inner_loop( - max_cycle, request_queue, response_queue, queued_snapshots) - if not continue_inner_loop: - break - time.sleep(MEASUREMENT_LOOP_WAIT) + (request_queue, response_queue) = self.initialize_queues(manager) + self.start_workers(request_queue, response_queue, pool) + max_cycle = _time_to_cycle(max_total_time) + queued_snapshots = set() + while not scheduler.all_trials_ended(self.experiment): + continue_inner_loop = self.measure_manager_inner_loop( + max_cycle, request_queue, response_queue, queued_snapshots) + if not continue_inner_loop: + break + time.sleep(MEASUREMENT_LOOP_WAIT) logger.info('All trials ended. Ending measure manager loop') @@ -825,34 +831,28 @@ def __init__(self, self.measurers_cpus = measurers_cpus def initialize_queues( - self + self, manager ) -> Tuple[multiprocessing.queues.Queue, multiprocessing.queues.Queue]: - return (multiprocessing.Queue(), multiprocessing.Queue()) + return (manager.Queue(), manager.Queue()) def start_workers(self, request_queue: multiprocessing.queues.Queue, - response_queue: multiprocessing.queues.Queue): - if not self.measurers_cpus: - self.measurers_cpus = multiprocessing.cpu_count() - logger.info( - 'Number of measurer CPUs not passed as argument. 
using %d', - self.measurers_cpus) - with multiprocessing.Pool(processes=self.measurers_cpus) as pool: - config = { - 'request_queue': request_queue, - 'response_queue': response_queue, - 'region_coverage': self.region_coverage, - } - local_measure_worker = measure_worker.LocalMeasureWorker(config) - - # Since each worker is going to be in an infinite loop, we dont need - # result return. Workers' life scope will end automatically when - # there are no more snapshots left to measure. - log_message = ('Starting measure worker loop for' - f'{self.measurers_cpus}' - 'workers in local measure manager') - logger.info(log_message) - for _ in range(self.measurers_cpus): - pool.apply_async(local_measure_worker.measure_worker_loop) + response_queue: multiprocessing.queues.Queue, pool): + config = { + 'request_queue': request_queue, + 'response_queue': response_queue, + 'region_coverage': self.region_coverage, + } + local_measure_worker = measure_worker.LocalMeasureWorker(config) + + # Since each worker is going to be in an infinite loop, we don't need + # a returned result. Workers' life scope will end automatically when + # there are no more snapshots left to measure. 
+ log_message = ('Starting measure worker loop for' + f'{self.measurers_cpus}' + 'workers in local measure manager') + logger.info(log_message) + for _ in range(self.measurers_cpus): + pool.apply_async(local_measure_worker.measure_worker_loop) def get_result_from_response_queue( self, response_queue: multiprocessing.queues.Queue): @@ -885,7 +885,7 @@ def __init__(self, self.project_id, self.response_queue_subscription_id) self.measurers_cpus = measurers_cpus - def initialize_queues(self) -> Tuple[str, str]: + def initialize_queues(self, manager) -> Tuple[str, str]: request_queue_topic_path = self.publisher_client.topic_path( self.project_id, self.request_queue_topic_id) request_queue_topic = self.publisher_client.create_topic( @@ -907,33 +907,31 @@ def _create_response_queue_subscription(self): return self.subscription_path - def start_workers(self, request_queue, response_queue): + def start_workers(self, request_queue, response_queue, pool): self._create_response_queue_subscription() if not self.measurers_cpus: self.measurers_cpus = multiprocessing.cpu_count() logger.info( 'Number of measurer CPUs not passed as argument. using %d', self.measurers_cpus) - with multiprocessing.Pool(processes=self.measurers_cpus) as pool: - config = { - 'request_queue_topic_id': self.request_queue_topic_id, - 'response_queue_topic_id': self.response_queue_topic_id, - 'region_coverage': self.region_coverage, - 'project_id': self.project_id, - 'experiment': self.experiment, - } - google_cloud_worker = measure_worker.GoogleCloudMeasureWorker( - config) - - # Since each worker is going to be in an infinite loop, we dont need - # result return. Workers' life scope will end automatically when - # there are no more snapshots left to measure. 
- log_message = ('Starting measure worker loop for' - f'{self.measurers_cpus}' - 'workers in google cloud measure manager') - logger.info(log_message) - for _ in range(self.measurers_cpus): - pool.apply_async(google_cloud_worker.measure_worker_loop) + config = { + 'request_queue_topic_id': self.request_queue_topic_id, + 'response_queue_topic_id': self.response_queue_topic_id, + 'region_coverage': self.region_coverage, + 'project_id': self.project_id, + 'experiment': self.experiment, + } + google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config) + + # Since each worker is going to be in an infinite loop, we don't need + # a returned result. Workers' life scope will end automatically when + # there are no more snapshots left to measure. + log_message = ('Starting measure worker loop for' + f'{self.measurers_cpus}' + 'workers in google cloud measure manager') + logger.info(log_message) + for _ in range(self.measurers_cpus): + pool.apply_async(google_cloud_worker.measure_worker_loop) def get_result_from_response_queue(self, response_queue: str):