diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py
index 60a08818a972..644944f1c2ec 100644
--- a/awscli/customizations/s3/executor.py
+++ b/awscli/customizations/s3/executor.py
@@ -31,17 +31,19 @@ class Executor(object):
     """
     def __init__(self, done, num_threads, result_queue,
                  quiet, interrupt, max_queue_size, write_queue):
-        self.queue = None
+        self.interrupt = interrupt
+        self._max_queue_size = max_queue_size
+        self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
         self.done = done
         self.num_threads = num_threads
         self.result_queue = result_queue
         self.quiet = quiet
-        self.interrupt = interrupt
         self.threads_list = []
-        self._max_queue_size = max_queue_size
         self.write_queue = write_queue
-        self.print_thread = None
-        self.io_thread = None
+        self.print_thread = PrintThread(self.result_queue, self.done,
+                                        self.quiet, self.interrupt)
+        self.print_thread.daemon = True
+        self.io_thread = IOWriterThread(self.write_queue, self.done)

     @property
     def num_tasks_failed(self):
@@ -51,13 +53,11 @@ def num_tasks_failed(self):
         return tasks_failed

     def start(self):
-        self.print_thread = PrintThread(self.result_queue, self.done,
-                                        self.quiet, self.interrupt)
-        self.print_thread.daemon = True
-        self.io_thread = IOWriterThread(self.write_queue, self.done)
         self.io_thread.start()
-        self.threads_list.append(self.io_thread)
-        self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
+        # Note that we're *not* adding the IO thread to the threads_list.
+        # There's a specific shutdown order we need and we're going to be
+        # explicit about it rather than relying on the threads_list order.
+        # See .join() for more info.
         self.threads_list.append(self.print_thread)
         self.print_thread.start()
         for i in range(self.num_threads):
@@ -84,13 +84,22 @@ def join(self):
         """
         This is used to clean up the ``Executor``.
         """
-        self.write_queue.put(QUEUE_END_SENTINEL)
+        LOGGER.debug("Queueing end sentinel for result thread.")
         self.result_queue.put(QUEUE_END_SENTINEL)
         for i in range(self.num_threads):
+            LOGGER.debug("Queueing end sentinel for worker thread.")
             self.queue.put(QUEUE_END_SENTINEL)
         for thread in self.threads_list:
+            LOGGER.debug("Waiting for thread to shutdown: %s", thread)
             thread.join()
+            LOGGER.debug("Thread has been shutdown: %s", thread)
+
+        LOGGER.debug("Queueing end sentinel for IO thread.")
+        self.write_queue.put(QUEUE_END_SENTINEL)
+        LOGGER.debug("Waiting for IO thread to shutdown.")
+        self.io_thread.join()
+        LOGGER.debug("All threads have been shutdown.")


 class IOWriterThread(threading.Thread):
@@ -148,9 +157,12 @@ def run(self):
             try:
                 function = self.queue.get(True)
                 if function is QUEUE_END_SENTINEL:
+                    LOGGER.debug("End sentinel received in worker thread, "
+                                 "shutting down worker thread.")
                     self.queue.task_done()
                     break
                 try:
+                    LOGGER.debug("Worker thread invoking task: %s", function)
                     function()
                 except Exception as e:
                     LOGGER.debug('Error calling task: %s', e, exc_info=True)
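The shutdown-order comment added to start() is the crux of this patch. Below
is a minimal sketch of the ordering join() now enforces, using illustrative
names rather than the real aws-cli classes: workers must be joined before the
IO thread is told to stop, because a worker can still be blocked putting into
the bounded write queue; if the IO writer exited first, nothing would drain
that queue and join() would hang.

import threading

SENTINEL = object()  # stands in for QUEUE_END_SENTINEL

def shutdown(worker_queue, workers, write_queue, io_thread):
    for _ in workers:
        worker_queue.put(SENTINEL)   # each worker exits its loop on this
    for w in workers:
        w.join()                     # the IO thread is still draining writes
    write_queue.put(SENTINEL)        # only now ask the IO thread to stop
    io_thread.join()
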
Please wait...", - 'error': False}) + 'error': True}) self._shutdown() return self.executor.num_tasks_failed diff --git a/awscli/customizations/s3/tasks.py b/awscli/customizations/s3/tasks.py index 4099ff35dcf2..83d354d3b689 100644 --- a/awscli/customizations/s3/tasks.py +++ b/awscli/customizations/s3/tasks.py @@ -355,6 +355,7 @@ def _download_part(self): result = {'message': message, 'error': False, 'total_parts': total_parts} self._result_queue.put(result) + LOGGER.debug("Task complete: %s", self) return except (socket.timeout, socket.error) as e: LOGGER.debug("Socket timeout caught, retrying request, " @@ -378,7 +379,9 @@ def _queue_writes(self, body): current = body.read(iterate_chunk_size) while current: offset = self._part_number * self._chunk_size + amount_read + LOGGER.debug("Submitting IORequest to write queue.") self._io_queue.put(IORequest(self._filename.dest, offset, current)) + LOGGER.debug("Request successfully submitted.") amount_read += len(current) current = body.read(iterate_chunk_size) # Change log message. diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index fd2170a08227..db4027e8a4db 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -55,7 +55,8 @@ def _escape_quotes(command): return command -def aws(command, collect_memory=False, env_vars=None): +def aws(command, collect_memory=False, env_vars=None, + wait_for_finish=True): """Run an aws command. This help function abstracts the differences of running the "aws" @@ -68,6 +69,11 @@ def aws(command, collect_memory=False, env_vars=None): If env_vars is None, this will set the environment variables to be used by the aws process. + If wait_for_finish is False, then the Process object is returned + to the caller. It is then the caller's responsibility to ensure + proper cleanup. This can be useful if you want to test timeout's + or how the CLI responds to various signals. + """ if platform.system() == 'Windows': command = _escape_quotes(command) @@ -83,6 +89,8 @@ def aws(command, collect_memory=False, env_vars=None): env = env_vars process = Popen(full_command, stdout=PIPE, stderr=PIPE, shell=True, env=env) + if not wait_for_finish: + return process memory = None if not collect_memory: stdout, stderr = process.communicate() diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py index e953ae61700a..d8a01b1f44f7 100644 --- a/tests/integration/customizations/s3/test_plugin.py +++ b/tests/integration/customizations/s3/test_plugin.py @@ -19,11 +19,10 @@ from tests import unittest import os import random -import tempfile -import shutil import platform import contextlib import time +import signal import botocore.session @@ -327,6 +326,28 @@ def test_download_large_file(self): self.assert_no_errors(p) self.assertEqual(os.path.getsize(local_foo_txt), len(foo_contents)) + def test_download_ctrl_c_does_not_hang(self): + bucket_name = self.create_bucket() + foo_contents = 'abcd' * (1024 * 1024 * 20) + self.put_object(bucket_name, key_name='foo.txt', contents=foo_contents) + local_foo_txt = self.files.full_path('foo.txt') + process = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, local_foo_txt), wait_for_finish=False) + # Give it some time to start up and enter it's main task loop. + time.sleep(2) + # The process has 30 seconds to finish after being sent a Ctrl+C, + # otherwise the test fails. 
diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py
index e953ae61700a..d8a01b1f44f7 100644
--- a/tests/integration/customizations/s3/test_plugin.py
+++ b/tests/integration/customizations/s3/test_plugin.py
@@ -19,11 +19,10 @@
 from tests import unittest
 import os
 import random
-import tempfile
-import shutil
 import platform
 import contextlib
 import time
+import signal

 import botocore.session

@@ -327,6 +326,28 @@ def test_download_large_file(self):
         self.assert_no_errors(p)
         self.assertEqual(os.path.getsize(local_foo_txt), len(foo_contents))

+    def test_download_ctrl_c_does_not_hang(self):
+        bucket_name = self.create_bucket()
+        foo_contents = 'abcd' * (1024 * 1024 * 20)
+        self.put_object(bucket_name, key_name='foo.txt', contents=foo_contents)
+        local_foo_txt = self.files.full_path('foo.txt')
+        process = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, local_foo_txt), wait_for_finish=False)
+        # Give it some time to start up and enter its main task loop.
+        time.sleep(2)
+        # The process has 30 seconds to finish after being sent a Ctrl+C,
+        # otherwise the test fails.
+        process.send_signal(signal.SIGINT)
+        deadline = time.time() + 30
+        while time.time() < deadline:
+            rc = process.poll()
+            if rc is not None:
+                break
+        else:
+            process.kill()
+            self.fail("CLI did not exit within 30 seconds of receiving a Ctrl+C")
+        # A Ctrl+C should have a non-zero RC.
+        self.assertEqual(process.returncode, 1)
+
     def test_cp_to_nonexistent_bucket(self):
         foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
         p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
@@ -376,16 +397,16 @@ def test_sync_to_from_s3(self):
         self.assertEqual(f.read(), 'bar contents')

     def test_sync_to_nonexistent_bucket(self):
-        foo_txt = self.files.create_file('foo.txt', 'foo contents')
-        bar_txt = self.files.create_file('bar.txt', 'bar contents')
+        self.files.create_file('foo.txt', 'foo contents')
+        self.files.create_file('bar.txt', 'bar contents')

         # Sync the directory and the bucket.
         p = aws('s3 sync %s s3://noexist-bkt-nme-1412' % (self.files.rootdir,))
         self.assertEqual(p.rc, 1)

     def test_sync_with_empty_files(self):
-        foo_txt = self.files.create_file('foo.txt', 'foo contents')
-        empty_txt = self.files.create_file('bar.txt', contents='')
+        self.files.create_file('foo.txt', 'foo contents')
+        self.files.create_file('bar.txt', contents='')
         bucket_name = self.create_bucket()
         p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
         self.assertEqual(p.rc, 0)
diff --git a/tests/integration/customizations/s3/test_s3handler.py b/tests/integration/customizations/s3/test_s3handler.py
index cd22f240a706..f1f2f8840ceb 100644
--- a/tests/integration/customizations/s3/test_s3handler.py
+++ b/tests/integration/customizations/s3/test_s3handler.py
@@ -369,8 +369,8 @@ def setUp(self):
         self.session = botocore.session.get_session(EnvironmentVariables)
         self.service = self.session.get_service('s3')
         self.endpoint = self.service.get_endpoint('us-east-1')
-        params = {'region': 'us-east-1'}
-        self.s3_handler = S3Handler(self.session, params)
+        self.params = {'region': 'us-east-1'}
+        self.s3_handler = S3Handler(self.session, self.params)
         self.bucket = None

     def tearDown(self):
@@ -386,7 +386,7 @@ def test_bucket(self):
             service=self.service, endpoint=self.endpoint,
         )
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         buckets_list = []
         for bucket in list_buckets(self.session):
             buckets_list.append(bucket['Name'])
@@ -395,7 +395,7 @@
         file_info = FileInfo(
             src=self.bucket, operation_name='remove_bucket', size=0,
             service=self.service, endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         buckets_list = []
         for bucket in list_buckets(self.session):
             buckets_list.append(bucket['Name'])
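The poll loop in test_download_ctrl_c_does_not_hang relies on Python's
while/else: the else arm runs only when the loop exhausts its condition
without hitting break, i.e. when the deadline expires. Generalized as a
helper (illustrative only, not part of the test suite; the sleep between
polls is an addition to avoid busy-waiting):

import time

def wait_for_exit(process, timeout=30, poll_interval=0.1):
    deadline = time.time() + timeout
    while time.time() < deadline:
        if process.poll() is not None:   # process has exited
            break
        time.sleep(poll_interval)
    else:                                # never broke out: deadline expired
        process.kill()
        raise AssertionError("process did not exit within %d seconds" % timeout)
    return process.returncode
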
diff --git a/tests/unit/customizations/s3/test_executor.py b/tests/unit/customizations/s3/test_executor.py
index 3cb78ea45c04..ef1db928e5fe 100644
--- a/tests/unit/customizations/s3/test_executor.py
+++ b/tests/unit/customizations/s3/test_executor.py
@@ -10,7 +10,7 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied.  See the License for the specific
 # language governing permissions and limitations under the License.
-from tests import unittest
+from tests import unittest, temporary_file
 import os
 import tempfile
 import shutil
@@ -19,6 +19,7 @@

 from awscli.customizations.s3.executor import IOWriterThread
 from awscli.customizations.s3.executor import QUEUE_END_SENTINEL
+from awscli.customizations.s3.executor import Executor
 from awscli.customizations.s3.utils import IORequest, IOCloseRequest


@@ -67,3 +68,18 @@ def test_multiple_files_in_queue(self):
         self.assertEqual(f.read(), b'foobar')
         with open(second_file, 'rb') as f:
             self.assertEqual(f.read(), b'otherstuff')
+
+
+class TestExecutor(unittest.TestCase):
+    def test_shutdown_does_not_hang(self):
+        executor = Executor(mock.Mock(), 2, queue.Queue(), False, mock.Mock(),
+                            10, queue.Queue(maxsize=1))
+        with temporary_file('rb+') as f:
+            executor.start()
+            class FloodIOQueueTask(object):
+                def __call__(self):
+                    for i in range(50):
+                        executor.write_queue.put(IORequest(f.name, 0, b'foobar'))
+            executor.submit(FloodIOQueueTask())
+            executor.join()
+            self.assertEqual(open(f.name, 'rb').read(), b'foobar')
diff --git a/tests/unit/customizations/s3/test_s3handler.py b/tests/unit/customizations/s3/test_s3handler.py
index 38e9a3f4762c..fcf5425d17df 100644
--- a/tests/unit/customizations/s3/test_s3handler.py
+++ b/tests/unit/customizations/s3/test_s3handler.py
@@ -578,8 +578,7 @@ def setUp(self):
         self.session = FakeSession()
         self.service = self.session.get_service('s3')
         self.endpoint = self.service.get_endpoint('us-east-1')
-        params = {'region': 'us-east-1'}
-        self.s3_handler = S3Handler(self.session, params)
+        self.params = {'region': 'us-east-1'}
         self.bucket = None

     def tearDown(self):
@@ -598,7 +597,7 @@ def test_bucket(self):
             size=0,
             service=self.service,
             endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         number_buckets = len(list_buckets(self.session))
         self.assertEqual(orig_number_buckets + 1, number_buckets)

@@ -608,7 +607,7 @@
             size=0,
             service=self.service,
             endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         number_buckets = len(list_buckets(self.session))
         self.assertEqual(orig_number_buckets, number_buckets)
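The new TestExecutor case floods a write queue with maxsize=1 precisely
because that is the hang the executor change fixes. A standalone repro of
the failure mode, independent of aws-cli: a producer blocked on put() into
a full bounded queue can never be joined unless something keeps draining
the queue until the producer is done.

import queue
import threading

io_queue = queue.Queue(maxsize=1)

def worker():
    for i in range(50):
        io_queue.put(('write', i))   # blocks whenever the queue is full

w = threading.Thread(target=worker)
w.start()
# If shutdown stopped the IO consumer at this point, w.join() would never
# return: the worker would be parked inside put() waiting for space.
for _ in range(50):                  # play the IO thread: keep draining
    io_queue.get()
w.join()                             # returns because the queue was drained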