Skip to content

Commit

Permalink
Adding tests, enabling message ordering, and passing ordering key as …
Browse files Browse the repository at this point in the history
…correct type (str)
  • Loading branch information
gustavogaldinoo committed Aug 13, 2024
1 parent 20d2f03 commit a036561
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
6 changes: 3 additions & 3 deletions experiment/measurer/measure_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,9 @@ def put_task_in_request_queue(
# Convert message data to bytes
message_as_bytes = self._task_to_bytes(task)
# Build the Pub/Sub message object
future = self.publisher_client.publish(topic_path,
message_as_bytes,
ordering_key=task.cycle)
future = self.publisher_client.publish(topic=topic_path,
data=message_as_bytes,
ordering_key=str(task.cycle))
message_id = future.result() # Get the published message ID
logger.info(
'Manager successfully published task with message ID %s to %s.',
Expand Down
3 changes: 2 additions & 1 deletion experiment/measurer/measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def _create_request_queue_subscription(self):
subscription = self.subscriber_client.create_subscription(
request={
'name': self.subscription_path,
'topic': self.request_queue_topic_path
'topic': self.request_queue_topic_path,
'enable_message_ordering': True,
})
logger.info('Subscription %s created successfully.',
subscription.name)
Expand Down
42 changes: 26 additions & 16 deletions experiment/measurer/test_measure_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,34 @@ def local_measure_worker():
return measure_worker.LocalMeasureWorker(config)


def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
"""Tests the scenario where measure_snapshot is not None, so snapshot is put
in response_queue"""
def test_process_measured_snapshot_as_retry_request(local_measure_worker): # pylint: disable=redefined-outer-name
""""Tests the scenario where measure_snapshot is None, so task needs to be
retried"""
request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark',
1, 0)
snapshot = None
result = local_measure_worker.process_measured_snapshot_result(
snapshot, request)
assert isinstance(result, measurer_datatypes.RetryRequest)


def test_process_measured_snapshot_as_snapshot(local_measure_worker): # pylint: disable=redefined-outer-name
""""Tests the scenario where measure_snapshot is not None, so snapshot is
returned"""
request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark',
1, 0)
snapshot = Snapshot(trial_id=1)
local_measure_worker.put_result_in_response_queue(snapshot, request)
response_queue = local_measure_worker.response_queue
assert response_queue.qsize() == 1
assert isinstance(response_queue.get(), Snapshot)
result = local_measure_worker.process_measured_snapshot_result(
snapshot, request)
assert isinstance(result, Snapshot)


def test_put_retry_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
"""Tests the scenario where measure_snapshot is None, so task needs to be
retried"""
request = measurer_datatypes.RetryRequest('fuzzer', 'benchmark', 1, 0)
snapshot = None
local_measure_worker.put_result_in_response_queue(snapshot, request)
response_queue = local_measure_worker.response_queue
assert response_queue.qsize() == 1
assert isinstance(response_queue.get(), measurer_datatypes.RetryRequest)
def test_put_snapshot_in_response_queue(local_measure_worker): # pylint: disable=redefined-outer-name
"""Tests if result is being put in response queue as expected"""
request = measurer_datatypes.SnapshotMeasureRequest('fuzzer', 'benchmark',
1, 0)
snapshot = Snapshot(trial_id=1)
result = local_measure_worker.process_measured_snapshot_result(
snapshot, request)
local_measure_worker.put_result_in_response_queue(result)
assert local_measure_worker.response_queue.qsize() == 1

0 comments on commit a036561

Please sign in to comment.