Skip to content

Commit

Permalink
feat: closed subscriber as context manager raises (#488)
Browse files Browse the repository at this point in the history
Closes #484.

**PR checklist:**
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)
  • Loading branch information
plamut authored Sep 2, 2021
1 parent 633e91b commit a05a3f2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
12 changes: 12 additions & 0 deletions google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def __init__(self, **kwargs):
# Instantiate the underlying GAPIC client.
self._api = subscriber_client.SubscriberClient(**kwargs)
self._target = self._api._transport._host
self._closed = False

@classmethod
def from_service_account_file(cls, filename, **kwargs):
Expand Down Expand Up @@ -120,6 +121,14 @@ def api(self):
"""The underlying gapic API client."""
return self._api

@property
def closed(self) -> bool:
"""Return whether the client has been closed and cannot be used anymore.
.. versionadded:: 2.8.0
"""
return self._closed

def subscribe(
self,
subscription,
Expand Down Expand Up @@ -252,8 +261,11 @@ def close(self):
This method is idempotent.
"""
self.api._transport.grpc_channel.close()
self._closed = True

def __enter__(self):
if self._closed:
raise RuntimeError("Closed subscriber cannot be used as context manager.")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def test_init_default_client_info(creds):
assert expected_client_info in user_agent


def test_init_default_closed_state(creds):
client = subscriber.Client(credentials=creds)
assert not client.closed


def test_init_w_custom_transport(creds):
transport = SubscriberGrpcTransport(credentials=creds)
client = subscriber.Client(transport=transport)
Expand Down Expand Up @@ -185,6 +190,7 @@ def test_close(creds):
client.close()

patched_close.assert_called()
assert client.closed


def test_closes_channel_as_context_manager(creds):
Expand All @@ -198,6 +204,18 @@ def test_closes_channel_as_context_manager(creds):
patched_close.assert_called()


def test_context_manager_raises_if_closed(creds):
client = subscriber.Client(credentials=creds)

with mock.patch.object(client.api._transport.grpc_channel, "close"):
client.close()

expetect_msg = r"(?i).*closed.*cannot.*context manager.*"
with pytest.raises(RuntimeError, match=expetect_msg):
with client:
pass


def test_streaming_pull_gapic_monkeypatch(creds):
client = subscriber.Client(credentials=creds)

Expand Down

0 comments on commit a05a3f2

Please sign in to comment.