From c6dc8fdf598e435f8005d338788bd3bfdb019197 Mon Sep 17 00:00:00 2001 From: Thea Flowers Date: Tue, 11 Sep 2018 14:04:32 -0700 Subject: [PATCH] Fix race condition in recv()'s usage of self.call. --- .../pubsub_v1/subscriber/_protocol/bidi.py | 38 +----- .../_protocol/streaming_pull_manager.py | 3 +- pubsub/test.py | 110 ------------------ 3 files changed, 4 insertions(+), 147 deletions(-) delete mode 100644 pubsub/test.py diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py index 92c268000cab1..7c995c57652e2 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/bidi.py @@ -207,21 +207,8 @@ def open(self): else: call.add_done_callback(self._on_call_done) - import time - import random - from unittest import mock - from google.cloud.pubsub_v1.proto import pubsub_pb2 - - mock_call = mock.MagicMock(wraps=call) - - def mock_next(): - #time.sleep(random.uniform(0.0, 1.0)) - return pubsub_pb2.StreamingPullResponse() - - mock_call.__next__.side_effect = mock_next - self._request_generator = request_generator - self.call = mock_call + self.call = call def close(self): """Closes the stream.""" @@ -252,7 +239,6 @@ def send(self, request): # to mean something semantically different. if self.call.is_active(): self._request_queue.put(request) - pass else: # calling next should cause the call to raise. next(self.call) @@ -356,7 +342,7 @@ def _reopen(self): # Another thread already managed to re-open this stream. if self.call is not None and self.call.is_active(): _LOGGER.debug('Stream was already re-established.') - #return + return self.call = None # Request generator should exit cleanly since the RPC its bound to @@ -454,8 +440,7 @@ def _recv(self): return next(call) def recv(self): - return self._recoverable( - super(ResumableBidiRpc, self).recv) + return self._recoverable(self._recv) @property def is_active(self): @@ -471,16 +456,6 @@ def is_active(self): return self.call is not None and not self._finalized -def meanie_thread_main(rpc): - import time - import random - - while True: - time.sleep(random.uniform(0, 0.1)) - print('Wahaha') - rpc._reopen() - - class BackgroundConsumer(object): """A bi-directional stream consumer that runs in a separate thread. @@ -536,13 +511,6 @@ def _thread_main(self): self._bidi_rpc.add_done_callback(self._on_call_done) self._bidi_rpc.open() - import functools - meanie_thread = threading.Thread( - name='meanie thread', - target=functools.partial(meanie_thread_main, self._bidi_rpc)) - meanie_thread.deamon = True - meanie_thread.start() - while self._bidi_rpc.is_active: # Do not allow the paused status to change at all during this # section. There is a condition where we could be resumed diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 1409dccc2f95f..17d1a2cad1665 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -263,8 +263,7 @@ def send(self, request): """Queue a request to be sent to the RPC.""" if self._UNARY_REQUESTS: try: - pass - #self._send_unary_request(request) + self._send_unary_request(request) except exceptions.GoogleAPICallError as exc: _LOGGER.debug( 'Exception while sending unary RPC. This is typically ' diff --git a/pubsub/test.py b/pubsub/test.py deleted file mode 100644 index 83757f2570d36..0000000000000 --- a/pubsub/test.py +++ /dev/null @@ -1,110 +0,0 @@ - -import logging -import random -import threading -import time - -import google.api_core.exceptions -from google.cloud import pubsub - -message_count = 0 -message_count_lock = threading.Lock() - - -def monitor(future, interval=10): - import datetime - import textwrap - import time - - manager = future._manager - start_time = datetime.datetime.now() - - while not future.done(): - run_time = datetime.datetime.now() - start_time - rate = message_count / run_time.total_seconds() - status = textwrap.dedent("""\ - Messages processed: {message_count} - Rate: {rate:.2f} Messages/second - Run time: {run_time} - Load: {load:.2f} - p99 ack: {ack_deadline} seconds - Leased Messages: {leased_messages} - Executor queue size: {work_queue} - Callback queue size: {callback_size} - Request queue size: {queue_size} - """).format( - message_count=message_count, - rate=rate, - run_time=run_time, - ack_deadline=manager.ack_deadline, - load=manager.load, - leased_messages=manager.leaser.message_count, - work_queue=manager._scheduler._executor._work_queue.qsize(), - callback_size=manager._scheduler.queue.qsize(), - queue_size=manager._rpc.pending_requests - ) - - #print('===== Subscriber Monitor =====') - #print(status) - - try: - time.sleep(interval) - except KeyboardInterrupt: - future.cancel() - - print('waiting on future...') - print(future.result()) - print('clean exit') - - -def incr_count(): - # Note: this should be done within a lock as multiple threads mess with - # this, however, using a lock slows down the program enough to possibly - # affect the results. Consider this count as incredibly inaccurate and - # best-effort. - global message_count - message_count += 1 - return message_count - - -def callback(message): - incr_count() - - # Sleep a random amount of time to simulate a heterogenous load. - time.sleep(random.uniform(5, 10)) - - message.ack() - - -def main(): - # Enabling logging will output a *ton* of stuff, but it might be helpful. - logging.basicConfig(level=logging.DEBUG) - logging.getLogger( - 'google.cloud.pubsub_v1.subscriber._protocol.leaser').setLevel('INFO') - - subscriber = pubsub.SubscriberClient() - topic = 'projects/{project_id}/topics/{topic}'.format( - project_id='python-docs-samples-tests', - topic='repro-topic', # Set this to something appropriate. - ) - subscription = 'projects/{project_id}/subscriptions/{sub}'.format( - project_id='python-docs-samples-tests', - sub='repro-sub2', # Set this to something appropriate. - ) - - try: - subscriber.create_subscription(subscription, topic) - except google.api_core.exceptions.AlreadyExists: - print('subscription exists') - pass - - future = subscriber.subscribe( - subscription=subscription, - callback=callback) - - print('listening') - monitor(future) - - -if __name__ == '__main__': - main()