Skip to content

Commit

Permalink
Add system test for PubSub max_messages setting
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 17, 2019
1 parent 9b01060 commit 7e5b9d7
Showing 1 changed file with 113 additions and 0 deletions.
113 changes: 113 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import absolute_import

import datetime
import itertools
import threading
import time

Expand All @@ -24,6 +25,9 @@

import google.auth
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import exceptions
from google.cloud.pubsub_v1 import futures
from google.cloud.pubsub_v1 import types


from test_utils.system import unique_resource_id
Expand Down Expand Up @@ -206,6 +210,85 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1) # total: 50
self._publish_messages(publisher, topic_path, batch_sizes=batch_sizes)

# now subscribe and do the main part, check for max pending messages
total_messages = sum(batch_sizes)
flow_control = types.FlowControl(max_messages=5)
callback = StreamingPullCallback(
processing_time=1, resolve_at_msg_count=total_messages
)

subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# Expected time to process all messages in ideal case:
# (total_messages / FlowControl.max_messages) * processing_time
#
# With total=50, max messages=5, and processing_time=1 this amounts to
# 10 seconds (+ overhead), thus a full minute should be more than enough
# for the processing to complete. If not, fail the test with a timeout.
try:
callback.done_future.result(timeout=60)
except exceptions.TimeoutError:
pytest.fail(
"Timeout: receiving/processing streamed messages took too long."
)

# The callback future gets resolved once total_messages have been processed,
# but we want to wait for just a little bit longer to possibly catch cases
# when the callback gets invoked *more* than total_messages times.
time.sleep(3)

try:
# All messages should have been processed exactly once, and no more
# than max_messages simultaneously at any time.
assert callback.completed_calls == total_messages
assert sorted(callback.seen_message_ids) == list(
range(1, total_messages + 1)
)
assert callback.max_pending_ack <= flow_control.max_messages
finally:
subscription_future.cancel() # trigger clean shutdown

def _publish_messages(self, publisher, topic_path, batch_sizes):
"""Publish ``count`` messages in batches and wait until completion."""
publish_futures = []
msg_counter = itertools.count(start=1)

for batch_size in batch_sizes:
msg_batch = self._make_messages(count=batch_size)
for msg in msg_batch:
future = publisher.publish(
topic_path, msg, seq_num=str(next(msg_counter))
)
publish_futures.append(future)
time.sleep(0.1)

# wait untill all messages have been successfully published
for future in publish_futures:
future.result(timeout=30)

def _make_messages(self, count):
messages = [
u"message {}/{}".format(i, count).encode("utf-8")
for i in range(1, count + 1)
]
return messages


class AckCallback(object):
def __init__(self):
Expand Down Expand Up @@ -236,3 +319,33 @@ def __call__(self, message):
# ``calls`` is incremented to do it.
self.call_times.append(now)
self.calls += 1


class StreamingPullCallback(object):
def __init__(self, processing_time, resolve_at_msg_count):
self._lock = threading.Lock()
self._processing_time = processing_time
self._pending_ack = 0
self.max_pending_ack = 0
self.completed_calls = 0
self.seen_message_ids = []

self._resolve_at_msg_count = resolve_at_msg_count
self.done_future = futures.Future()

def __call__(self, message):
with self._lock:
self._pending_ack += 1
self.max_pending_ack = max(self.max_pending_ack, self._pending_ack)
self.seen_message_ids.append(int(message.attributes["seq_num"]))

time.sleep(self._processing_time)

with self._lock:
self._pending_ack -= 1
message.ack()
self.completed_calls += 1

if self.completed_calls >= self._resolve_at_msg_count:
if not self.done_future.done():
self.done_future.set_result(None)

0 comments on commit 7e5b9d7

Please sign in to comment.