Skip to content

Commit

Permalink
Using futures in batched requests.
Browse files Browse the repository at this point in the history
Also removing limitation on GETs in batch requests.
  • Loading branch information
dhermes committed Apr 9, 2015
1 parent 7ab53f1 commit b50d446
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 97 deletions.
7 changes: 4 additions & 3 deletions gcloud/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,13 @@ def api_request(self, method, path, query_params=None,
if not 200 <= response.status < 300:
raise make_exception(response, content)

if content and expect_json:
if isinstance(content, six.binary_type):
content = content.decode('utf-8')

if expect_json and content and isinstance(content, six.string_types):
content_type = response.get('content-type', '')
if not content_type.startswith('application/json'):
raise TypeError('Expected JSON, got %s' % content_type)
if isinstance(content, six.binary_type):
content = content.decode('utf-8')
return json.loads(content)

return content
Expand Down
10 changes: 5 additions & 5 deletions gcloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,12 @@ def _patch_property(self, name, value):
def _set_properties(self, value):
"""Set the properties for the current object.
:type value: dict
:type value: dict or :class:`gcloud.storage.batch._Future`
:param value: The properties to be set.
"""
self._properties = value
if not isinstance(value, dict):
self._is_future = True
# If the values are reset, the changes must as well.
self._changes = set()

Expand All @@ -92,12 +94,10 @@ def _get_properties(self):
:rtype: dict
:returns: The properties of the current object.
:raises: :class:`ValueError` if the object is designated as a
future.
"""
if self._is_future:
raise ValueError(self, ('is a future. It cannot be used'
'until the request has completed'))
self._properties = self._properties.get_value()
self._is_future = False
return self._properties

def patch(self):
Expand Down
83 changes: 63 additions & 20 deletions gcloud/storage/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
import httplib2
import io
import json

import six

from gcloud._helpers import _LocalStack
from gcloud.exceptions import make_exception
from gcloud.storage import _implicit_environ
from gcloud.storage.connection import Connection

Expand Down Expand Up @@ -120,7 +122,7 @@ def __init__(self, connection=None):
super(Batch, self).__init__()
self._connection = connection
self._requests = []
self._responses = []
self._futures = []

def _do_request(self, method, url, headers, data):
"""Override Connection: defer actual HTTP request.
Expand All @@ -143,22 +145,20 @@ def _do_request(self, method, url, headers, data):
and ``content`` (a string).
:returns: The HTTP response object and the content of the response.
"""
if method == 'GET':
_req = self._connection.http.request
return _req(method=method, uri=url, headers=headers, body=data)

if len(self._requests) >= self._MAX_BATCH_SIZE:
raise ValueError("Too many deferred requests (max %d)" %
self._MAX_BATCH_SIZE)
self._requests.append((method, url, headers, data))
return NoContent(), ''
result = _Future()
self._futures.append(result)
return NoContent(), result

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.
def _prepare_batch_request(self):
"""Prepares headers and body for a batch request.
:rtype: list of tuples
:returns: one ``(status, reason, payload)`` tuple per deferred request.
:raises: ValueError if no requests have been deferred.
:rtype: tuple (dict, string)
:returns: The pair of headers and body of the batch request to be sent.
:raises: :class:`ValueError` if no requests have been deferred.
"""
if len(self._requests) == 0:
raise ValueError("No deferred requests")
Expand All @@ -180,14 +180,41 @@ def finish(self):

# Strip off redundant header text
_, body = payload.split('\n\n', 1)
headers = dict(multi._headers)
return dict(multi._headers), body

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.
:rtype: list of tuples
:returns: one ``(status, reason, payload)`` tuple per deferred request.
:raises: ValueError if no requests have been deferred.
"""
headers, body = self._prepare_batch_request()

url = '%s/batch' % self.API_BASE_URL

_req = self._connection._make_request
response, content = _req('POST', url, data=body, headers=headers)
self._responses = list(_unpack_batch_response(response, content))
return self._responses
responses = list(_unpack_batch_response(response, content))

# If a bad status occurs, we track it, but don't raise an exception
# until all futures have been populated.
exception_args = None

if len(self._futures) != len(responses):
raise ValueError('Expected a response for every request.')

for future, sub_response in zip(self._futures, responses):
resp_headers, sub_payload = sub_response
if not 200 <= resp_headers.status < 300:
exception_args = exception_args or (resp_headers,
sub_payload)
future.set_value(sub_payload)

if exception_args is not None:
raise make_exception(*exception_args)

return responses

@staticmethod
def current():
Expand Down Expand Up @@ -233,7 +260,20 @@ def _generate_faux_mime_message(parser, response, content):


def _unpack_batch_response(response, content):
"""Convert response, content -> [(status, reason, payload)]."""
"""Convert response, content -> [(headers, payload)].
Creates a generator of tuples of emulating the responses to
:meth:`httplib2.Http.request` (a pair of headers and payload).
:type response: :class:`httplib2.Response`
:param response: HTTP response / headers from a request.
:type content: string
:param content: Response payload with a batch response.
:rtype: generator
:returns: A generator of header, payload pairs.
"""
parser = Parser()
message = _generate_faux_mime_message(parser, response, content)

Expand All @@ -242,10 +282,13 @@ def _unpack_batch_response(response, content):

for subrequest in message._payload:
status_line, rest = subrequest._payload.split('\n', 1)
_, status, reason = status_line.split(' ', 2)
message = parser.parsestr(rest)
payload = message._payload
ctype = message['Content-Type']
_, status, _ = status_line.split(' ', 2)
sub_message = parser.parsestr(rest)
payload = sub_message._payload
ctype = sub_message['Content-Type']
msg_headers = dict(sub_message._headers)
msg_headers['status'] = status
headers = httplib2.Response(msg_headers)
if ctype and ctype.startswith('application/json'):
payload = json.loads(payload)
yield status, reason, payload
yield headers, payload
35 changes: 34 additions & 1 deletion gcloud/storage/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,44 @@ def test_path_is_abstract(self):
mixin = self._makeOne()
self.assertRaises(NotImplementedError, lambda: mixin.path)

def test_future_fails(self):
def test__get_properties_no_future(self):
mixin = self._makeOne()
mixin._properties = {}
self.assertEqual(mixin._get_properties(), {})

def test__get_properties_future_fail(self):
from gcloud.storage.batch import _Future
mixin = self._makeOne()
mixin._properties = _Future()
mixin._is_future = True
self.assertRaises(ValueError, mixin._get_properties)

def test__get_properties_future_success(self):
from gcloud.storage.batch import _Future
mixin = self._makeOne()
VALUE = object()
future = _Future()
future._value = VALUE
mixin._properties = future
mixin._is_future = True
self.assertEqual(mixin._get_properties(), VALUE)
self.assertFalse(mixin._is_future)

def test__set_properties_no_future(self):
mixin = self._makeOne()
self.assertEqual(mixin._get_properties(), {})
VALUE = {'foo': 'bar'}
mixin._set_properties(VALUE)
self.assertEqual(mixin._get_properties(), VALUE)

def test__set_properties_future(self):
from gcloud.storage.batch import _Future
mixin = self._makeOne()
self.assertFalse(mixin._is_future)
future = _Future()
mixin._set_properties(future)
self.assertEqual(mixin._properties, future)

def test_reload(self):
connection = _Connection({'foo': 'Foo'})
derived = self._derivedClass(connection, '/path')()
Expand Down
Loading

0 comments on commit b50d446

Please sign in to comment.