Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add stop method #9365

Merged
merged 15 commits into from
Nov 7, 2019
4 changes: 1 addition & 3 deletions pubsub/google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ def running(self):
bool: ``True`` if this method has not yet completed, or
``False`` if it has completed.
"""
if self.done():
return False
return True
pradn marked this conversation as resolved.
Show resolved Hide resolved
return not self.done()

def done(self):
"""Return True the future is done, False otherwise.
Expand Down
3 changes: 3 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def __init__(self, client, topic, settings, autocommit=True):
self._state_lock = threading.Lock()
# These members are all communicated between threads; ensure that
# any writes to them use the "state lock" to remain atomic.
# _futures list should remain unchanged after batch
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures = []
self._messages = []
self._size = 0
Expand Down
74 changes: 53 additions & 21 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
# messages. One batch exists for each topic.
self._batch_lock = self._batch_class.make_lock()
self._batches = {}
self._is_stopped = False

@classmethod
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
Expand Down Expand Up @@ -187,20 +188,19 @@ def _batch(self, topic, create=False, autocommit=True):
"""
# If there is no matching batch yet, then potentially create one
# and place it on the batches dictionary.
with self._batch_lock:
if not create:
batch = self._batches.get(topic)
if batch is None:
create = True

if create:
batch = self._batch_class(
autocommit=autocommit,
client=self,
settings=self.batch_settings,
topic=topic,
)
self._batches[topic] = batch
if not create:
batch = self._batches.get(topic)
if batch is None:
create = True

if create:
batch = self._batch_class(
autocommit=autocommit,
client=self,
settings=self.batch_settings,
topic=topic,
)
self._batches[topic] = batch

return batch

Expand Down Expand Up @@ -242,12 +242,17 @@ def publish(self, topic, data, **attrs):
instance that conforms to Python Standard library's
:class:`~concurrent.futures.Future` interface (but not an
instance of that class).

Raises:
RuntimeError:
If called after publisher has been stopped
by a `stop()` method call.
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""
# Sanity check: Is the data being sent as a bytestring?
# If it is literally anything else, complain loudly about it.
if not isinstance(data, six.binary_type):
raise TypeError(
"Data being published to Pub/Sub must be sent " "as a bytestring."
"Data being published to Pub/Sub must be sent as a bytestring."
)

# Coerce all attributes to text strings.
Expand All @@ -266,11 +271,38 @@ def publish(self, topic, data, **attrs):
message = types.PubsubMessage(data=data, attributes=attrs)

# Delegate the publishing to the batch.
batch = self._batch(topic)
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self._batch(topic, create=True)
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")

batch = self._batch(topic)
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self._batch(topic, create=True)

pradn marked this conversation as resolved.
Show resolved Hide resolved
return future

def stop(self):
"""Immediately publish all outstanding messages.

Asynchronously sends all outstanding messages and
prevents future calls to `publish()`. Method should
be invoked prior to deleting this `Client()` object
in order to ensure that no pending messages are lost.

.. note::

This method is non-blocking. Use `Future()` objects
returned by `publish()` to make sure all publish
requests completed, either in success or error.
"""
with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot stop a publisher already stopped.")

self._is_stopped = True

for batch in self._batches.values():
batch.commit()
30 changes: 30 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ def test_publish_attrs_type_error():
client.publish(topic, b"foo", answer=42)


def test_stop():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)

batch = client._batch("topic1", autocommit=False)
batch2 = client._batch("topic2", autocommit=False)

pubsub_msg = types.PubsubMessage(data=b"msg")

patch = mock.patch.object(batch, "commit")
patch2 = mock.patch.object(batch2, "commit")

with patch as commit_mock, patch2 as commit_mock2:
batch.publish(pubsub_msg)
batch2.publish(pubsub_msg)

client.stop()

# check if commit() called
commit_mock.assert_called()
commit_mock2.assert_called()

# check that closed publisher doesn't accept new messages
with pytest.raises(RuntimeError):
client.publish("topic1", b"msg2")

with pytest.raises(RuntimeError):
client.stop()


def test_gapic_instance_method():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
Expand Down