Skip to content

Commit

Permalink
Fixing local experiments due to misusage of multiprocessing queue instead of sync manager queue
Browse files Browse the repository at this point in the history
gustavogaldinoo committed Aug 12, 2024
1 parent fd6a30b commit 5042cbb
Showing 1 changed file with 58 additions and 60 deletions.
118 changes: 58 additions & 60 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
@@ -704,12 +704,13 @@ class BaseMeasureManager:
def __init__(self, experiment: str, region_coverage: bool):
self.region_coverage = region_coverage
self.experiment = experiment
self.measurers_cpus = None

def initialize_queues(self):
def initialize_queues(self, manager):
"""Initialize and return the request and response queues, respectively."""
raise NotImplementedError

def start_workers(self, request_queue, response_queue):
def start_workers(self, request_queue, response_queue, pool):
"""Initialize measure workers."""
raise NotImplementedError

@@ -798,18 +799,23 @@ def measure_manager_loop(self, max_total_time: int):
measurements tasks from workers, retrieve measurement results from
response queue and writes measured snapshots in database."""
logger.info('Starting measure manager loop.')
with multiprocessing.Pool() as pool:
if not self.measurers_cpus:
self.measurers_cpus = multiprocessing.cpu_count()
logger.info(
'Number of measurer CPUs not passed as argument. using %d',
self.measurers_cpus)
with multiprocessing.Pool(processes=self.measurers_cpus) as pool, multiprocessing.Manager() as manager: # pylint: disable=line-too-long
set_up_coverage_binaries(pool, self.experiment)
(request_queue, response_queue) = self.initialize_queues()
self.start_workers(request_queue, response_queue)
max_cycle = _time_to_cycle(max_total_time)
queued_snapshots = set()
while not scheduler.all_trials_ended(self.experiment):
continue_inner_loop = self.measure_manager_inner_loop(
max_cycle, request_queue, response_queue, queued_snapshots)
if not continue_inner_loop:
break
time.sleep(MEASUREMENT_LOOP_WAIT)
(request_queue, response_queue) = self.initialize_queues(manager)
self.start_workers(request_queue, response_queue, pool)
max_cycle = _time_to_cycle(max_total_time)
queued_snapshots = set()
while not scheduler.all_trials_ended(self.experiment):
continue_inner_loop = self.measure_manager_inner_loop(
max_cycle, request_queue, response_queue, queued_snapshots)
if not continue_inner_loop:
break
time.sleep(MEASUREMENT_LOOP_WAIT)
logger.info('All trials ended. Ending measure manager loop')


@@ -825,34 +831,28 @@ def __init__(self,
self.measurers_cpus = measurers_cpus

def initialize_queues(
self
self, manager
) -> Tuple[multiprocessing.queues.Queue, multiprocessing.queues.Queue]:
return (multiprocessing.Queue(), multiprocessing.Queue())
return (manager.Queue(), manager.Queue())

def start_workers(self, request_queue: multiprocessing.queues.Queue,
response_queue: multiprocessing.queues.Queue):
if not self.measurers_cpus:
self.measurers_cpus = multiprocessing.cpu_count()
logger.info(
'Number of measurer CPUs not passed as argument. using %d',
self.measurers_cpus)
with multiprocessing.Pool(processes=self.measurers_cpus) as pool:
config = {
'request_queue': request_queue,
'response_queue': response_queue,
'region_coverage': self.region_coverage,
}
local_measure_worker = measure_worker.LocalMeasureWorker(config)

# Since each worker is going to be in an infinite loop, we don't need
# the returned result. Each worker's lifetime will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
f'{self.measurers_cpus}'
'workers in local measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(local_measure_worker.measure_worker_loop)
response_queue: multiprocessing.queues.Queue, pool):
config = {
'request_queue': request_queue,
'response_queue': response_queue,
'region_coverage': self.region_coverage,
}
local_measure_worker = measure_worker.LocalMeasureWorker(config)

# Since each worker is going to be in an infinite loop, we don't need
# the returned result. Each worker's lifetime will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
f'{self.measurers_cpus}'
'workers in local measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(local_measure_worker.measure_worker_loop)

def get_result_from_response_queue(
self, response_queue: multiprocessing.queues.Queue):
@@ -885,7 +885,7 @@ def __init__(self,
self.project_id, self.response_queue_subscription_id)
self.measurers_cpus = measurers_cpus

def initialize_queues(self) -> Tuple[str, str]:
def initialize_queues(self, manager) -> Tuple[str, str]:
request_queue_topic_path = self.publisher_client.topic_path(
self.project_id, self.request_queue_topic_id)
request_queue_topic = self.publisher_client.create_topic(
@@ -907,33 +907,31 @@ def _create_response_queue_subscription(self):

return self.subscription_path

def start_workers(self, request_queue, response_queue):
def start_workers(self, request_queue, response_queue, pool):
self._create_response_queue_subscription()
if not self.measurers_cpus:
self.measurers_cpus = multiprocessing.cpu_count()
logger.info(
'Number of measurer CPUs not passed as argument. using %d',
self.measurers_cpus)
with multiprocessing.Pool(processes=self.measurers_cpus) as pool:
config = {
'request_queue_topic_id': self.request_queue_topic_id,
'response_queue_topic_id': self.response_queue_topic_id,
'region_coverage': self.region_coverage,
'project_id': self.project_id,
'experiment': self.experiment,
}
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(
config)

# Since each worker is going to be in an infinite loop, we don't need
# the returned result. Each worker's lifetime will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
f'{self.measurers_cpus}'
'workers in google cloud measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(google_cloud_worker.measure_worker_loop)
config = {
'request_queue_topic_id': self.request_queue_topic_id,
'response_queue_topic_id': self.response_queue_topic_id,
'region_coverage': self.region_coverage,
'project_id': self.project_id,
'experiment': self.experiment,
}
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config)

# Since each worker is going to be in an infinite loop, we don't need
# the returned result. Each worker's lifetime will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
f'{self.measurers_cpus}'
'workers in google cloud measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(google_cloud_worker.measure_worker_loop)

def get_result_from_response_queue(self, response_queue: str):

0 comments on commit 5042cbb

Please sign in to comment.