Skip to content

Commit

Permalink
Refactoring put result to response queue method, and enable ordering …
Browse files Browse the repository at this point in the history
…messages
  • Loading branch information
gustavogaldinoo committed Aug 13, 2024
1 parent 632f23f commit 20d2f03
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 28 deletions.
4 changes: 3 additions & 1 deletion experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,9 @@ def __init__(self,
measurers_cpus: Optional[int] = None):
super().__init__(experiment, region_coverage)
self.subscriber_client = pubsub_v1.SubscriberClient()
self.publisher_client = pubsub_v1.PublisherClient()
self.publisher_client = pubsub_v1.PublisherClient(
publisher_options=pubsub_v1.types.PublisherOptions(
enable_message_ordering=True))
self.project_id = cloud_project
self.request_queue_topic_id = f'request-queue-topic-{self.experiment}'
self.request_queue_topic_path = self.publisher_client.topic_path(
Expand Down
71 changes: 44 additions & 27 deletions experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""Module for measurer workers logic."""
import time
import json
from typing import Dict, Optional
from typing import Dict, Union, Optional
import google.api_core.exceptions
from google.cloud import pubsub_v1
from common import logs
Expand All @@ -38,7 +38,16 @@ def get_task_from_request_queue(self):
""""Get task from request queue"""
raise NotImplementedError

def put_result_in_response_queue(self, measured_snapshot, request):
def process_measured_snapshot_result(
self, measured_snapshot: Optional[Snapshot],
request: measurer_datatypes.SnapshotMeasureRequest):
"""Process a measured snapshot result, and return either a serialized
measured snapshot, or a serialized retry request, depending on whether a
corpus was found for that cycle or not"""
raise NotImplementedError

def put_result_in_response_queue(
self, result: Union[measurer_datatypes.RetryRequest, Snapshot]):
"""Save measurement result in response queue, for the measure manager to
retrieve"""
raise NotImplementedError
Expand All @@ -62,7 +71,9 @@ def measure_worker_loop(self):
measured_snapshot = measure_manager.measure_snapshot_coverage(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle, self.region_coverage)
self.put_result_in_response_queue(measured_snapshot, request)
result = self.process_measured_snapshot_result(
measured_snapshot, request)
self.put_result_in_response_queue(result)
time.sleep(MEASUREMENT_TIMEOUT)


Expand All @@ -82,17 +93,17 @@ def get_task_from_request_queue(
request = self.request_queue.get(block=True)
return request

def put_result_in_response_queue(
self, measured_snapshot: Optional[Snapshot],
request: measurer_datatypes.SnapshotMeasureRequest):
def process_measured_snapshot_result(self, measured_snapshot, request):
if measured_snapshot:
logger.info('Put measured snapshot in response_queue')
self.response_queue.put(measured_snapshot)
else:
retry_request = measurer_datatypes.RetryRequest(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle)
self.response_queue.put(retry_request)
return measured_snapshot
retry_request = measurer_datatypes.RetryRequest(request.fuzzer,
request.benchmark,
request.trial_id,
request.cycle)
return retry_request

def put_result_in_response_queue(self, result):
self.response_queue.put(result)


class GoogleCloudMeasureWorker(BaseMeasureWorker): # pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -129,7 +140,7 @@ def _create_request_queue_subscription(self):
subscription.name)
return subscription.name
except google.api_core.exceptions.GoogleAPICallError as error:
logger.error('Error while creating request queue subscription: %s',
logger.error('Error while creating request queue subscription: %s.',
error)
return None

Expand All @@ -155,18 +166,24 @@ def get_task_from_request_queue(

return message.message.data

def put_result_in_response_queue(self, measured_snapshot, request):
topic_path = self.publisher_client.topic_path(
self.project_id, self.response_queue_topic_id)
def process_measured_snapshot_result(self, measured_snapshot, request):
if measured_snapshot:
logger.info('Put measured snapshot in response_queue')
measured_snapshot_encoded = json.dumps(
measured_snapshot_serialized = json.dumps(
measured_snapshot.__dict__).encode('utf-8')
self.publisher_client.publish(topic_path, measured_snapshot_encoded)
else:
retry_request = measurer_datatypes.SnapshotMeasureRequest(
request.fuzzer, request.benchmark, request.trial_id,
request.cycle)
retry_request_encoded = json.dumps(
retry_request._asdict()).encode('utf-8')
self.publisher_client.publish(topic_path, retry_request_encoded)
return measured_snapshot_serialized

retry_request = measurer_datatypes.SnapshotMeasureRequest(
request.fuzzer, request.benchmark, request.trial_id, request.cycle)
retry_request_encoded = json.dumps(
retry_request._asdict()).encode('utf-8')
return retry_request_encoded

def put_result_in_response_queue(self, result):
topic_path = self.publisher_client.topic_path(
self.project_id, self.response_queue_topic_id)
try:
self.publisher_client.publish(topic_path, result)
logger.info('Result published successfully in response queue.')
except google.api_core.exceptions.GoogleAPICallError as error:
logger.error('Error when publishing result in response queue %s.',
error)

0 comments on commit 20d2f03

Please sign in to comment.