Skip to content

Commit

Permalink
Changing code to only use 1 gcloud worker for debugging purporses
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavogaldinoo committed Aug 19, 2024
1 parent b523680 commit f940442
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
12 changes: 8 additions & 4 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -851,9 +851,9 @@ def start_workers(self, request_queue: multiprocessing.queues.Queue,
# Since each worker is going to be in an infinite loop, we dont need
# result return. Workers' life scope will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
log_message = ('Starting measure worker loop for '
f'{self.measurers_cpus}'
'workers in local measure manager')
' workers in local measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(local_measure_worker.measure_worker_loop)
Expand Down Expand Up @@ -937,13 +937,17 @@ def start_workers(self, request_queue, response_queue, pool):
'experiment': self.experiment,
}
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config)
# Only creating one worker for debugging purposes
pool.apply_async(google_cloud_worker.measure_worker_loop)

return
# Since each worker is going to be in an infinite loop, we dont need
# result return. Workers' life scope will end automatically when
# there are no more snapshots left to measure.
log_message = ('Starting measure worker loop for'
# pylint: disable=unreachable
log_message = ('Starting measure worker loop for '
f'{self.measurers_cpus}'
'workers in google cloud measure manager')
' workers in google cloud measure manager')
logger.info(log_message)
for _ in range(self.measurers_cpus):
pool.apply_async(google_cloud_worker.measure_worker_loop)
Expand Down
6 changes: 5 additions & 1 deletion experiment/measurer/test_measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ def test_consume_unmapped_type_from_response_queue(local_measure_manager):
response_queue, set())
assert not snapshots


def test_consume_none_from_response_queue(local_measure_manager):
"""Tests the scenario where None is retrieved from the response queue.
Should expect to raise queue.Empty exception, break loop early, and return
Expand All @@ -458,6 +459,7 @@ def test_consume_none_from_response_queue(local_measure_manager):
# Should return an empty list
assert not snapshots


def test_consume_retry_type_from_response_queue(local_measure_manager):
"""Tests the scenario where a retry object is retrieved from the
response queue. In this scenario, we want to remove the snapshot identifier
Expand Down Expand Up @@ -661,7 +663,9 @@ def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker,
gcloud_measure_manager):
"""Tests that the start workers method is calling the measure worker loop
method, a number of times equal to the number of measurers CPUs."""
cpus_available = multiprocessing.cpu_count()
# Changing this to 1 temporarily to debug gcloud worker
cpus_available = 1
# cpus_available = multiprocessing.cpu_count()
gcloud_measure_manager.measurers_cpus = cpus_available
with mock.patch('multiprocessing.pool.Pool.apply_async') as pool:
gcloud_measure_manager.start_workers('request-queue-topic',
Expand Down

0 comments on commit f940442

Please sign in to comment.