Skip to content

Commit

Permalink
Added the ability to stream data using cp.
Browse files Browse the repository at this point in the history
This feature enables users to stream from stdin to s3 or from s3 to stdout.
Streaming large files is both multithreaded and uses multipart transfers.
The streaming feature is limited to single file ``cp`` commands.
  • Loading branch information
kyleknap committed Sep 2, 2014
1 parent 4e093bd commit abce027
Show file tree
Hide file tree
Showing 22 changed files with 871 additions and 143 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ CHANGELOG
Next Release (TBD)
==================

* feature:``aws s3 cp``: Added ability to upload local
file streams from standard input to s3 and download s3
objects as local file streams to standard output.
* feature:Page Size: Add a ``--page-size`` option, that
controls page size when perfoming an operation that
uses pagination.
Expand Down
1 change: 1 addition & 0 deletions awscli/customizations/s3/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
MAX_SINGLE_UPLOAD_SIZE = 5 * (1024 ** 3)
MAX_UPLOAD_SIZE = 5 * (1024 ** 4)
MAX_QUEUE_SIZE = 1000
STREAM_INPUT_TIMEOUT = 0.1
36 changes: 20 additions & 16 deletions awscli/customizations/s3/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import sys
import threading

from awscli.customizations.s3.utils import uni_print, \
IORequest, IOCloseRequest, StablePriorityQueue
from awscli.customizations.s3.utils import uni_print, bytes_print, \
IORequest, IOCloseRequest, StablePriorityQueue
from awscli.customizations.s3.tasks import OrderableTask


Expand Down Expand Up @@ -50,8 +50,7 @@ def __init__(self, num_threads, result_queue,
self.quiet = quiet
self.threads_list = []
self.write_queue = write_queue
self.print_thread = PrintThread(self.result_queue,
self.quiet)
self.print_thread = PrintThread(self.result_queue, self.quiet)
self.print_thread.daemon = True
self.io_thread = IOWriterThread(self.write_queue)

Expand Down Expand Up @@ -153,23 +152,28 @@ def run(self):
self._cleanup()
return
elif isinstance(task, IORequest):
filename, offset, data = task
fileobj = self.fd_descriptor_cache.get(filename)
if fileobj is None:
fileobj = open(filename, 'rb+')
self.fd_descriptor_cache[filename] = fileobj
fileobj.seek(offset)
filename, offset, data, is_stream = task
if is_stream:
fileobj = sys.stdout
bytes_print(data)
else:
fileobj = self.fd_descriptor_cache.get(filename)
if fileobj is None:
fileobj = open(filename, 'rb+')
self.fd_descriptor_cache[filename] = fileobj
fileobj.seek(offset)
fileobj.write(data)
LOGGER.debug("Writing data to: %s, offset: %s",
filename, offset)
fileobj.write(data)
fileobj.flush()
elif isinstance(task, IOCloseRequest):
LOGGER.debug("IOCloseRequest received for %s, closing file.",
task.filename)
fileobj = self.fd_descriptor_cache.get(task.filename)
if fileobj is not None:
fileobj.close()
del self.fd_descriptor_cache[task.filename]
if not task.is_stream:
fileobj = self.fd_descriptor_cache.get(task.filename)
if fileobj is not None:
fileobj.close()
del self.fd_descriptor_cache[task.filename]

def _cleanup(self):
for fileobj in self.fd_descriptor_cache.values():
Expand Down Expand Up @@ -237,7 +241,7 @@ def __init__(self, result_queue, quiet):
self._lock = threading.Lock()
self._needs_newline = False

self._total_parts = 0
self._total_parts = '...'
self._total_files = '...'

# This is a public attribute that clients can inspect to determine
Expand Down
23 changes: 19 additions & 4 deletions awscli/customizations/s3/filegenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def __init__(self, directory, filename):
class FileStat(object):
def __init__(self, src, dest=None, compare_key=None, size=None,
last_update=None, src_type=None, dest_type=None,
operation_name=None):
operation_name=None, is_stream=False):
self.src = src
self.dest = dest
self.compare_key = compare_key
Expand All @@ -104,6 +104,7 @@ def __init__(self, src, dest=None, compare_key=None, size=None,
self.src_type = src_type
self.dest_type = dest_type
self.operation_name = operation_name
self.is_stream = is_stream


class FileGenerator(object):
Expand All @@ -115,7 +116,8 @@ class FileGenerator(object):
``FileInfo`` objects to send to a ``Comparator`` or ``S3Handler``.
"""
def __init__(self, service, endpoint, operation_name,
follow_symlinks=True, page_size=None, result_queue=None):
follow_symlinks=True, page_size=None, result_queue=None,
is_stream=False):
self._service = service
self._endpoint = endpoint
self.operation_name = operation_name
Expand All @@ -124,6 +126,7 @@ def __init__(self, service, endpoint, operation_name,
self.result_queue = result_queue
if not result_queue:
self.result_queue = queue.Queue()
self.is_stream = is_stream

def call(self, files):
"""
Expand All @@ -135,7 +138,11 @@ def call(self, files):
dest = files['dest']
src_type = src['type']
dest_type = dest['type']
function_table = {'s3': self.list_objects, 'local': self.list_files}
function_table = {'s3': self.list_objects}
if self.is_stream:
function_table['local'] = self.list_local_file_stream
else:
function_table['local'] = self.list_files
sep_table = {'s3': '/', 'local': os.sep}
source = src['path']
file_list = function_table[src_type](source, files['dir_op'])
Expand All @@ -155,7 +162,15 @@ def call(self, files):
compare_key=compare_key, size=size,
last_update=last_update, src_type=src_type,
dest_type=dest_type,
operation_name=self.operation_name)
operation_name=self.operation_name,
is_stream=self.is_stream)

