Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-enable bundling (for consideration) #1950

Merged
merged 2 commits into from
Jul 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions gcloud/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,17 @@ def __init__(self, items, page_token):
def next(self):
items, self._items = self._items, None
return items


class _GAXBundlingEvent(object):

result = None

def __init__(self, result):
self._result = result

def is_set(self):
return self.result is not None

def wait(self, *_):
self.result = self._result
8 changes: 4 additions & 4 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,17 @@ def topic_publish(self, topic_path, messages):
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_bundling=False)
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
result = self._gax_api.publish(topic_path, message_pbs,
options=options)
event = self._gax_api.publish(topic_path, message_pbs)
if not event.is_set():
event.wait()
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return result.message_ids
return event.result.message_ids

def topic_list_subscriptions(self, topic_path, page_size=0,
page_token=None):
Expand Down
33 changes: 29 additions & 4 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,15 @@ def test_topic_delete_error(self):

def test_topic_publish_hit(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
gax_api = _GAXPublisherAPI(_publish_response=response)
event = _GAXBundlingEvent(response)
event.wait() # already received result
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
Expand All @@ -220,7 +223,29 @@ def test_topic_publish_hit(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_publish_hit_with_wait(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
event = _GAXBundlingEvent(response)
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])

self.assertEqual(resource, [MSGID])
topic_path, message_pbs, options = gax_api._publish_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options, None)

def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
import base64
Expand All @@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_publish_error(self):
import base64
Expand All @@ -258,7 +283,7 @@ def test_topic_publish_error(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
Expand Down