From d4c1d2eab04994743e371e2b80f39ec4eb242f80 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Fri, 10 May 2019 18:08:07 +0200
Subject: [PATCH] Add system test for PubSub max_messages setting

---
 pubsub/tests/system.py | 101 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py
index e6001f8e7801..688884c418a4 100644
--- a/pubsub/tests/system.py
+++ b/pubsub/tests/system.py
@@ -15,6 +15,7 @@
 from __future__ import absolute_import
 
 import datetime
+import itertools
 import threading
 import time
 
@@ -23,6 +24,7 @@
 import google.auth
 
 from google.cloud import pubsub_v1
+from google.cloud.pubsub_v1 import types
 from test_utils.system import unique_resource_id
 
 
@@ -178,6 +180,82 @@ def test_subscribe_to_messages_async_callbacks(
     future.cancel()
 
 
+class TestStreamingPull(object):
+    def test_streaming_pull_max_messages(
+        self, publisher, topic_path, subscriber, subscription_path, cleanup
+    ):
+        # Make sure the topic and subscription get deleted.
+        cleanup.append((publisher.delete_topic, topic_path))
+        cleanup.append((subscriber.delete_subscription, subscription_path))
+
+        # Create a topic and subscribe to it.
+        publisher.create_topic(topic_path)
+        subscriber.create_subscription(subscription_path, topic_path)
+
+        batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1)  # total: 50
+        self._publish_messages(publisher, topic_path, batch_sizes=batch_sizes)
+
+        # Now subscribe and do the main part: check for max pending messages.
+        total_messages = sum(batch_sizes)
+        flow_control = types.FlowControl(max_messages=5)
+        callback = StreamingPullCallback(processing_time=1)
+
+        future = subscriber.subscribe(
+            subscription_path, callback, flow_control=flow_control
+        )
+
+        # Expected time to process all messages in the ideal case:
+        #     total_messages / FlowControl.max_messages * processing_time
+        #
+        # With total=50, max_messages=5, and processing_time=1 this amounts to
+        # 10 seconds (+ overhead), thus a full minute should be more than enough
+        # for the processing to complete. If not, fail the test with a timeout.
+        for _ in six.moves.range(60):
+            time.sleep(1)
+            if callback.completed_calls >= total_messages:
+                break
+        else:
+            pytest.fail(
+                "Timeout: receiving/processing streamed messages took too long."
+            )
+
+        try:
+            # All messages should have been processed exactly once, and no more
+            # than max_messages simultaneously at any time.
+            assert callback.completed_calls == total_messages
+            assert sorted(callback.seen_message_ids) == list(
+                range(1, total_messages + 1)
+            )
+            assert callback.max_pending_ack <= flow_control.max_messages
+        finally:
+            future.cancel()
+
+    def _publish_messages(self, publisher, topic_path, batch_sizes):
+        """Publish messages in batches of the given sizes and wait until done."""
+        publish_futures = []
+        msg_counter = itertools.count(start=1)
+
+        for batch_size in batch_sizes:
+            msg_batch = self._make_messages(count=batch_size)
+            for msg in msg_batch:
+                future = publisher.publish(
+                    topic_path, msg, seq_num=str(next(msg_counter))
+                )
+                publish_futures.append(future)
+            time.sleep(0.1)
+
+        # Wait until all messages have been successfully published.
+        for future in publish_futures:
+            future.result(timeout=30)
+
+    def _make_messages(self, count):
+        messages = [
+            u"message {}/{}".format(i, count).encode("utf-8")
+            for i in range(1, count + 1)
+        ]
+        return messages
+
+
 class AckCallback(object):
     def __init__(self):
         self.calls = 0
@@ -207,3 +285,26 @@ def __call__(self, message):
         # ``calls`` is incremented to do it.
         self.call_times.append(now)
         self.calls += 1
+
+
+class StreamingPullCallback(object):
+    def __init__(self, processing_time):
+        self._lock = threading.Lock()
+        self._processing_time = processing_time
+        self._pending_ack = 0
+        self.max_pending_ack = 0
+        self.completed_calls = 0
+        self.seen_message_ids = []
+
+    def __call__(self, message):
+        with self._lock:
+            self._pending_ack += 1
+            self.max_pending_ack = max(self.max_pending_ack, self._pending_ack)
+            self.seen_message_ids.append(int(message.attributes["seq_num"]))
+
+        time.sleep(self._processing_time)
+
+        with self._lock:
+            self._pending_ack -= 1
+            message.ack()
+            self.completed_calls += 1
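
Note for reviewers: the setting under test can also be exercised outside the
test harness. Below is a minimal sketch of how FlowControl(max_messages=...)
caps the number of simultaneously leased messages on a streaming pull; the
project and subscription names are placeholders, not part of this patch:

    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1 import types

    subscriber = pubsub_v1.SubscriberClient()
    # Placeholder names -- adjust for your own environment.
    subscription_path = subscriber.subscription_path(
        "my-project", "my-subscription"
    )

    # Lease at most 5 messages at a time; delivery of further messages is
    # paused until some of the leased messages are acked.
    flow_control = types.FlowControl(max_messages=5)

    def callback(message):
        print(message.data)
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback, flow_control=flow_control
    )
    try:
        # Block the main thread while the background stream delivers messages.
        future.result(timeout=60)
    except Exception:
        future.cancel()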
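
The _publish_messages helper leans on two library behaviors: publish() returns
a future per message, and extra keyword arguments become string message
attributes. The same pattern in isolation, again with a placeholder project
and topic name, looks roughly like this:

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "my-topic")  # placeholders

    # Attach a sequence number attribute so the subscriber side can identify
    # each message, as the test's callback does via message.attributes.
    futures = [
        publisher.publish(
            topic_path,
            u"payload {}".format(seq).encode("utf-8"),
            seq_num=str(seq),
        )
        for seq in range(1, 11)
    ]

    # result() blocks until the backend accepts the message and returns the
    # server-assigned message ID, or raises if publishing failed.
    for future in futures:
        print(future.result(timeout=30))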