Skip to content

Commit

Permalink
Using futures in batched requests.
Browse files Browse the repository at this point in the history
Also removing limitation on GETs in batch requests.
  • Loading branch information
dhermes committed Apr 12, 2015
1 parent 4d263b7 commit ce9e260
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 92 deletions.
7 changes: 4 additions & 3 deletions gcloud/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,12 +292,13 @@ def api_request(self, method, path, query_params=None,
if not 200 <= response.status < 300:
raise make_exception(response, content)

if content and expect_json:
if isinstance(content, six.binary_type):
content = content.decode('utf-8')

if expect_json and content and isinstance(content, six.string_types):
content_type = response.get('content-type', '')
if not content_type.startswith('application/json'):
raise TypeError('Expected JSON, got %s' % content_type)
if isinstance(content, six.binary_type):
content = content.decode('utf-8')
return json.loads(content)

return content
Expand Down
2 changes: 1 addition & 1 deletion gcloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _patch_property(self, name, value):
def _set_properties(self, value):
"""Set the properties for the current object.
:type value: dict
:type value: dict or :class:`gcloud.storage.batch._FutureDict`
:param value: The properties to be set.
"""
self._properties = value
Expand Down
106 changes: 86 additions & 20 deletions gcloud/storage/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.parser import Parser
import httplib2
import io
import json

import six

from gcloud._helpers import _LocalStack
from gcloud.exceptions import make_exception
from gcloud.storage import _implicit_environ
from gcloud.storage.connection import Connection

Expand Down Expand Up @@ -82,6 +84,7 @@ class _FutureDict(object):
"""

def __init__(self, owner=None):
self._value = None
self.owner = owner

@staticmethod
Expand Down Expand Up @@ -125,6 +128,20 @@ def __setitem__(self, key, value):
"""
raise KeyError('Cannot set %r -> %r on a future' % (key, value))

def set_future_value(self, value):
"""Sets the value associated with the future.
:type value: object
:param value: The future value that was set.
:raises: :class:`ValueError` if the value is already set.
"""
if self._value is not None:
raise ValueError('Future value is already set.')
self._value = value
if self.owner is not None:
self.owner._properties = value


class Batch(Connection):
"""Proxy an underlying connection, batching up change operations.
Expand All @@ -141,7 +158,7 @@ def __init__(self, connection=None):
super(Batch, self).__init__()
self._connection = connection
self._requests = []
self._responses = []
self._futures = []

def _do_request(self, method, url, headers, data):
"""Override Connection: defer actual HTTP request.
Expand All @@ -164,22 +181,20 @@ def _do_request(self, method, url, headers, data):
and ``content`` (a string).
:returns: The HTTP response object and the content of the response.
"""
if method == 'GET':
_req = self._connection.http.request
return _req(method=method, uri=url, headers=headers, body=data)

if len(self._requests) >= self._MAX_BATCH_SIZE:
raise ValueError("Too many deferred requests (max %d)" %
self._MAX_BATCH_SIZE)
self._requests.append((method, url, headers, data))
return NoContent(), ''
result = _FutureDict()
self._futures.append(result)
return NoContent(), result

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.
def _prepare_batch_request(self):
"""Prepares headers and body for a batch request.
:rtype: list of tuples
:returns: one ``(status, reason, payload)`` tuple per deferred request.
:raises: ValueError if no requests have been deferred.
:rtype: tuple (dict, string)
:returns: The pair of headers and body of the batch request to be sent.
:raises: :class:`ValueError` if no requests have been deferred.
"""
if len(self._requests) == 0:
raise ValueError("No deferred requests")
Expand All @@ -201,14 +216,49 @@ def finish(self):

# Strip off redundant header text
_, body = payload.split('\n\n', 1)
headers = dict(multi._headers)
return dict(multi._headers), body

def _finish_futures(self, responses):
"""Apply all the batch responses to the futures created.
:type responses: list of (headers, payload) tuples.
:param responses: List of headers and payloads from each response in
the batch.
:raises: :class:`ValueError` if no requests have been deferred.
"""
# If a bad status occurs, we track it, but don't raise an exception
# until all futures have been populated.
exception_args = None

if len(self._futures) != len(responses):
raise ValueError('Expected a response for every request.')

for future, sub_response in zip(self._futures, responses):
resp_headers, sub_payload = sub_response
if not 200 <= resp_headers.status < 300:
exception_args = exception_args or (resp_headers,
sub_payload)
future.set_future_value(sub_payload)

if exception_args is not None:
raise make_exception(*exception_args)

def finish(self):
"""Submit a single `multipart/mixed` request w/ deferred requests.
:rtype: list of tuples
:returns: one ``(headers, payload)`` tuple per deferred request.
"""
headers, body = self._prepare_batch_request()

url = '%s/batch' % self.API_BASE_URL

_req = self._connection._make_request
response, content = _req('POST', url, data=body, headers=headers)
self._responses = list(_unpack_batch_response(response, content))
return self._responses
responses = list(_unpack_batch_response(response, content))
self._finish_futures(responses)
return responses

@staticmethod
def current():
Expand Down Expand Up @@ -254,7 +304,20 @@ def _generate_faux_mime_message(parser, response, content):


def _unpack_batch_response(response, content):
"""Convert response, content -> [(status, reason, payload)]."""
"""Convert response, content -> [(headers, payload)].
Creates a generator of tuples of emulating the responses to
:meth:`httplib2.Http.request` (a pair of headers and payload).
:type response: :class:`httplib2.Response`
:param response: HTTP response / headers from a request.
:type content: string
:param content: Response payload with a batch response.
:rtype: generator
:returns: A generator of header, payload pairs.
"""
parser = Parser()
message = _generate_faux_mime_message(parser, response, content)

Expand All @@ -263,10 +326,13 @@ def _unpack_batch_response(response, content):

for subrequest in message._payload:
status_line, rest = subrequest._payload.split('\n', 1)
_, status, reason = status_line.split(' ', 2)
message = parser.parsestr(rest)
payload = message._payload
ctype = message['Content-Type']
_, status, _ = status_line.split(' ', 2)
sub_message = parser.parsestr(rest)
payload = sub_message._payload
ctype = sub_message['Content-Type']
msg_headers = dict(sub_message._headers)
msg_headers['status'] = status
headers = httplib2.Response(msg_headers)
if ctype and ctype.startswith('application/json'):
payload = json.loads(payload)
yield status, reason, payload
yield headers, payload
Loading

0 comments on commit ce9e260

Please sign in to comment.