diff --git a/experiment/measurer/measure_manager.py b/experiment/measurer/measure_manager.py index dfa4a7041..525a2c110 100644 --- a/experiment/measurer/measure_manager.py +++ b/experiment/measurer/measure_manager.py @@ -877,7 +877,9 @@ def __init__(self, measurers_cpus: Optional[int] = None): super().__init__(experiment, region_coverage) self.subscriber_client = pubsub_v1.SubscriberClient() - self.publisher_client = pubsub_v1.PublisherClient() + self.publisher_client = pubsub_v1.PublisherClient( + publisher_options=pubsub_v1.types.PublisherOptions( + enable_message_ordering=True)) self.project_id = cloud_project self.request_queue_topic_id = f'request-queue-topic-{self.experiment}' self.request_queue_topic_path = self.publisher_client.topic_path( diff --git a/experiment/measurer/measure_worker.py b/experiment/measurer/measure_worker.py index 61a85c61d..510bcbc1a 100644 --- a/experiment/measurer/measure_worker.py +++ b/experiment/measurer/measure_worker.py @@ -14,7 +14,7 @@ """Module for measurer workers logic.""" import time import json -from typing import Dict, Optional +from typing import Dict, Union, Optional import google.api_core.exceptions from google.cloud import pubsub_v1 from common import logs @@ -38,7 +38,16 @@ def get_task_from_request_queue(self): """"Get task from request queue""" raise NotImplementedError - def put_result_in_response_queue(self, measured_snapshot, request): + def process_measured_snapshot_result( + self, measured_snapshot: Optional[Snapshot], + request: measurer_datatypes.SnapshotMeasureRequest): + """Process a measured snapshot result, and return either a serialized + measured snapshot, or a serialized retry request, depending on whether a + corpus was found for that cycle or not""" + raise NotImplementedError + + def put_result_in_response_queue( + self, result: Union[measurer_datatypes.RetryRequest, Snapshot]): """Save measurement result in response queue, for the measure manager to retrieve""" raise NotImplementedError @@ -62,7 +71,9 @@ def measure_worker_loop(self): measured_snapshot = measure_manager.measure_snapshot_coverage( request.fuzzer, request.benchmark, request.trial_id, request.cycle, self.region_coverage) - self.put_result_in_response_queue(measured_snapshot, request) + result = self.process_measured_snapshot_result( + measured_snapshot, request) + self.put_result_in_response_queue(result) time.sleep(MEASUREMENT_TIMEOUT) @@ -82,17 +93,17 @@ def get_task_from_request_queue( request = self.request_queue.get(block=True) return request - def put_result_in_response_queue( - self, measured_snapshot: Optional[Snapshot], - request: measurer_datatypes.SnapshotMeasureRequest): + def process_measured_snapshot_result(self, measured_snapshot, request): if measured_snapshot: - logger.info('Put measured snapshot in response_queue') - self.response_queue.put(measured_snapshot) - else: - retry_request = measurer_datatypes.RetryRequest( - request.fuzzer, request.benchmark, request.trial_id, - request.cycle) - self.response_queue.put(retry_request) + return measured_snapshot + retry_request = measurer_datatypes.RetryRequest(request.fuzzer, + request.benchmark, + request.trial_id, + request.cycle) + return retry_request + + def put_result_in_response_queue(self, result): + self.response_queue.put(result) class GoogleCloudMeasureWorker(BaseMeasureWorker): # pylint: disable=too-many-instance-attributes @@ -129,7 +140,7 @@ def _create_request_queue_subscription(self): subscription.name) return subscription.name except google.api_core.exceptions.GoogleAPICallError as error: - logger.error('Error while creating request queue subscription: %s', + logger.error('Error while creating request queue subscription: %s.', error) return None @@ -155,18 +166,24 @@ def get_task_from_request_queue( return message.message.data - def put_result_in_response_queue(self, measured_snapshot, request): - topic_path = self.publisher_client.topic_path( - self.project_id, self.response_queue_topic_id) + def process_measured_snapshot_result(self, measured_snapshot, request): if measured_snapshot: - logger.info('Put measured snapshot in response_queue') - measured_snapshot_encoded = json.dumps( + measured_snapshot_serialized = json.dumps( measured_snapshot.__dict__).encode('utf-8') - self.publisher_client.publish(topic_path, measured_snapshot_encoded) - else: - retry_request = measurer_datatypes.SnapshotMeasureRequest( - request.fuzzer, request.benchmark, request.trial_id, - request.cycle) - retry_request_encoded = json.dumps( - retry_request._asdict()).encode('utf-8') - self.publisher_client.publish(topic_path, retry_request_encoded) + return measured_snapshot_serialized + + retry_request = measurer_datatypes.SnapshotMeasureRequest( + request.fuzzer, request.benchmark, request.trial_id, request.cycle) + retry_request_encoded = json.dumps( + retry_request._asdict()).encode('utf-8') + return retry_request_encoded + + def put_result_in_response_queue(self, result): + topic_path = self.publisher_client.topic_path( + self.project_id, self.response_queue_topic_id) + try: + self.publisher_client.publish(topic_path, result) + logger.info('Result published successfully in response queue.') + except google.api_core.exceptions.GoogleAPICallError as error: + logger.error('Error when publishing result in response queue %s.', + error)