From ed9000bd80dfdc294aaab63ff65069b50180e7a4 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Mon, 12 Jan 2015 12:14:26 -0800 Subject: [PATCH 1/2] Fix S3 streaming download corruption --- awscli/customizations/s3/tasks.py | 27 ++++++++++++++++++---- tests/unit/customizations/s3/test_tasks.py | 25 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/awscli/customizations/s3/tasks.py b/awscli/customizations/s3/tasks.py index 525965c70060..7ea7527b7ad8 100644 --- a/awscli/customizations/s3/tasks.py +++ b/awscli/customizations/s3/tasks.py @@ -401,10 +401,31 @@ def _queue_writes(self, body): self._part_number, self._filename.dest) iterate_chunk_size = self.ITERATE_CHUNK_SIZE body.set_socket_timeout(self.READ_TIMEOUT) + if self._filename.is_stream: + self._queue_writes_for_stream(body) + else: + self._queue_writes_in_chunks(body, iterate_chunk_size) + + def _queue_writes_for_stream(self, body): + # We have to handle an output stream differently. The main reason is + # that we cannot seek() in the output stream. This means that we need + # to queue the writes in order. If we queue IO writes in smaller than + # part size chunks, in the case of a retry we'll need to do a range GET + # for only the remaining parts. The other alternative, which is what + # we do here, is to just request the entire chunk size write. 
+ self._context.wait_for_turn(self._part_number) + chunk = body.read(self._chunk_size) + offset = self._part_number * self._chunk_size + LOGGER.debug("Submitting IORequest to write queue.") + self._io_queue.put( + IORequest(self._filename.dest, offset, chunk, + self._filename.is_stream) + ) + self._context.done_with_turn() + + def _queue_writes_in_chunks(self, body, iterate_chunk_size): amount_read = 0 current = body.read(iterate_chunk_size) - if self._filename.is_stream: - self._context.wait_for_turn(self._part_number) while current: offset = self._part_number * self._chunk_size + amount_read LOGGER.debug("Submitting IORequest to write queue.") @@ -418,8 +439,6 @@ def _queue_writes(self, body): # Change log message. LOGGER.debug("Done queueing writes for part number %s to file: %s", self._part_number, self._filename.dest) - if self._filename.is_stream: - self._context.done_with_turn() class CreateMultipartUploadTask(BasicTask): diff --git a/tests/unit/customizations/s3/test_tasks.py b/tests/unit/customizations/s3/test_tasks.py index eda16f765778..9e65ce8ddba0 100644 --- a/tests/unit/customizations/s3/test_tasks.py +++ b/tests/unit/customizations/s3/test_tasks.py @@ -18,6 +18,7 @@ from botocore.exceptions import IncompleteReadError +from awscli.customizations.s3 import constants from awscli.customizations.s3.tasks import CreateLocalFileTask from awscli.customizations.s3.tasks import CompleteDownloadTask from awscli.customizations.s3.tasks import DownloadPartTask @@ -395,6 +396,30 @@ def test_incomplete_read_is_retried(self): self.assertEqual(DownloadPartTask.TOTAL_ATTEMPTS, self.service.get_operation.call_count) + def test_retried_requests_dont_enqueue_writes_twice(self): + error_body = mock.Mock() + error_body.read.side_effect = socket.timeout + success_body = mock.Mock() + success_body.read.side_effect = [b'foobar', b''] + + incomplete_read = (mock.Mock(), {'Body': error_body}) + success_read = (mock.Mock(), {'Body': success_body}) + 
+ self.service.get_operation.return_value.call.side_effect = [ + # The first request results in an error when reading the response. + incomplete_read, + success_read, + ] + self.filename.is_stream = True + task = DownloadPartTask(0, constants.CHUNKSIZE, self.result_queue, + self.service, self.filename, self.context, + self.io_queue) + task() + call_args_list = self.io_queue.put.call_args_list + self.assertEqual(len(call_args_list), 1) + self.assertEqual(call_args_list[0], + mock.call(('local/file', 0, b'foobar', True))) + success_body.read.assert_called_with(constants.CHUNKSIZE) + class TestMultipartDownloadContext(unittest.TestCase): def setUp(self): From 9dabc6572dcbfcfe4a9e1424bd646688988af765 Mon Sep 17 00:00:00 2001 From: James Saryerwinnie Date: Mon, 12 Jan 2015 14:19:30 -0800 Subject: [PATCH 2/2] Account for last dl part task being greater than chunk size --- awscli/customizations/s3/tasks.py | 2 +- tests/unit/customizations/s3/test_tasks.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/awscli/customizations/s3/tasks.py b/awscli/customizations/s3/tasks.py index 7ea7527b7ad8..7d1c09eb98d5 100644 --- a/awscli/customizations/s3/tasks.py +++ b/awscli/customizations/s3/tasks.py @@ -414,7 +414,7 @@ def _queue_writes_for_stream(self, body): # for only the remaining parts. The other alternative, which is what # we do here, is to just request the entire chunk size write. 
self._context.wait_for_turn(self._part_number) - chunk = body.read(self._chunk_size) + chunk = body.read() offset = self._part_number * self._chunk_size LOGGER.debug("Submitting IORequest to write queue.") self._io_queue.put( diff --git a/tests/unit/customizations/s3/test_tasks.py b/tests/unit/customizations/s3/test_tasks.py index 9e65ce8ddba0..96f65dd63fa8 100644 --- a/tests/unit/customizations/s3/test_tasks.py +++ b/tests/unit/customizations/s3/test_tasks.py @@ -418,7 +418,7 @@ def test_retried_requests_dont_enqueue_writes_twice(self): self.assertEqual(len(call_args_list), 1) self.assertEqual(call_args_list[0], mock.call(('local/file', 0, b'foobar', True))) - success_body.read.assert_called_with(constants.CHUNKSIZE) + success_body.read.assert_called_with() class TestMultipartDownloadContext(unittest.TestCase):