def list_local_file_stream(self, path, dir_op):
"""
Yield some dummy values for a local file stream since it does not
actually have a file.
"""
yield '-', 0, None

def list_files(self, path, dir_op):
"""
Expand Down
110 changes: 72 additions & 38 deletions awscli/customizations/s3/fileinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from botocore.compat import quote
from awscli.customizations.s3.utils import find_bucket_key, \
check_etag, check_error, operate, uni_print, \
guess_content_type, MD5Error
guess_content_type, MD5Error, bytes_print


class CreateDirectoryError(Exception):
Expand All @@ -26,7 +26,7 @@ def read_file(filename):
return in_file.read()


def save_file(filename, response_data, last_update):
def save_file(filename, response_data, last_update, is_stream=False):
"""
This writes to the file upon downloading. It reads the data in the
response. Makes a new directory if needed and then writes the
Expand All @@ -35,31 +35,57 @@ def save_file(filename, response_data, last_update):
"""
body = response_data['Body']
etag = response_data['ETag'][1:-1]
d = os.path.dirname(filename)
try:
if not os.path.exists(d):
os.makedirs(d)
except OSError as e:
if not e.errno == errno.EEXIST:
raise CreateDirectoryError(
"Could not create directory %s: %s" % (d, e))
if not is_stream:
d = os.path.dirname(filename)
try:
if not os.path.exists(d):
os.makedirs(d)
except OSError as e:
if not e.errno == errno.EEXIST:
raise CreateDirectoryError(
"Could not create directory %s: %s" % (d, e))
md5 = hashlib.md5()
file_chunks = iter(partial(body.read, 1024 * 1024), b'')
with open(filename, 'wb') as out_file:
if not _is_multipart_etag(etag):
for chunk in file_chunks:
md5.update(chunk)
out_file.write(chunk)
else:
for chunk in file_chunks:
out_file.write(chunk)
if is_stream:
# Need to save the data to be able to check the etag for a stream
# becuase once the data is written to the stream there is no
# undoing it.
payload = write_to_file(None, etag, md5, file_chunks, True)
else:
with open(filename, 'wb') as out_file:
write_to_file(out_file, etag, md5, file_chunks)

if not _is_multipart_etag(etag):
if etag != md5.hexdigest():
os.remove(filename)
if not is_stream:
os.remove(filename)
raise MD5Error(filename)
last_update_tuple = last_update.timetuple()
mod_timestamp = time.mktime(last_update_tuple)
os.utime(filename, (int(mod_timestamp), int(mod_timestamp)))

if not is_stream:
last_update_tuple = last_update.timetuple()
mod_timestamp = time.mktime(last_update_tuple)
os.utime(filename, (int(mod_timestamp), int(mod_timestamp)))
else:
# Now write the output to stdout since the md5 is correct.
bytes_print(payload)
sys.stdout.flush()


def write_to_file(out_file, etag, md5, file_chunks, is_stream=False):
"""
Updates the etag for each file chunk. It will write to the file if it a
file but if it is a stream it will return a byte string to be later
written to a stream.
"""
body = b''
for chunk in file_chunks:
if not _is_multipart_etag(etag):
md5.update(chunk)
if is_stream:
body += chunk
else:
out_file.write(chunk)
return body


def _is_multipart_etag(etag):
Expand Down Expand Up @@ -140,7 +166,7 @@ class FileInfo(TaskInfo):
def __init__(self, src, dest=None, compare_key=None, size=None,
last_update=None, src_type=None, dest_type=None,
operation_name=None, service=None, endpoint=None,
parameters=None, source_endpoint=None):
parameters=None, source_endpoint=None, is_stream=False):
super(FileInfo, self).__init__(src, src_type=src_type,
operation_name=operation_name,
service=service,
Expand All @@ -157,6 +183,7 @@ def __init__(self, src, dest=None, compare_key=None, size=None,
self.parameters = {'acl': None,
'sse': None}
self.source_endpoint = source_endpoint
self.is_stream = is_stream

def _permission_to_param(self, permission):
if permission == 'read':
Expand Down Expand Up @@ -204,24 +231,30 @@ def _handle_object_params(self, params):
if self.parameters['expires']:
params['expires'] = self.parameters['expires'][0]

def upload(self):
def upload(self, payload=None):
"""
Redirects the file to the multipart upload function if the file is
large. If it is small enough, it puts the file as an object in s3.
"""
with open(self.src, 'rb') as body:
bucket, key = find_bucket_key(self.dest)
params = {
'endpoint': self.endpoint,
'bucket': bucket,
'key': key,
'body': body,
}
self._handle_object_params(params)
response_data, http = operate(self.service, 'PutObject', params)
etag = response_data['ETag'][1:-1]
body.seek(0)
check_etag(etag, body)
if payload:
self._handle_upload(payload)
else:
with open(self.src, 'rb') as body:
self._handle_upload(body)

def _handle_upload(self, body):
bucket, key = find_bucket_key(self.dest)
params = {
'endpoint': self.endpoint,
'bucket': bucket,
'key': key,
'body': body,
}
self._handle_object_params(params)
response_data, http = operate(self.service, 'PutObject', params)
etag = response_data['ETag'][1:-1]
body.seek(0)
check_etag(etag, body)

def _inject_content_type(self, params, filename):
# Add a content type param if we can guess the type.
Expand All @@ -237,7 +270,8 @@ def download(self):
bucket, key = find_bucket_key(self.src)
params = {'endpoint': self.endpoint, 'bucket': bucket, 'key': key}
response_data, http = operate(self.service, 'GetObject', params)
save_file(self.dest, response_data, self.last_update)
save_file(self.dest, response_data, self.last_update,
self.is_stream)

def copy(self):
"""
Expand Down
1 change: 1 addition & 0 deletions awscli/customizations/s3/fileinfobuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def _inject_info(self, file_base):
file_info_attr['src_type'] = file_base.src_type
file_info_attr['dest_type'] = file_base.dest_type
file_info_attr['operation_name'] = file_base.operation_name
file_info_attr['is_stream'] = file_base.is_stream
file_info_attr['service'] = self._service
file_info_attr['endpoint'] = self._endpoint
file_info_attr['source_endpoint'] = self._source_endpoint
Expand Down
Loading

0 comments on commit abce027

Please sign in to comment.