Skip to content

Commit

Permalink
Add system test for PubSub max_messages setting
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 10, 2019
1 parent a9a329a commit d4c1d2e
Showing 1 changed file with 101 additions and 0 deletions.
101 changes: 101 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import absolute_import

import datetime
import itertools
import threading
import time

Expand All @@ -23,6 +24,7 @@

import google.auth
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types


from test_utils.system import unique_resource_id
Expand Down Expand Up @@ -178,6 +180,82 @@ def test_subscribe_to_messages_async_callbacks(
future.cancel()


class TestStreamingPull(object):
def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1) # total: 50
self._publish_messages(publisher, topic_path, batch_sizes=batch_sizes)

# now subscribe and do the main part, check for max pending messages
total_messages = sum(batch_sizes)
flow_control = types.FlowControl(max_messages=5)
callback = StreamingPullCallback(processing_time=1)

future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# Expected time to process all messages in ideal case:
# total_messages / FlowControl.max_messages * processing_time
#
# With total=50, max messages=5, and processing_time=1 this amounts to
# 10 seconds (+ overhead), thus a full minute should be more than enough
# for the processing to complete. If not, fail the test with a timeout.
for _ in six.moves.range(60):
time.sleep(1)
if callback.completed_calls >= total_messages:
break
else:
pytest.fail(
"Timeout: receiving/processing streamed messages took too long."
)

try:
# All messages should have been processed exactly once, and no more
# than max_messages simultaneously at any time.
assert callback.completed_calls == total_messages
assert sorted(callback.seen_message_ids) == list(
range(1, total_messages + 1)
)
assert callback.max_pending_ack <= flow_control.max_messages
finally:
future.cancel()

def _publish_messages(self, publisher, topic_path, batch_sizes):
"""Publish ``count`` messages in batches and wait until completion."""
publish_futures = []
msg_counter = itertools.count(start=1)

for batch_size in batch_sizes:
msg_batch = self._make_messages(count=batch_size)
for msg in msg_batch:
future = publisher.publish(
topic_path, msg, seq_num=str(next(msg_counter))
)
publish_futures.append(future)
time.sleep(0.1)

# wait untill all messages have been successfully published
for future in publish_futures:
future.result(timeout=30)

def _make_messages(self, count):
messages = [
u"message {}/{}".format(i, count).encode("utf-8")
for i in range(1, count + 1)
]
return messages


class AckCallback(object):
def __init__(self):
self.calls = 0
Expand Down Expand Up @@ -207,3 +285,26 @@ def __call__(self, message):
# ``calls`` is incremented to do it.
self.call_times.append(now)
self.calls += 1


class StreamingPullCallback(object):
def __init__(self, processing_time):
self._lock = threading.Lock()
self._processing_time = processing_time
self._pending_ack = 0
self.max_pending_ack = 0
self.completed_calls = 0
self.seen_message_ids = []

def __call__(self, message):
with self._lock:
self._pending_ack += 1
self.max_pending_ack = max(self.max_pending_ack, self._pending_ack)
self.seen_message_ids.append(int(message.attributes["seq_num"]))

time.sleep(self._processing_time)

with self._lock:
self._pending_ack -= 1
message.ack()
self.completed_calls += 1

0 comments on commit d4c1d2e

Please sign in to comment.