Use a single IO thread when downloading S3 objects #620

Merged · 1 commit · Jan 29, 2014
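In short: before this change, each download thread opened the destination file and wrote its own byte range directly, and those concurrent writes misbehaved on certain OSes (issue 619). After it, download threads only enqueue write requests, and a single IOWriterThread owns every file handle and performs all seeks and writes in order. Below is a minimal, self-contained sketch of that pattern, not the CLI's actual code: it uses Python 3 for brevity, SHUTDOWN stands in for the PR's QUEUE_END_SENTINEL, and the file name and queue size are illustrative.

import threading
from collections import namedtuple
from queue import Queue

IORequest = namedtuple('IORequest', ['filename', 'offset', 'data'])
IOCloseRequest = namedtuple('IOCloseRequest', ['filename'])
SHUTDOWN = object()  # stand-in for the PR's QUEUE_END_SENTINEL

def io_writer(queue):
    # The only code in the process that touches file handles.
    open_files = {}
    while True:
        task = queue.get()
        if task is SHUTDOWN:
            for f in open_files.values():
                f.close()
            return
        if isinstance(task, IORequest):
            f = open_files.get(task.filename)
            if f is None:
                f = open(task.filename, 'rb+')
                open_files[task.filename] = f
            f.seek(task.offset)
            f.write(task.data)
        elif isinstance(task, IOCloseRequest):
            f = open_files.pop(task.filename, None)
            if f is not None:
                f.close()

write_queue = Queue(maxsize=20)
writer = threading.Thread(target=io_writer, args=(write_queue,))
writer.start()

open('part.bin', 'wb').close()  # 'rb+' below requires an existing file
write_queue.put(IORequest('part.bin', 6, b'world'))   # parts may arrive
write_queue.put(IORequest('part.bin', 0, b'hello '))  # out of order
write_queue.put(IOCloseRequest('part.bin'))
write_queue.put(SHUTDOWN)
writer.join()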
CHANGELOG.rst: 4 changes (3 additions, 1 deletion)
@@ -7,7 +7,9 @@ Next Release (TBD)

* feature:``aws configure``: Add support for ``configure get`` and ``configure
set`` command which allow you to set and get configuration values from the
-  AWS config file (`issue 602 <https://github.com/aws/aws-cli/issues/602`)
+  AWS config file (`issue 602 <https://github.com/aws/aws-cli/issues/602`__)
+* bugfix:``aws s3``: Fix issue with Amazon S3 downloads on certain OSes
+  (`issue 619 <https://github.com/aws/aws-cli/issues/619`__)


1.2.11
awscli/customizations/s3/executor.py: 53 changes (49 additions, 4 deletions)
@@ -15,7 +15,8 @@
import sys
import threading

-from awscli.customizations.s3.utils import NoBlockQueue, uni_print
+from awscli.customizations.s3.utils import NoBlockQueue, uni_print, \
+    IORequest, IOCloseRequest


LOGGER = logging.getLogger(__name__)
@@ -29,7 +30,7 @@ class Executor(object):
    ``Executor`` runs is a worker and a print thread.
"""
def __init__(self, done, num_threads, result_queue,
-                 quiet, interrupt, max_queue_size):
+                 quiet, interrupt, max_queue_size, write_queue):
self.queue = None
self.done = done
self.num_threads = num_threads
@@ -38,7 +39,9 @@ def __init__(self, done, num_threads, result_queue,
self.interrupt = interrupt
self.threads_list = []
self._max_queue_size = max_queue_size
+        self.write_queue = write_queue
self.print_thread = None
+        self.io_thread = None

@property
def num_tasks_failed(self):
@@ -51,6 +54,9 @@ def start(self):
self.print_thread = PrintThread(self.result_queue, self.done,
self.quiet, self.interrupt)
self.print_thread.daemon = True
+        self.io_thread = IOWriterThread(self.write_queue, self.done)
+        self.io_thread.start()
+        self.threads_list.append(self.io_thread)
self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
self.threads_list.append(self.print_thread)
self.print_thread.start()
@@ -78,6 +84,7 @@ def join(self):
"""
This is used to clean up the ``Executor``.
"""
+        self.write_queue.put(QUEUE_END_SENTINEL)
self.result_queue.put(QUEUE_END_SENTINEL)
for i in range(self.num_threads):
self.queue.put(QUEUE_END_SENTINEL)
@@ -86,6 +93,45 @@ def join(self):
thread.join()


+class IOWriterThread(threading.Thread):
+    def __init__(self, queue, done):
+        threading.Thread.__init__(self)
+        self.queue = queue
+        self.done = done
+        self.fd_descriptor_cache = {}
+
+    def run(self):
+        while True:
+            task = self.queue.get(True)
+            if task is QUEUE_END_SENTINEL:
+                LOGGER.debug("Sentinel received in IO thread, "
+                             "shutting down.")
+                self._cleanup()
+                return
+            elif isinstance(task, IORequest):
+                filename, offset, data = task
+                fileobj = self.fd_descriptor_cache.get(filename)
+                if fileobj is None:
+                    fileobj = open(filename, 'rb+')
+                    self.fd_descriptor_cache[filename] = fileobj
+                fileobj.seek(offset)
+                LOGGER.debug("Writing data to: %s, offset: %s",
+                             filename, offset)
+                fileobj.write(data)
+                fileobj.flush()
+            elif isinstance(task, IOCloseRequest):
+                LOGGER.debug("IOCloseRequest received for %s, closing file.",
+                             task.filename)
+                fileobj = self.fd_descriptor_cache.get(task.filename)
+                if fileobj is not None:
+                    fileobj.close()
+                    del self.fd_descriptor_cache[task.filename]
+
+    def _cleanup(self):
+        for fileobj in self.fd_descriptor_cache.values():
+            fileobj.close()
+

class Worker(threading.Thread):
"""
This thread is in charge of performing the tasks provided via
@@ -107,8 +153,7 @@ def run(self):
try:
function()
except Exception as e:
-                    LOGGER.debug('Error calling task: %s', e,
-                                 exc_info=True)
+                    LOGGER.debug('Error calling task: %s', e, exc_info=True)
self.queue.task_done()
except Queue.Empty:
pass
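One detail worth pausing on in join() above: the write queue and the result queue each have a single consumer, so each gets exactly one sentinel, while the shared task queue gets one sentinel per worker, because every worker exits after consuming one. A hedged restatement of that rule, with illustrative names rather than the CLI's:

def shut_down(task_queue, num_workers, write_queue, result_queue, sentinel):
    # Single-consumer queues: one sentinel wakes their only reader.
    write_queue.put(sentinel)
    result_queue.put(sentinel)
    # Shared worker queue: each worker consumes exactly one sentinel
    # and exits, so N workers need N sentinels.
    for _ in range(num_workers):
        task_queue.put(sentinel)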
awscli/customizations/s3/s3handler.py: 13 changes (10 additions, 3 deletions)
@@ -31,12 +31,19 @@ class S3Handler(object):
sources the ``self.executor`` from which threads inside the
class pull tasks from to complete.
"""
+    MAX_IO_QUEUE_SIZE = 20
+
def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
chunksize=CHUNKSIZE):
self.session = session
self.done = threading.Event()
self.interrupt = threading.Event()
self.result_queue = NoBlockQueue()
+        # The write_queue has potential for optimizations, so the constant
+        # for maxsize is scoped to this class (as opposed to constants.py)
+        # so we have the ability to change this value later.
+        self.write_queue = NoBlockQueue(self.interrupt,
+                                        maxsize=self.MAX_IO_QUEUE_SIZE)
self.params = {'dryrun': False, 'quiet': False, 'acl': None,
'guess_mime_type': True, 'sse': False,
'storage_class': None, 'website_redirect': None,
@@ -53,7 +60,7 @@ def __init__(self, session, params, multi_threshold=MULTI_THRESHOLD,
self.executor = Executor(
done=self.done, num_threads=NUM_THREADS, result_queue=self.result_queue,
quiet=self.params['quiet'], interrupt=self.interrupt,
-            max_queue_size=MAX_QUEUE_SIZE,
+            max_queue_size=MAX_QUEUE_SIZE, write_queue=self.write_queue
)
self._multipart_uploads = []
self._multipart_downloads = []
@@ -223,11 +230,11 @@ def _enqueue_range_download_tasks(self, filename, remove_remote_file=False):
task = tasks.DownloadPartTask(
part_number=i, chunk_size=chunksize,
result_queue=self.result_queue, service=filename.service,
-                filename=filename, context=context)
+                filename=filename, context=context, io_queue=self.write_queue)
self.executor.submit(task)
complete_file_task = tasks.CompleteDownloadTask(
context=context, filename=filename, result_queue=self.result_queue,
-            params=self.params)
+            params=self.params, io_queue=self.write_queue)
self.executor.submit(complete_file_task)
self._multipart_downloads.append((context, filename.dest))
if remove_remote_file:
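The bounded write_queue above also provides back-pressure: MAX_IO_QUEUE_SIZE caps how many downloaded chunks can sit in memory waiting for disk, and a full queue makes producers block in put() instead of buffering without limit. A rough sketch of the bound; the chunk size here is an assumption for the arithmetic, not a value from the diff:

from queue import Queue

MAX_IO_QUEUE_SIZE = 20       # value from the diff above
assumed_chunk = 1024 * 1024  # say each IORequest carries ~1 MiB

write_queue = Queue(maxsize=MAX_IO_QUEUE_SIZE)
# put() blocks once 20 chunks are queued, so a writer that falls behind
# stalls the downloads; the worst case buffered in memory is roughly:
worst_case_bytes = MAX_IO_QUEUE_SIZE * assumed_chunk  # ~20 MiB here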
awscli/customizations/s3/tasks.py: 55 changes (32 additions, 23 deletions)
@@ -9,7 +9,7 @@
from botocore.exceptions import IncompleteReadError

from awscli.customizations.s3.utils import find_bucket_key, MD5Error, \
-    operate, ReadFileChunk, relative_path
+    operate, ReadFileChunk, relative_path, IORequest, IOCloseRequest


LOGGER = logging.getLogger(__name__)
@@ -42,7 +42,8 @@ def print_operation(filename, failed, dryrun=False):
print_str = print_str + "s3://" + filename.src
else:
print_str += relative_path(filename.src)
-    if filename.operation_name not in ["delete", "make_bucket", "remove_bucket"]:
+    if filename.operation_name not in ["delete", "make_bucket",
+                                       "remove_bucket"]:
if filename.dest_type == "s3":
print_str += " to s3://" + filename.dest
else:
@@ -123,7 +124,7 @@ def __init__(self, part_number, chunk_size,
self._filename = filename

def _is_last_part(self, part_number):
-        return self._part_number == int(
+        return self._part_number == int(
math.ceil(self._filename.size / float(self._chunk_size)))

def _total_parts(self):
@@ -167,7 +168,7 @@ def __call__(self):
# task has already queued a message.
LOGGER.debug("Not uploading part copy, task has been cancelled.")
except Exception as e:
-            LOGGER.debug('Error during upload part copy: %s' , e,
+            LOGGER.debug('Error during upload part copy: %s', e,
exc_info=True)
message = print_operation(self._filename, failed=True,
dryrun=False)
Expand Down Expand Up @@ -237,7 +238,7 @@ def __call__(self):
# task has already queued a message.
LOGGER.debug("Not uploading part, task has been cancelled.")
except Exception as e:
-            LOGGER.debug('Error during part upload: %s' , e,
+            LOGGER.debug('Error during part upload: %s', e,
exc_info=True)
message = print_operation(self._filename, failed=True,
dryrun=False)
@@ -247,7 +248,7 @@ def __call__(self):
self._upload_context.cancel_upload()
else:
LOGGER.debug("Part number %s completed for filename: %s",
-                        self._part_number, self._filename.src)
+                         self._part_number, self._filename.src)


class CreateLocalFileTask(object):
@@ -267,16 +268,19 @@ def __call__(self):


class CompleteDownloadTask(object):
-    def __init__(self, context, filename, result_queue, params):
+    def __init__(self, context, filename, result_queue, params, io_queue):
self._context = context
self._filename = filename
self._result_queue = result_queue
self._parameters = params
+        self._io_queue = io_queue

def __call__(self):
# When the file is downloading, we have a few things we need to do:
# 1) Fix up the last modified time to match s3.
# 2) Tell the result_queue we're done.
+        # 3) Queue an IO request to the IO thread letting it know we're
+        #    done with the file.
self._context.wait_for_completion()
last_update_tuple = self._filename.last_update.timetuple()
mod_timestamp = time.mktime(last_update_tuple)
Expand All @@ -285,6 +289,7 @@ def __call__(self):
self._parameters['dryrun'])
print_task = {'message': message, 'error': False}
self._result_queue.put(print_task)
+        self._io_queue.put(IOCloseRequest(self._filename.dest))


class DownloadPartTask(object):
@@ -302,14 +307,14 @@ class DownloadPartTask(object):
TOTAL_ATTEMPTS = 5

def __init__(self, part_number, chunk_size, result_queue, service,
-                 filename, context, open=open):
+                 filename, context, io_queue):
self._part_number = part_number
self._chunk_size = chunk_size
self._result_queue = result_queue
self._filename = filename
self._service = filename.service
self._context = context
-        self._open = open
+        self._io_queue = io_queue

def __call__(self):
try:
@@ -337,12 +342,12 @@ def _download_part(self):
for i in range(self.TOTAL_ATTEMPTS):
try:
LOGGER.debug("Making GetObject requests with byte range: %s",
-                         range_param)
+                             range_param)
response_data, http = operate(self._service, 'GetObject',
-                                                  params)
+                                              params)
LOGGER.debug("Response received from GetObject")
body = response_data['Body']
-                self._write_to_file(body)
+                self._queue_writes(body)
self._context.announce_completed_part(self._part_number)

message = print_operation(self._filename, 0)
@@ -363,19 +368,21 @@ def _download_part(self):
raise RetriesExeededError("Maximum number of attempts exceeded: %s" %
self.TOTAL_ATTEMPTS)

-    def _write_to_file(self, body):
+    def _queue_writes(self, body):
self._context.wait_for_file_created()
LOGGER.debug("Writing part number %s to file: %s",
self._part_number, self._filename.dest)
iterate_chunk_size = self.ITERATE_CHUNK_SIZE
body.set_socket_timeout(self.READ_TIMEOUT)
-        with self._open(self._filename.dest, 'rb+') as f:
-            f.seek(self._part_number * self._chunk_size)
-            current = body.read(iterate_chunk_size)
-            while current:
-                f.write(current)
-                current = body.read(iterate_chunk_size)
-        LOGGER.debug("Done writing part number %s to file: %s",
+        amount_read = 0
+        current = body.read(iterate_chunk_size)
+        while current:
+            offset = self._part_number * self._chunk_size + amount_read
+            self._io_queue.put(IORequest(self._filename.dest, offset, current))
+            amount_read += len(current)
+            current = body.read(iterate_chunk_size)
+        # Change log message.
+        LOGGER.debug("Done queueing writes for part number %s to file: %s",
self._part_number, self._filename.dest)


@@ -455,7 +462,7 @@ def __call__(self):
}
else:
LOGGER.debug("Multipart upload completed for: %s",
-                        self.filename.src)
+                         self.filename.src)
message = print_operation(self.filename, False,
self.parameters['dryrun'])
result = {'message': message, 'error': False}
@@ -617,7 +624,7 @@ class MultipartDownloadContext(object):
'UNSTARTED': 'UNSTARTED',
'STARTED': 'STARTED',
'COMPLETED': 'COMPLETED',
-        'CANCELLED':'CANCELLED'
+        'CANCELLED': 'CANCELLED'
}

def __init__(self, num_parts, lock=None):
@@ -647,14 +654,16 @@ def wait_for_file_created(self):
with self._created_condition:
while not self._state == self._STATES['STARTED']:
if self._state == self._STATES['CANCELLED']:
raise DownloadCancelledError("Download has been cancelled.")
raise DownloadCancelledError(
"Download has been cancelled.")
self._created_condition.wait(timeout=1)

def wait_for_completion(self):
with self._completed_condition:
while not self._state == self._STATES['COMPLETED']:
if self._state == self._STATES['CANCELLED']:
raise DownloadCancelledError("Download has been cancelled.")
raise DownloadCancelledError(
"Download has been cancelled.")
self._completed_condition.wait(timeout=1)

def cancel(self):
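The heart of _queue_writes above is the offset arithmetic: each part writes at part_number * chunk_size + amount_read, an absolute position that stays correct no matter which part's data reaches the IO thread first, so no coordination between parts is needed. A small worked example with hypothetical sizes:

def offsets_for_part(part_number, chunk_size, read_size, part_bytes):
    # Absolute file offset of each successive body.read() within one part.
    amount_read = 0
    while amount_read < part_bytes:
        yield part_number * chunk_size + amount_read
        amount_read += read_size

chunk_size = 8 * 1024 * 1024  # one ranged GET covers 8 MiB (assumed)
read_size = 1024 * 1024       # body is read in 1 MiB pieces (assumed)

# Part 2 always lands at 16 MiB, 17 MiB, 18 MiB, ... so no two parts
# ever queue a write for the same region of the file.
print([off // (1024 * 1024) for off in
       offsets_for_part(2, chunk_size, read_size, 3 * read_size)])
# -> [16, 17, 18]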
awscli/customizations/s3/utils.py: 7 changes (7 additions, 0 deletions)
@@ -16,6 +16,7 @@
import math
import os
import sys
+from collections import namedtuple
from functools import partial

from six import PY3
@@ -260,3 +261,9 @@ def __iter__(self):
# already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
return iter([])


+IORequest = namedtuple('IORequest', ['filename', 'offset', 'data'])
+# Used to signal that IO for the filename is finished, and that
+# any associated resources may be cleaned up.
+IOCloseRequest = namedtuple('IOCloseRequest', ['filename'])
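Plain namedtuples suit these message types well: they are immutable, cheap, readable in logs, and they support both tuple unpacking (as IOWriterThread.run() does with filename, offset, data = task) and isinstance() checks to tell the two kinds apart. A quick illustration with a hypothetical path:

from collections import namedtuple

IORequest = namedtuple('IORequest', ['filename', 'offset', 'data'])
IOCloseRequest = namedtuple('IOCloseRequest', ['filename'])

req = IORequest('/tmp/example.bin', 4096, b'\x00' * 16)
filename, offset, data = req         # tuple-style unpacking, as in run()
assert offset == req.offset == 4096  # or access fields by name
assert not isinstance(req, IOCloseRequest)  # the writer can tell them apart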