Skip to content

Commit

Permalink
Refactoring gcloud workers to initialize a subscriber and publisher c…
Browse files Browse the repository at this point in the history
…lient for each worker
  • Loading branch information
gustavogaldinoo committed Aug 22, 2024
1 parent 0c7987d commit 05d3668
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
32 changes: 23 additions & 9 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ def initialize_queues(self, manager) -> Tuple[Optional[str], Optional[str]]:
def _create_response_queue_subscription(self):
"""Creates a new Pub/Sub subscription for the response queue."""
try:
subscription = self.subscriber_client.create_subscription(
subscription = pubsub_v1.SubscriberClient().create_subscription(
request={
'name': self.subscription_path,
'topic': self.response_queue_topic_path
Expand All @@ -929,14 +929,6 @@ def _create_response_queue_subscription(self):

def start_workers(self, request_queue, response_queue, pool):
self._create_response_queue_subscription()
config = {
'request_queue_topic_id': self.request_queue_topic_id,
'response_queue_topic_id': self.response_queue_topic_id,
'region_coverage': self.region_coverage,
'project_id': self.project_id,
'experiment': self.experiment,
}
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(config)

# Since each worker is going to be in an infinite loop, we dont need
# result return. Workers' life scope will end automatically when
Expand All @@ -946,7 +938,29 @@ def start_workers(self, request_queue, response_queue, pool):
f'{self.measurers_cpus}'
' workers in google cloud measure manager')
logger.info(log_message)

config = {
'request_queue_topic_id': self.request_queue_topic_id,
'response_queue_topic_id': self.response_queue_topic_id,
'region_coverage': self.region_coverage,
'project_id': self.project_id,
'experiment': self.experiment,
}

# Create the worker request queue subscription once, before starting all
# workers
worker_request_queue_subscription = ('request-queue-subscription-'
f'{self.experiment}')
worker_subscription_path = self.subscriber_client.subscription_path(
self.project_id, worker_request_queue_subscription)
worker_request_queue_topic_path = self.subscriber_client.topic_path(
self.project_id, self.request_queue_topic_id)
measure_worker.GoogleCloudMeasureWorker.create_request_queue_subscription( # pylint: disable=line-too-long
worker_subscription_path, worker_request_queue_topic_path)

for _ in range(self.measurers_cpus):
google_cloud_worker = measure_worker.GoogleCloudMeasureWorker(
config)
pool.apply_async(google_cloud_worker.measure_worker_loop)

def get_result_from_response_queue(self, response_queue: str):
Expand Down
11 changes: 6 additions & 5 deletions experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,16 @@ def __init__(self, config: Dict):
f'{self.experiment}')
self.subscription_path = self.subscriber_client.subscription_path(
self.project_id, self.request_queue_subscription)
self._create_request_queue_subscription()

def _create_request_queue_subscription(self):
@staticmethod
def create_request_queue_subscription(subscription_path,
request_queue_topic_path):
"""Creates a new Pub/Sub subscription for the request queue."""
try:
subscription = self.subscriber_client.create_subscription(
subscription = pubsub_v1.SubscriberClient().create_subscription(
request={
'name': self.subscription_path,
'topic': self.request_queue_topic_path,
'name': subscription_path,
'topic': request_queue_topic_path,
'enable_message_ordering': True,
})
logger.info('Subscription %s created successfully.',
Expand Down
5 changes: 4 additions & 1 deletion experiment/measurer/test_measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,10 @@ def test_gcloud_measure_manager_get_snapshot_from_response_queue(


@mock.patch('experiment.measurer.measure_worker.GoogleCloudMeasureWorker')
def test_gcloud_measure_manager_start_workers(mock_gcloud_measure_worker,
@mock.patch('google.cloud.pubsub_v1.PublisherClient')
@mock.patch('google.cloud.pubsub_v1.SubscriberClient')
def test_gcloud_measure_manager_start_workers(_mock_subscriber, _mock_publisher,
mock_gcloud_measure_worker,
gcloud_measure_manager):
"""Tests that the start workers method is calling the measure worker loop
method, a number of times equal to the number of measurers CPUs."""
Expand Down

0 comments on commit 05d3668

Please sign in to comment.