Skip to content

Commit

Permalink
refactor: use 'Client._list_resource' in 'Bucket.list_notificatoins'
Browse files Browse the repository at this point in the history
Also, provide explicit coverage for 'bucket._item_to_notification'.

Toward #38
  • Loading branch information
tseaver committed Jun 8, 2021
1 parent 3688363 commit 5024d1f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 76 deletions.
12 changes: 2 additions & 10 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import base64
import copy
import datetime
import functools
import json
import warnings

import six
from six.moves.urllib.parse import urlsplit

from google.api_core import page_iterator
from google.api_core import datetime_helpers
from google.cloud._helpers import _datetime_to_rfc3339
from google.cloud._helpers import _NOW
Expand Down Expand Up @@ -1392,14 +1390,8 @@ def list_notifications(
"""
client = self._require_client(client)
path = self.path + "/notificationConfigs"
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=retry
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
path=path,
item_to_value=_item_to_notification,
iterator = client._list_resource(
path, _item_to_notification, timeout=timeout, retry=retry,
)
iterator.bucket = self
return iterator
Expand Down
141 changes: 75 additions & 66 deletions tests/unit/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,63 +1000,50 @@ def test_list_blobs_w_explicit(self):
retry=retry,
)

def test_list_notifications(self):
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import _TOPIC_REF_FMT
from google.cloud.storage.notification import (
JSON_API_V1_PAYLOAD_FORMAT,
NONE_PAYLOAD_FORMAT,
)
def test_list_notifications_w_defaults(self):
from google.cloud.storage.bucket import _item_to_notification

NAME = "name"
bucket_name = "name"
client = self._make_client()
client._list_resource = mock.Mock(spec=[])
bucket = self._make_one(client=client, name=bucket_name)

topic_refs = [("my-project-123", "topic-1"), ("other-project-456", "topic-2")]
iterator = bucket.list_notifications()

resources = [
{
"topic": _TOPIC_REF_FMT.format(*topic_refs[0]),
"id": "1",
"etag": "DEADBEEF",
"selfLink": "https://example.com/notification/1",
"payload_format": NONE_PAYLOAD_FORMAT,
},
{
"topic": _TOPIC_REF_FMT.format(*topic_refs[1]),
"id": "2",
"etag": "FACECABB",
"selfLink": "https://example.com/notification/2",
"payload_format": JSON_API_V1_PAYLOAD_FORMAT,
},
]
connection = _Connection({"items": resources})
client = _Client(connection)
bucket = self._make_one(client=client, name=NAME)
self.assertIs(iterator, client._list_resource.return_value)
self.assertIs(iterator.bucket, bucket)

notifications = list(bucket.list_notifications(timeout=42))
expected_path = "/b/{}/notificationConfigs".format(bucket_name)
expected_item_to_value = _item_to_notification
client._list_resource.assert_called_once_with(
expected_path,
expected_item_to_value,
timeout=self._get_default_timeout(),
retry=DEFAULT_RETRY,
)

req_args = client._connection._requested[0]
self.assertEqual(req_args.get("timeout"), 42)
def test_list_notifications_w_explicit(self):
from google.cloud.storage.bucket import _item_to_notification

self.assertEqual(len(notifications), len(resources))
for notification, resource, topic_ref in zip(
notifications, resources, topic_refs
):
self.assertIsInstance(notification, BucketNotification)
self.assertEqual(notification.topic_project, topic_ref[0])
self.assertEqual(notification.topic_name, topic_ref[1])
self.assertEqual(notification.notification_id, resource["id"])
self.assertEqual(notification.etag, resource["etag"])
self.assertEqual(notification.self_link, resource["selfLink"])
self.assertEqual(
notification.custom_attributes, resource.get("custom_attributes")
)
self.assertEqual(notification.event_types, resource.get("event_types"))
self.assertEqual(
notification.blob_name_prefix, resource.get("blob_name_prefix")
)
self.assertEqual(
notification.payload_format, resource.get("payload_format")
)
bucket_name = "name"
other_client = self._make_client()
other_client._list_resource = mock.Mock(spec=[])
bucket = self._make_one(client=None, name=bucket_name)
timeout = 42
retry = mock.Mock(spec=[])

iterator = bucket.list_notifications(
client=other_client, timeout=timeout, retry=retry,
)

self.assertIs(iterator, other_client._list_resource.return_value)
self.assertIs(iterator.bucket, bucket)

expected_path = "/b/{}/notificationConfigs".format(bucket_name)
expected_item_to_value = _item_to_notification
other_client._list_resource.assert_called_once_with(
expected_path, expected_item_to_value, timeout=timeout, retry=retry,
)

def test_get_notification_miss_w_defaults(self):
from google.cloud.exceptions import NotFound
Expand Down Expand Up @@ -1622,7 +1609,7 @@ def test_reload_w_metageneration_match(self):
)

def test_reload_w_generation_match(self):
connection = _Connection({})
connection = _Connection()
client = _Client(connection)
bucket = self._make_one(client=client, name="name")

Expand Down Expand Up @@ -3872,30 +3859,52 @@ def test_generate_signed_url_v4_w_bucket_bound_hostname_w_bare_hostname(self):
self._generate_signed_url_v4_helper(bucket_bound_hostname="cdn.example.com")


class Test__item_to_notification(unittest.TestCase):
def _call_fut(self, iterator, item):
from google.cloud.storage.bucket import _item_to_notification

return _item_to_notification(iterator, item)

def test_it(self):
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import _TOPIC_REF_FMT
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT

iterator = mock.Mock(spec=["bucket"])
project = "my-project-123"
topic = "topic-1"
item = {
"topic": _TOPIC_REF_FMT.format(project, topic),
"id": "1",
"etag": "DEADBEEF",
"selfLink": "https://example.com/notification/1",
"payload_format": NONE_PAYLOAD_FORMAT,
}

notification = self._call_fut(iterator, item)

self.assertIsInstance(notification, BucketNotification)
self.assertIs(notification._bucket, iterator.bucket)
self.assertEqual(notification._topic_name, topic)
self.assertEqual(notification._topic_project, project)
self.assertEqual(notification._properties, item)


class _Connection(object):
_delete_bucket = False
credentials = None

def __init__(self, *responses):
self._responses = responses
self._requested = []
self._deleted_buckets = []
self.credentials = None
def __init__(self):
pass

def api_request(self, **kw):
self._requested.append(kw)
response, self._responses = self._responses[0], self._responses[1:]
return response
def api_request(self, **kw): # pragma: NO COVER
pass


class _Client(object):
def __init__(self, connection, project=None):
self._base_connection = connection
self.project = project

@property
def _connection(self):
return self._base_connection

@property
def _credentials(self):
return self._base_connection.credentials

0 comments on commit 5024d1f

Please sign in to comment.