From 4263d0d272ca90e658281fa00ae10c1d2a2e5f42 Mon Sep 17 00:00:00 2001
From: James Saryerwinnie
Date: Mon, 24 Feb 2014 09:34:07 -0800
Subject: [PATCH] Fix hanging Ctrl+C on S3 downloads

This is a regression that was introduced when the IO queue/thread was
added (https://github.com/aws/aws-cli/issues/619).

The current shutdown order of the threads is:

1. Queue end sentinel for IO thread.
2. Queue end sentinel for result thread.
3. Queue end sentinel for worker threads.
4. .join() threads in this order: [io_thread, result_thread [, worker threads]]

Though the actual thread shutdown order is non-deterministic, it's
fairly common that the threads shut down in roughly the above order.
This means that the IO thread will generally shut down before all the
worker threads have shut down. However, the download tasks can still
be enqueueing writes to the IO queue. If the IO thread shuts down,
there's nothing consuming writes on the other end of the queue. Given
that the queue is bounded with a maxsize, .put() calls to the queue
will block until space becomes available, which will never happen if
the IO thread has already shut down.

The fix here is to ensure that the IO thread is always the last thing
to shut down. This means any remaining IO requests will be executed
before the IO thread exits, which prevents the deadlock.

Added unit tests that demonstrate this issue. I've also added an
integration test that actually sends a SIGINT to the process and
verifies it exits in a timely manner, so we can ensure we don't
regress on this again.

Note: some unit/integ tests needed updating because they were using
.call() multiple times.

Fixes #650
Fixes #657
---
 awscli/customizations/s3/executor.py          | 36 ++++++++++++-------
 awscli/customizations/s3/s3handler.py         |  2 +-
 awscli/customizations/s3/tasks.py             |  3 ++
 tests/integration/__init__.py                 | 10 +++++-
 .../customizations/s3/test_plugin.py          | 33 +++++++++++++----
 .../customizations/s3/test_s3handler.py       |  8 ++---
 tests/unit/customizations/s3/test_executor.py | 18 +++++++++-
 .../unit/customizations/s3/test_s3handler.py  |  7 ++--
 8 files changed, 88 insertions(+), 29 deletions(-)

diff --git a/awscli/customizations/s3/executor.py b/awscli/customizations/s3/executor.py
index 60a08818a972..644944f1c2ec 100644
--- a/awscli/customizations/s3/executor.py
+++ b/awscli/customizations/s3/executor.py
@@ -31,17 +31,19 @@ class Executor(object):
     """
     def __init__(self, done, num_threads, result_queue,
                  quiet, interrupt, max_queue_size, write_queue):
-        self.queue = None
+        self.interrupt = interrupt
+        self._max_queue_size = max_queue_size
+        self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
         self.done = done
         self.num_threads = num_threads
         self.result_queue = result_queue
         self.quiet = quiet
-        self.interrupt = interrupt
         self.threads_list = []
-        self._max_queue_size = max_queue_size
         self.write_queue = write_queue
-        self.print_thread = None
-        self.io_thread = None
+        self.print_thread = PrintThread(self.result_queue, self.done,
+                                        self.quiet, self.interrupt)
+        self.print_thread.daemon = True
+        self.io_thread = IOWriterThread(self.write_queue, self.done)

     @property
     def num_tasks_failed(self):
@@ -51,13 +53,11 @@ def num_tasks_failed(self):
         return tasks_failed

     def start(self):
-        self.print_thread = PrintThread(self.result_queue, self.done,
-                                        self.quiet, self.interrupt)
-        self.print_thread.daemon = True
-        self.io_thread = IOWriterThread(self.write_queue, self.done)
         self.io_thread.start()
-        self.threads_list.append(self.io_thread)
-        self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
+        # Note that we're *not* adding the IO thread to the threads_list.
+        # There's a specific shutdown order we need and we're going to be
+        # explicit about it rather than relying on the threads_list order.
+        # See .join() for more info.
         self.threads_list.append(self.print_thread)
         self.print_thread.start()
         for i in range(self.num_threads):
@@ -84,13 +84,22 @@ def join(self):
         """
         This is used to clean up the ``Executor``.
         """
-        self.write_queue.put(QUEUE_END_SENTINEL)
+        LOGGER.debug("Queueing end sentinel for result thread.")
         self.result_queue.put(QUEUE_END_SENTINEL)
         for i in range(self.num_threads):
+            LOGGER.debug("Queueing end sentinel for worker thread.")
             self.queue.put(QUEUE_END_SENTINEL)
         for thread in self.threads_list:
+            LOGGER.debug("Waiting for thread to shut down: %s", thread)
             thread.join()
+            LOGGER.debug("Thread has been shut down: %s", thread)
+
+        LOGGER.debug("Queueing end sentinel for IO thread.")
+        self.write_queue.put(QUEUE_END_SENTINEL)
+        LOGGER.debug("Waiting for IO thread to shut down.")
+        self.io_thread.join()
+        LOGGER.debug("All threads have been shut down.")


 class IOWriterThread(threading.Thread):
@@ -148,9 +157,12 @@ def run(self):
             try:
                 function = self.queue.get(True)
                 if function is QUEUE_END_SENTINEL:
+                    LOGGER.debug("End sentinel received in worker thread, "
+                                 "shutting down worker thread.")
                     self.queue.task_done()
                     break
                 try:
+                    LOGGER.debug("Worker thread invoking task: %s", function)
                     function()
                 except Exception as e:
                     LOGGER.debug('Error calling task: %s', e, exc_info=True)
diff --git a/awscli/customizations/s3/s3handler.py b/awscli/customizations/s3/s3handler.py
index 390b69da6aa8..62639966f785 100644
--- a/awscli/customizations/s3/s3handler.py
+++ b/awscli/customizations/s3/s3handler.py
@@ -91,7 +91,7 @@ def call(self, files):
         except KeyboardInterrupt:
             self.interrupt.set()
             self.result_queue.put({'message': "Cleaning up. Please wait...",
-                                   'error': False})
+                                   'error': True})
             self._shutdown()
             return self.executor.num_tasks_failed

diff --git a/awscli/customizations/s3/tasks.py b/awscli/customizations/s3/tasks.py
index 4099ff35dcf2..83d354d3b689 100644
--- a/awscli/customizations/s3/tasks.py
+++ b/awscli/customizations/s3/tasks.py
@@ -355,6 +355,7 @@ def _download_part(self):
                     result = {'message': message, 'error': False,
                               'total_parts': total_parts}
                     self._result_queue.put(result)
+                    LOGGER.debug("Task complete: %s", self)
                     return
             except (socket.timeout, socket.error) as e:
                 LOGGER.debug("Socket timeout caught, retrying request, "
@@ -378,7 +379,9 @@ def _queue_writes(self, body):
         current = body.read(iterate_chunk_size)
         while current:
             offset = self._part_number * self._chunk_size + amount_read
+            LOGGER.debug("Submitting IORequest to write queue.")
             self._io_queue.put(IORequest(self._filename.dest, offset, current))
+            LOGGER.debug("Request successfully submitted.")
             amount_read += len(current)
             current = body.read(iterate_chunk_size)
         # Change log message.
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
index fd2170a08227..db4027e8a4db 100644
--- a/tests/integration/__init__.py
+++ b/tests/integration/__init__.py
@@ -55,7 +55,8 @@ def _escape_quotes(command):
     return command


-def aws(command, collect_memory=False, env_vars=None):
+def aws(command, collect_memory=False, env_vars=None,
+        wait_for_finish=True):
     """Run an aws command.
     This help function abstracts the differences of running the "aws"
@@ -68,6 +69,11 @@ def aws(command, collect_memory=False, env_vars=None):
     If env_vars is None, this will set the environment variables
     to be used by the aws process.

+    If wait_for_finish is False, then the Process object is returned
+    to the caller. It is then the caller's responsibility to ensure
+    proper cleanup. This can be useful if you want to test timeouts
+    or how the CLI responds to various signals.
+
     """
     if platform.system() == 'Windows':
         command = _escape_quotes(command)
@@ -83,6 +89,8 @@ def aws(command, collect_memory=False, env_vars=None):
         env = env_vars
     process = Popen(full_command, stdout=PIPE, stderr=PIPE, shell=True,
                     env=env)
+    if not wait_for_finish:
+        return process
     memory = None
     if not collect_memory:
         stdout, stderr = process.communicate()
diff --git a/tests/integration/customizations/s3/test_plugin.py b/tests/integration/customizations/s3/test_plugin.py
index e953ae61700a..d8a01b1f44f7 100644
--- a/tests/integration/customizations/s3/test_plugin.py
+++ b/tests/integration/customizations/s3/test_plugin.py
@@ -19,11 +19,10 @@
 from tests import unittest
 import os
 import random
-import tempfile
-import shutil
 import platform
 import contextlib
 import time
+import signal

 import botocore.session

@@ -327,6 +326,28 @@ def test_download_large_file(self):
         self.assert_no_errors(p)
         self.assertEqual(os.path.getsize(local_foo_txt), len(foo_contents))

+    def test_download_ctrl_c_does_not_hang(self):
+        bucket_name = self.create_bucket()
+        foo_contents = 'abcd' * (1024 * 1024 * 20)
+        self.put_object(bucket_name, key_name='foo.txt', contents=foo_contents)
+        local_foo_txt = self.files.full_path('foo.txt')
+        process = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, local_foo_txt), wait_for_finish=False)
+        # Give it some time to start up and enter its main task loop.
+        time.sleep(2)
+        # The process has 30 seconds to finish after being sent a Ctrl+C,
+        # otherwise the test fails.
+        process.send_signal(signal.SIGINT)
+        deadline = time.time() + 30
+        while time.time() < deadline:
+            rc = process.poll()
+            if rc is not None:
+                break
+        else:
+            process.kill()
+            self.fail("CLI did not exit within 30 seconds of receiving a Ctrl+C")
+        # A Ctrl+C should have a non-zero RC.
+        self.assertEqual(process.returncode, 1)
+
     def test_cp_to_nonexistent_bucket(self):
         foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
         p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
@@ -376,16 +397,16 @@ def test_sync_to_from_s3(self):
             self.assertEqual(f.read(), 'bar contents')

     def test_sync_to_nonexistent_bucket(self):
-        foo_txt = self.files.create_file('foo.txt', 'foo contents')
-        bar_txt = self.files.create_file('bar.txt', 'bar contents')
+        self.files.create_file('foo.txt', 'foo contents')
+        self.files.create_file('bar.txt', 'bar contents')

         # Sync the directory and the bucket.
         p = aws('s3 sync %s s3://noexist-bkt-nme-1412' % (self.files.rootdir,))
         self.assertEqual(p.rc, 1)

     def test_sync_with_empty_files(self):
-        foo_txt = self.files.create_file('foo.txt', 'foo contents')
-        empty_txt = self.files.create_file('bar.txt', contents='')
+        self.files.create_file('foo.txt', 'foo contents')
+        self.files.create_file('bar.txt', contents='')
         bucket_name = self.create_bucket()
         p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
         self.assertEqual(p.rc, 0)
diff --git a/tests/integration/customizations/s3/test_s3handler.py b/tests/integration/customizations/s3/test_s3handler.py
index cd22f240a706..f1f2f8840ceb 100644
--- a/tests/integration/customizations/s3/test_s3handler.py
+++ b/tests/integration/customizations/s3/test_s3handler.py
@@ -369,8 +369,8 @@ def setUp(self):
         self.session = botocore.session.get_session(EnvironmentVariables)
         self.service = self.session.get_service('s3')
         self.endpoint = self.service.get_endpoint('us-east-1')
-        params = {'region': 'us-east-1'}
-        self.s3_handler = S3Handler(self.session, params)
+        self.params = {'region': 'us-east-1'}
+        self.s3_handler = S3Handler(self.session, self.params)
         self.bucket = None

     def tearDown(self):
@@ -386,7 +386,7 @@ def test_bucket(self):
             service=self.service,
             endpoint=self.endpoint,
         )
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         buckets_list = []
         for bucket in list_buckets(self.session):
             buckets_list.append(bucket['Name'])
@@ -395,7 +395,7 @@ def test_bucket(self):
         file_info = FileInfo(
             src=self.bucket, operation_name='remove_bucket', size=0,
             service=self.service, endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         buckets_list = []
         for bucket in list_buckets(self.session):
             buckets_list.append(bucket['Name'])
diff --git a/tests/unit/customizations/s3/test_executor.py b/tests/unit/customizations/s3/test_executor.py
index 3cb78ea45c04..ef1db928e5fe 100644
--- a/tests/unit/customizations/s3/test_executor.py
+++ b/tests/unit/customizations/s3/test_executor.py
@@ -10,7 +10,7 @@
 # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
 # ANY KIND, either express or implied. See the License for the specific
 # language governing permissions and limitations under the License.
-from tests import unittest
+from tests import unittest, temporary_file
 import os
 import tempfile
 import shutil
@@ -19,6 +19,7 @@

 from awscli.customizations.s3.executor import IOWriterThread
 from awscli.customizations.s3.executor import QUEUE_END_SENTINEL
+from awscli.customizations.s3.executor import Executor
 from awscli.customizations.s3.utils import IORequest, IOCloseRequest


@@ -67,3 +68,18 @@ def test_multiple_files_in_queue(self):
             self.assertEqual(f.read(), b'foobar')
         with open(second_file, 'rb') as f:
             self.assertEqual(f.read(), b'otherstuff')
+
+
+class TestExecutor(unittest.TestCase):
+    def test_shutdown_does_not_hang(self):
+        executor = Executor(mock.Mock(), 2, queue.Queue(), False, mock.Mock(),
+                            10, queue.Queue(maxsize=1))
+        with temporary_file('rb+') as f:
+            executor.start()
+            class FloodIOQueueTask(object):
+                def __call__(self):
+                    for i in range(50):
+                        executor.write_queue.put(IORequest(f.name, 0, b'foobar'))
+            executor.submit(FloodIOQueueTask())
+            executor.join()
+            self.assertEqual(open(f.name, 'rb').read(), b'foobar')
diff --git a/tests/unit/customizations/s3/test_s3handler.py b/tests/unit/customizations/s3/test_s3handler.py
index 38e9a3f4762c..fcf5425d17df 100644
--- a/tests/unit/customizations/s3/test_s3handler.py
+++ b/tests/unit/customizations/s3/test_s3handler.py
@@ -578,8 +578,7 @@ def setUp(self):
         self.session = FakeSession()
         self.service = self.session.get_service('s3')
         self.endpoint = self.service.get_endpoint('us-east-1')
-        params = {'region': 'us-east-1'}
-        self.s3_handler = S3Handler(self.session, params)
+        self.params = {'region': 'us-east-1'}
         self.bucket = None

     def tearDown(self):
@@ -598,7 +597,7 @@ def test_bucket(self):
             size=0,
             service=self.service,
             endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         number_buckets = len(list_buckets(self.session))
         self.assertEqual(orig_number_buckets + 1, number_buckets)

@@ -608,7 +607,7 @@ def test_bucket(self):
             size=0,
             service=self.service,
             endpoint=self.endpoint)
-        self.s3_handler.call([file_info])
+        S3Handler(self.session, self.params).call([file_info])
         number_buckets = len(list_buckets(self.session))
         self.assertEqual(orig_number_buckets, number_buckets)
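
To make the failure mode in the commit message concrete, here is a minimal,
self-contained sketch. It is not aws-cli code: the names io_queue, io_writer,
and worker are invented for illustration, and only Python's standard queue and
threading modules are assumed. It shows why a .put() on a bounded queue hangs
forever once its consumer thread has exited, and why joining the IO thread
last cannot hang.

import queue
import threading

QUEUE_END_SENTINEL = object()
# Bounded, like the executor's write queue: put() blocks when full.
io_queue = queue.Queue(maxsize=2)

def io_writer():
    # Consume write requests until the end sentinel arrives.
    while True:
        if io_queue.get() is QUEUE_END_SENTINEL:
            return

def worker():
    # Simulates a download task enqueueing many writes. If io_writer
    # had already exited, one of these put() calls would block forever.
    for i in range(10):
        io_queue.put(('write', i))

io_thread = threading.Thread(target=io_writer)
worker_thread = threading.Thread(target=worker)
io_thread.start()
worker_thread.start()

# The fixed shutdown order: wait for the producers first, then stop the
# consumer, so every pending write is drained before the IO thread exits.
worker_thread.join()              # 1. workers finish enqueueing writes
io_queue.put(QUEUE_END_SENTINEL)  # 2. then signal the IO thread to stop
io_thread.join()                  # 3. IO thread exits after draining
print("shutdown completed without deadlock")

With the old order (sentinel to the IO thread and io_thread.join() before the
workers finish), the consumer can exit while the worker is still producing;
the two-slot queue then fills and the worker blocks forever in put(), which
is exactly the hang this patch fixes.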