From dad43aa135141b2a23784c35306041e2d1489f93 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 1 Oct 2019 11:20:48 +0300 Subject: [PATCH 01/15] feat(pubsub): add stop method --- pubsub/google/cloud/pubsub_v1/futures.py | 4 +-- .../pubsub_v1/publisher/_batch/thread.py | 11 +++++++ .../cloud/pubsub_v1/publisher/client.py | 19 ++++++++++++ .../pubsub_v1/publisher/batch/test_thread.py | 14 +++++++++ .../publisher/test_publisher_client.py | 29 +++++++++++++++++++ 5 files changed, 74 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py index 0d7ba7f9bf52..ba861e40c653 100644 --- a/pubsub/google/cloud/pubsub_v1/futures.py +++ b/pubsub/google/cloud/pubsub_v1/futures.py @@ -74,9 +74,7 @@ def running(self): bool: ``True`` if this method has not yet completed, or ``False`` if it has completed. """ - if self.done(): - return False - return True + return not self.done() def done(self): """Return True the future is done, False otherwise. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 117ee12b8463..559ee6852bfd 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -137,6 +137,17 @@ def status(self): """ return self._status + def wait(self): + """If commit in progress, waits until all of the futures resolved. + + .. note:: + + This method blocks until all futures of this batch resolved. + """ + if self._status in (base.BatchStatus.STARTING, base.BatchStatus.IN_PROGRESS): + for future in self._futures: + future.result() + def commit(self): """Actually publish all of the messages on the active batch. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 05a4161e889a..f20d5808c7e8 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -274,3 +274,22 @@ def publish(self, topic, data, **attrs): batch = self._batch(topic, create=True) return future + + def stop(self): + """Immediately publish all outstanding batches. + + This asynchronously pushes all outstanding messages + and waits until all futures resolved. Method should be + invoked prior to deleting this Client object in order + to ensure that no pending messages are lost. + + .. note:: + + This method blocks until all futures of all + batches resolved. + """ + for topic in self._batches: + self._batches[topic].commit() + + for topic in self._batches: + self._batches[topic].wait() diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 60425e748043..f5922c27cf36 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -252,6 +252,20 @@ def test_block__commmit_api_error(): assert future.exception() == error +def test_wait(): + batch = create_batch() + with mock.patch( + "google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True + ) as result_mock: + futures = (batch.publish({"data": b"msg"}),) + + batch._status = BatchStatus.IN_PROGRESS + futures[0]._completed.set() + batch.wait() + + result_mock.assert_called() + + def test_block__commmit_retry_error(): batch = create_batch() futures = ( diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 05e4c8c67209..ae7d764d44c1 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -201,6 +201,35 @@ def test_publish_attrs_type_error(): client.publish(topic, b"foo", answer=42) +def test_stop(): + creds = mock.Mock(spec=credentials.Credentials) + client = publisher.Client(credentials=creds) + + batch = client._batch("topic1", autocommit=False) + batch2 = client._batch("topic2", autocommit=False) + + pubsub_msg = types.PubsubMessage(data=b"msg") + + cp = mock.patch.object(batch, "commit") + wp = mock.patch.object(batch, "wait") + cp2 = mock.patch.object(batch2, "commit") + wp2 = mock.patch.object(batch2, "wait") + + with cp as c_mock, cp2 as c_mock2, wp as w_mock, wp2 as w_mock2: + batch.publish(pubsub_msg) + batch2.publish(pubsub_msg) + + client.stop() + + # check if commit() called + c_mock.assert_called() + c_mock2.assert_called() + + # check if wait() called + w_mock.assert_called() + w_mock2.assert_called() + + def test_gapic_instance_method(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) From ec7677c5d5d4164ce35336e6f742bbebae324f89 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 1 Oct 2019 11:55:58 +0300 Subject: [PATCH 02/15] feat(pubsub): add unit-test for stop method --- .../unit/pubsub_v1/publisher/batch/test_thread.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index f5922c27cf36..87f331a3a1f6 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -266,6 +266,17 @@ def test_wait(): result_mock.assert_called() +def test_wait_not_started(): + batch = create_batch() + with mock.patch( + "google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True + ) as result_mock: + batch.publish({"data": b"msg"}) + batch.wait() + + result_mock.assert_not_called() + + def test_block__commmit_retry_error(): batch = create_batch() futures = ( From 93b5ec8cb02e8e2d1c7d934aa27963212fead446 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 22 Oct 2019 16:27:04 +0300 Subject: [PATCH 03/15] Add comment for _futures. Change wait() conditions. --- pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 559ee6852bfd..ca56af085625 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -75,6 +75,9 @@ def __init__(self, client, topic, settings, autocommit=True): # These members are all communicated between threads; ensure that # any writes to them use the "state lock" to remain atomic. self._futures = [] + # _futures list should remain unchanged after batch + # status changed from ACCEPTING_MESSAGES to any other + # in order to avoid race conditions self._messages = [] self._size = 0 self._status = base.BatchStatus.ACCEPTING_MESSAGES @@ -138,13 +141,13 @@ def status(self): return self._status def wait(self): - """If commit in progress, waits until all of the futures resolved. + """If commit is in progress, waits until all of the futures resolved. .. note:: This method blocks until all futures of this batch resolved. """ - if self._status in (base.BatchStatus.STARTING, base.BatchStatus.IN_PROGRESS): + if self._status != base.BatchStatus.ACCEPTING_MESSAGES: for future in self._futures: future.result() From a0d82e67d04dfc7c6b9add12939936c1f35e14ea Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 23 Oct 2019 18:36:32 +0300 Subject: [PATCH 04/15] Move attribute comment --- pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index ca56af085625..865ee681d644 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -74,10 +74,10 @@ def __init__(self, client, topic, settings, autocommit=True): self._state_lock = threading.Lock() # These members are all communicated between threads; ensure that # any writes to them use the "state lock" to remain atomic. - self._futures = [] # _futures list should remain unchanged after batch # status changed from ACCEPTING_MESSAGES to any other # in order to avoid race conditions + self._futures = [] self._messages = [] self._size = 0 self._status = base.BatchStatus.ACCEPTING_MESSAGES From 210abf0b4cf93f00fd3c2eb2993f474a4221d414 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 25 Oct 2019 13:05:44 +0300 Subject: [PATCH 05/15] Make stop() method non-blocking. --- .../pubsub_v1/publisher/_batch/thread.py | 11 ------- .../cloud/pubsub_v1/publisher/client.py | 30 ++++++++++++------- .../pubsub_v1/publisher/batch/test_thread.py | 25 ---------------- .../publisher/test_publisher_client.py | 18 +++++------ 4 files changed, 28 insertions(+), 56 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py index 865ee681d644..726e93166cda 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -140,17 +140,6 @@ def status(self): """ return self._status - def wait(self): - """If commit is in progress, waits until all of the futures resolved. - - .. note:: - - This method blocks until all futures of this batch resolved. - """ - if self._status != base.BatchStatus.ACCEPTING_MESSAGES: - for future in self._futures: - future.result() - def commit(self): """Actually publish all of the messages on the active batch. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index f20d5808c7e8..cd6db9db31f3 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs): # messages. One batch exists for each topic. self._batch_lock = self._batch_class.make_lock() self._batches = {} + self._is_stopped = False @classmethod def from_service_account_file(cls, filename, batch_settings=(), **kwargs): @@ -242,7 +243,14 @@ def publish(self, topic, data, **attrs): instance that conforms to Python Standard library's :class:`~concurrent.futures.Future` interface (but not an instance of that class). + + Raises: + ValueError: + If called after publisher has been stopped + by a `stop()` method call. """ + if self._is_stopped: + raise ValueError("Cannot publish on a stopped publisher.") # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. if not isinstance(data, six.binary_type): @@ -276,20 +284,22 @@ def publish(self, topic, data, **attrs): return future def stop(self): - """Immediately publish all outstanding batches. + """Immediately publish all outstanding messages. - This asynchronously pushes all outstanding messages - and waits until all futures resolved. Method should be - invoked prior to deleting this Client object in order - to ensure that no pending messages are lost. + Asynchronously sends all outstanding messages and + prevents future calls to `publish()`. Method should + be invoked prior to deleting this `Client()` object + in order to ensure that no pending messages are lost. .. note:: - This method blocks until all futures of all - batches resolved. + This method is non-blocking. Use `Future()` objects + returned by `publish()` to make sure all messages + sent or failed to. """ - for topic in self._batches: - self._batches[topic].commit() + if self._is_stopped: + raise ValueError("Cannot stop a publisher already stopped.") + self._is_stopped = True for topic in self._batches: - self._batches[topic].wait() + self._batches[topic].commit() diff --git a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 87f331a3a1f6..60425e748043 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -252,31 +252,6 @@ def test_block__commmit_api_error(): assert future.exception() == error -def test_wait(): - batch = create_batch() - with mock.patch( - "google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True - ) as result_mock: - futures = (batch.publish({"data": b"msg"}),) - - batch._status = BatchStatus.IN_PROGRESS - futures[0]._completed.set() - batch.wait() - - result_mock.assert_called() - - -def test_wait_not_started(): - batch = create_batch() - with mock.patch( - "google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True - ) as result_mock: - batch.publish({"data": b"msg"}) - batch.wait() - - result_mock.assert_not_called() - - def test_block__commmit_retry_error(): batch = create_batch() futures = ( diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index ae7d764d44c1..45c1d131b79e 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -210,24 +210,22 @@ def test_stop(): pubsub_msg = types.PubsubMessage(data=b"msg") - cp = mock.patch.object(batch, "commit") - wp = mock.patch.object(batch, "wait") - cp2 = mock.patch.object(batch2, "commit") - wp2 = mock.patch.object(batch2, "wait") + patch = mock.patch.object(batch, "commit") + patch2 = mock.patch.object(batch2, "commit") - with cp as c_mock, cp2 as c_mock2, wp as w_mock, wp2 as w_mock2: + with patch as commit_mock, patch2 as commit_mock2: batch.publish(pubsub_msg) batch2.publish(pubsub_msg) client.stop() # check if commit() called - c_mock.assert_called() - c_mock2.assert_called() + commit_mock.assert_called() + commit_mock2.assert_called() - # check if wait() called - w_mock.assert_called() - w_mock2.assert_called() + # check that closed publisher doesn't accept new messages + with pytest.raises(ValueError): + client.publish("topic1", pubsub_msg) def test_gapic_instance_method(): From 99bfc2608afccdc0a71c0ce43139f09876d51603 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 25 Oct 2019 14:40:00 +0300 Subject: [PATCH 06/15] Add one more assert for stop() method. --- pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 45c1d131b79e..e8819c9565de 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -227,6 +227,9 @@ def test_stop(): with pytest.raises(ValueError): client.publish("topic1", pubsub_msg) + with pytest.raises(ValueError): + client.stop() + def test_gapic_instance_method(): creds = mock.Mock(spec=credentials.Credentials) From 3216d849da9a0461c71f2a338cc142eda950ab1a Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 25 Oct 2019 16:41:50 +0300 Subject: [PATCH 07/15] Fix comment. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index cd6db9db31f3..53b809bacf77 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -295,7 +295,7 @@ def stop(self): This method is non-blocking. Use `Future()` objects returned by `publish()` to make sure all messages - sent or failed to. + sent or failed. """ if self._is_stopped: raise ValueError("Cannot stop a publisher already stopped.") From 94d30237ed12f569f2c4729d06f3f028f4a1f34c Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 25 Oct 2019 17:29:57 +0300 Subject: [PATCH 08/15] Comment fix. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 53b809bacf77..23409f24a7f1 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -294,8 +294,8 @@ def stop(self): .. note:: This method is non-blocking. Use `Future()` objects - returned by `publish()` to make sure all messages - sent or failed. + returned by `publish()` to make sure all publish + requests completed, either in success or error. """ if self._is_stopped: raise ValueError("Cannot stop a publisher already stopped.") From 3f5333350e4370ac697d4495362dec0e2fefb292 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 28 Oct 2019 10:48:48 +0300 Subject: [PATCH 09/15] Add stopping lock. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 23409f24a7f1..e268354deda0 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -17,6 +17,7 @@ import copy import os import pkg_resources +import threading import grpc import six @@ -133,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs): # The batches on the publisher client are responsible for holding # messages. One batch exists for each topic. self._batch_lock = self._batch_class.make_lock() + self._stoping_lock = threading.Lock() self._batches = {} self._is_stopped = False @@ -249,8 +251,9 @@ def publish(self, topic, data, **attrs): If called after publisher has been stopped by a `stop()` method call. """ - if self._is_stopped: - raise ValueError("Cannot publish on a stopped publisher.") + with self._stoping_lock: + if self._is_stopped: + raise ValueError("Cannot publish on a stopped publisher.") # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. if not isinstance(data, six.binary_type): @@ -297,9 +300,11 @@ def stop(self): returned by `publish()` to make sure all publish requests completed, either in success or error. """ - if self._is_stopped: - raise ValueError("Cannot stop a publisher already stopped.") + with self._stoping_lock: + if self._is_stopped: + raise ValueError("Cannot stop a publisher already stopped.") + + self._is_stopped = True - self._is_stopped = True for topic in self._batches: self._batches[topic].commit() From b63c17693d73b0efc7a12bbff1e4b362d58d3d9d Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 29 Oct 2019 10:26:38 +0300 Subject: [PATCH 10/15] Spelling mistake fix. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index e268354deda0..bd04ed9ad0b2 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -134,7 +134,7 @@ def __init__(self, batch_settings=(), **kwargs): # The batches on the publisher client are responsible for holding # messages. One batch exists for each topic. self._batch_lock = self._batch_class.make_lock() - self._stoping_lock = threading.Lock() + self._stopping_lock = threading.Lock() self._batches = {} self._is_stopped = False @@ -251,7 +251,7 @@ def publish(self, topic, data, **attrs): If called after publisher has been stopped by a `stop()` method call. """ - with self._stoping_lock: + with self._stopping_lock: if self._is_stopped: raise ValueError("Cannot publish on a stopped publisher.") # Sanity check: Is the data being sent as a bytestring? @@ -300,7 +300,7 @@ def stop(self): returned by `publish()` to make sure all publish requests completed, either in success or error. """ - with self._stoping_lock: + with self._stopping_lock: if self._is_stopped: raise ValueError("Cannot stop a publisher already stopped.") From 3ef412cd146567162e6d1570f1c114c8994ca01f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 4 Nov 2019 15:10:41 +0300 Subject: [PATCH 11/15] Preventing race conditions while stopping publisher. --- .../cloud/pubsub_v1/publisher/client.py | 50 +++++++++---------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index bd04ed9ad0b2..eb630d1806c4 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -17,7 +17,6 @@ import copy import os import pkg_resources -import threading import grpc import six @@ -134,7 +133,6 @@ def __init__(self, batch_settings=(), **kwargs): # The batches on the publisher client are responsible for holding # messages. One batch exists for each topic. self._batch_lock = self._batch_class.make_lock() - self._stopping_lock = threading.Lock() self._batches = {} self._is_stopped = False @@ -190,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True): """ # If there is no matching batch yet, then potentially create one # and place it on the batches dictionary. - with self._batch_lock: - if not create: - batch = self._batches.get(topic) - if batch is None: - create = True - - if create: - batch = self._batch_class( - autocommit=autocommit, - client=self, - settings=self.batch_settings, - topic=topic, - ) - self._batches[topic] = batch + if not create: + batch = self._batches.get(topic) + if batch is None: + create = True + + if create: + batch = self._batch_class( + autocommit=autocommit, + client=self, + settings=self.batch_settings, + topic=topic, + ) + self._batches[topic] = batch return batch @@ -251,9 +248,6 @@ def publish(self, topic, data, **attrs): If called after publisher has been stopped by a `stop()` method call. """ - with self._stopping_lock: - if self._is_stopped: - raise ValueError("Cannot publish on a stopped publisher.") # Sanity check: Is the data being sent as a bytestring? # If it is literally anything else, complain loudly about it. if not isinstance(data, six.binary_type): @@ -277,12 +271,16 @@ def publish(self, topic, data, **attrs): message = types.PubsubMessage(data=data, attributes=attrs) # Delegate the publishing to the batch. - batch = self._batch(topic) - future = None - while future is None: - future = batch.publish(message) - if future is None: - batch = self._batch(topic, create=True) + with self._batch_lock: + if self._is_stopped: + raise ValueError("Cannot publish on a stopped publisher.") + + batch = self._batch(topic) + future = None + while future is None: + future = batch.publish(message) + if future is None: + batch = self._batch(topic, create=True) return future @@ -300,7 +298,7 @@ def stop(self): returned by `publish()` to make sure all publish requests completed, either in success or error. """ - with self._stopping_lock: + with self._batch_lock: if self._is_stopped: raise ValueError("Cannot stop a publisher already stopped.") From b1d515d8b65736df46afa0477d47c9ee98d40fcc Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 5 Nov 2019 11:01:38 +0300 Subject: [PATCH 12/15] Fix test. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 2 +- pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index eb630d1806c4..baab1180e368 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -252,7 +252,7 @@ def publish(self, topic, data, **attrs): # If it is literally anything else, complain loudly about it. if not isinstance(data, six.binary_type): raise TypeError( - "Data being published to Pub/Sub must be sent " "as a bytestring." + "Data being published to Pub/Sub must be sent as a bytestring." ) # Coerce all attributes to text strings. diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index e8819c9565de..323a6d321b5b 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -225,7 +225,7 @@ def test_stop(): # check that closed publisher doesn't accept new messages with pytest.raises(ValueError): - client.publish("topic1", pubsub_msg) + client.publish("topic1", b"msg2") with pytest.raises(ValueError): client.stop() From 25344673d0cc5a8ee86d16827231bc9403e534db Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 6 Nov 2019 10:38:15 +0300 Subject: [PATCH 13/15] Lock method stop() completely. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index baab1180e368..14bb884e954f 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -304,5 +304,5 @@ def stop(self): self._is_stopped = True - for topic in self._batches: - self._batches[topic].commit() + for topic in self._batches: + self._batches[topic].commit() From 45a6bf890de245bbef0aa14522e96b3db6d27eb4 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 7 Nov 2019 20:12:48 +0300 Subject: [PATCH 14/15] Small refactor. --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 8 ++++---- .../unit/pubsub_v1/publisher/test_publisher_client.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 14bb884e954f..5ddcfedbdd9e 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -273,7 +273,7 @@ def publish(self, topic, data, **attrs): # Delegate the publishing to the batch. with self._batch_lock: if self._is_stopped: - raise ValueError("Cannot publish on a stopped publisher.") + raise RuntimeError("Cannot publish on a stopped publisher.") batch = self._batch(topic) future = None @@ -300,9 +300,9 @@ def stop(self): """ with self._batch_lock: if self._is_stopped: - raise ValueError("Cannot stop a publisher already stopped.") + raise RuntimeError("Cannot stop a publisher already stopped.") self._is_stopped = True - for topic in self._batches: - self._batches[topic].commit() + for batch in self._batches.values(): + batch.commit() diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 323a6d321b5b..6519b2b23149 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -224,10 +224,10 @@ def test_stop(): commit_mock2.assert_called() # check that closed publisher doesn't accept new messages - with pytest.raises(ValueError): + with pytest.raises(RuntimeError): client.publish("topic1", b"msg2") - with pytest.raises(ValueError): + with pytest.raises(RuntimeError): client.stop() From ba01ec190a062502065e48278daf12514e6d296e Mon Sep 17 00:00:00 2001 From: Gurov Ilya Date: Thu, 7 Nov 2019 20:19:17 +0300 Subject: [PATCH 15/15] Update pubsub/google/cloud/pubsub_v1/publisher/client.py Co-Authored-By: Peter Lamut --- pubsub/google/cloud/pubsub_v1/publisher/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/client.py b/pubsub/google/cloud/pubsub_v1/publisher/client.py index 5ddcfedbdd9e..60a03bb652ab 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/client.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/client.py @@ -244,7 +244,7 @@ def publish(self, topic, data, **attrs): instance of that class). Raises: - ValueError: + RuntimeError: If called after publisher has been stopped by a `stop()` method call. """