Skip to content

Commit

Permalink
Fix hanging Ctrl+C on S3 downloads
Browse files Browse the repository at this point in the history
This is a regression that was introduced when the IO queue/thread
was introduced (aws#619).

The current shutdown order of the threads is:

1. Queue end sentinel for IO thread.
2. Queue end sentinel for result thread.
3. Queue end sentinel for worker threads.
4. .join() threads in this order:
   [io_thread, result_thread [, worker threads]]

Though the actual thread shutdown order is non-deterministic,
it's fairly common that the threads shutdown in roughly the above
order.  This means that the IO thread will generally shutdown before
all the worker threads have shutdown.

However, the download tasks can still be enqueueing writes to the
IO queue.  If the IO thread shutsdown there's nothing consuming
writes on the other end of the queue.  Given that the queue is
bounded in maxsize, .put() calls to the queue will block until space
becomes available.  This will never happen if the IO queue is already
shutdown.

The fix here is to ensure that the IO thread is always the last thing
to shutdown.  This means any remaining IO requests will be executed
before the IO thread shutsdown.  This will prevent deadlock.

Added unit tests demonstrates this issue.  I've also added an
integration test that actually sends a SIGINT to the process
and verifies it exits in a timely manner so can ensure we
don't regress on this again.

Note: some unit/integ tests needed updating because they were
using .call() multiple times.

Fixes aws#650
Fixes aws#657
  • Loading branch information
jamesls committed Feb 24, 2014
1 parent 37a1ef7 commit 4263d0d
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 29 deletions.
36 changes: 24 additions & 12 deletions awscli/customizations/s3/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,19 @@ class Executor(object):
"""
def __init__(self, done, num_threads, result_queue,
quiet, interrupt, max_queue_size, write_queue):
self.queue = None
self.interrupt = interrupt
self._max_queue_size = max_queue_size
self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
self.done = done
self.num_threads = num_threads
self.result_queue = result_queue
self.quiet = quiet
self.interrupt = interrupt
self.threads_list = []
self._max_queue_size = max_queue_size
self.write_queue = write_queue
self.print_thread = None
self.io_thread = None
self.print_thread = PrintThread(self.result_queue, self.done,
self.quiet, self.interrupt)
self.print_thread.daemon = True
self.io_thread = IOWriterThread(self.write_queue, self.done)

@property
def num_tasks_failed(self):
Expand All @@ -51,13 +53,11 @@ def num_tasks_failed(self):
return tasks_failed

def start(self):
self.print_thread = PrintThread(self.result_queue, self.done,
self.quiet, self.interrupt)
self.print_thread.daemon = True
self.io_thread = IOWriterThread(self.write_queue, self.done)
self.io_thread.start()
self.threads_list.append(self.io_thread)
self.queue = NoBlockQueue(self.interrupt, maxsize=self._max_queue_size)
# Note that we're *not* adding the IO thread to the threads_list.
# There's a specific shutdown order we need and we're going to be
# explicit about it rather than relying on the threads_list order.
# See .join() for more info.
self.threads_list.append(self.print_thread)
self.print_thread.start()
for i in range(self.num_threads):
Expand All @@ -84,13 +84,22 @@ def join(self):
"""
This is used to clean up the ``Executor``.
"""
self.write_queue.put(QUEUE_END_SENTINEL)
LOGGER.debug("Queueing end sentinel for result thread.")
self.result_queue.put(QUEUE_END_SENTINEL)
for i in range(self.num_threads):
LOGGER.debug("Queueing end sentinel for worker thread.")
self.queue.put(QUEUE_END_SENTINEL)

for thread in self.threads_list:
LOGGER.debug("Waiting for thread to shutdown: %s", thread)
thread.join()
LOGGER.debug("Thread has been shutdown: %s", thread)

LOGGER.debug("Queueing end sentinel for IO thread.")
self.write_queue.put(QUEUE_END_SENTINEL)
LOGGER.debug("Waiting for IO thread to shutdown.")
self.io_thread.join()
LOGGER.debug("All threads have been shutdown.")


class IOWriterThread(threading.Thread):
Expand Down Expand Up @@ -148,9 +157,12 @@ def run(self):
try:
function = self.queue.get(True)
if function is QUEUE_END_SENTINEL:
LOGGER.debug("End sentinel received in worker thread, "
"shutting down worker thread.")
self.queue.task_done()
break
try:
LOGGER.debug("Worker thread invoking task: %s", function)
function()
except Exception as e:
LOGGER.debug('Error calling task: %s', e, exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion awscli/customizations/s3/s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def call(self, files):
except KeyboardInterrupt:
self.interrupt.set()
self.result_queue.put({'message': "Cleaning up. Please wait...",
'error': False})
'error': True})
self._shutdown()
return self.executor.num_tasks_failed

Expand Down
3 changes: 3 additions & 0 deletions awscli/customizations/s3/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def _download_part(self):
result = {'message': message, 'error': False,
'total_parts': total_parts}
self._result_queue.put(result)
LOGGER.debug("Task complete: %s", self)
return
except (socket.timeout, socket.error) as e:
LOGGER.debug("Socket timeout caught, retrying request, "
Expand All @@ -378,7 +379,9 @@ def _queue_writes(self, body):
current = body.read(iterate_chunk_size)
while current:
offset = self._part_number * self._chunk_size + amount_read
LOGGER.debug("Submitting IORequest to write queue.")
self._io_queue.put(IORequest(self._filename.dest, offset, current))
LOGGER.debug("Request successfully submitted.")
amount_read += len(current)
current = body.read(iterate_chunk_size)
# Change log message.
Expand Down
10 changes: 9 additions & 1 deletion tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def _escape_quotes(command):
return command


def aws(command, collect_memory=False, env_vars=None):
def aws(command, collect_memory=False, env_vars=None,
wait_for_finish=True):
"""Run an aws command.
This help function abstracts the differences of running the "aws"
Expand All @@ -68,6 +69,11 @@ def aws(command, collect_memory=False, env_vars=None):
If env_vars is None, this will set the environment variables
to be used by the aws process.
If wait_for_finish is False, then the Process object is returned
to the caller. It is then the caller's responsibility to ensure
proper cleanup. This can be useful if you want to test timeout's
or how the CLI responds to various signals.
"""
if platform.system() == 'Windows':
command = _escape_quotes(command)
Expand All @@ -83,6 +89,8 @@ def aws(command, collect_memory=False, env_vars=None):
env = env_vars
process = Popen(full_command, stdout=PIPE, stderr=PIPE, shell=True,
env=env)
if not wait_for_finish:
return process
memory = None
if not collect_memory:
stdout, stderr = process.communicate()
Expand Down
33 changes: 27 additions & 6 deletions tests/integration/customizations/s3/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
from tests import unittest
import os
import random
import tempfile
import shutil
import platform
import contextlib
import time
import signal

import botocore.session

Expand Down Expand Up @@ -327,6 +326,28 @@ def test_download_large_file(self):
self.assert_no_errors(p)
self.assertEqual(os.path.getsize(local_foo_txt), len(foo_contents))

def test_download_ctrl_c_does_not_hang(self):
bucket_name = self.create_bucket()
foo_contents = 'abcd' * (1024 * 1024 * 20)
self.put_object(bucket_name, key_name='foo.txt', contents=foo_contents)
local_foo_txt = self.files.full_path('foo.txt')
process = aws('s3 cp s3://%s/foo.txt %s' % (bucket_name, local_foo_txt), wait_for_finish=False)
# Give it some time to start up and enter it's main task loop.
time.sleep(2)
# The process has 30 seconds to finish after being sent a Ctrl+C,
# otherwise the test fails.
process.send_signal(signal.SIGINT)
deadline = time.time() + 30
while time.time() < deadline:
rc = process.poll()
if rc is not None:
break
else:
process.kill()
self.fail("CLI did not exist within 30 seconds of receiving a Ctrl+C")
# A Ctrl+C should have a non-zero RC.
self.assertEqual(process.returncode, 1)

def test_cp_to_nonexistent_bucket(self):
foo_txt = self.files.create_file('foo.txt', 'this is foo.txt')
p = aws('s3 cp %s s3://noexist-bucket-foo-bar123/foo.txt' % (foo_txt,))
Expand Down Expand Up @@ -376,16 +397,16 @@ def test_sync_to_from_s3(self):
self.assertEqual(f.read(), 'bar contents')

def test_sync_to_nonexistent_bucket(self):
foo_txt = self.files.create_file('foo.txt', 'foo contents')
bar_txt = self.files.create_file('bar.txt', 'bar contents')
self.files.create_file('foo.txt', 'foo contents')
self.files.create_file('bar.txt', 'bar contents')

# Sync the directory and the bucket.
p = aws('s3 sync %s s3://noexist-bkt-nme-1412' % (self.files.rootdir,))
self.assertEqual(p.rc, 1)

def test_sync_with_empty_files(self):
foo_txt = self.files.create_file('foo.txt', 'foo contents')
empty_txt = self.files.create_file('bar.txt', contents='')
self.files.create_file('foo.txt', 'foo contents')
self.files.create_file('bar.txt', contents='')
bucket_name = self.create_bucket()
p = aws('s3 sync %s s3://%s/' % (self.files.rootdir, bucket_name))
self.assertEqual(p.rc, 0)
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/customizations/s3/test_s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ def setUp(self):
self.session = botocore.session.get_session(EnvironmentVariables)
self.service = self.session.get_service('s3')
self.endpoint = self.service.get_endpoint('us-east-1')
params = {'region': 'us-east-1'}
self.s3_handler = S3Handler(self.session, params)
self.params = {'region': 'us-east-1'}
self.s3_handler = S3Handler(self.session, self.params)
self.bucket = None

def tearDown(self):
Expand All @@ -386,7 +386,7 @@ def test_bucket(self):
service=self.service,
endpoint=self.endpoint,
)
self.s3_handler.call([file_info])
S3Handler(self.session, self.params).call([file_info])
buckets_list = []
for bucket in list_buckets(self.session):
buckets_list.append(bucket['Name'])
Expand All @@ -395,7 +395,7 @@ def test_bucket(self):
file_info = FileInfo(
src=self.bucket, operation_name='remove_bucket', size=0,
service=self.service, endpoint=self.endpoint)
self.s3_handler.call([file_info])
S3Handler(self.session, self.params).call([file_info])
buckets_list = []
for bucket in list_buckets(self.session):
buckets_list.append(bucket['Name'])
Expand Down
18 changes: 17 additions & 1 deletion tests/unit/customizations/s3/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from tests import unittest
from tests import unittest, temporary_file
import os
import tempfile
import shutil
Expand All @@ -19,6 +19,7 @@

from awscli.customizations.s3.executor import IOWriterThread
from awscli.customizations.s3.executor import QUEUE_END_SENTINEL
from awscli.customizations.s3.executor import Executor
from awscli.customizations.s3.utils import IORequest, IOCloseRequest


Expand Down Expand Up @@ -67,3 +68,18 @@ def test_multiple_files_in_queue(self):
self.assertEqual(f.read(), b'foobar')
with open(second_file, 'rb') as f:
self.assertEqual(f.read(), b'otherstuff')


class TestExecutor(unittest.TestCase):
def test_shutdown_does_not_hang(self):
executor = Executor(mock.Mock(), 2, queue.Queue(), False, mock.Mock(),
10, queue.Queue(maxsize=1))
with temporary_file('rb+') as f:
executor.start()
class FloodIOQueueTask(object):
def __call__(self):
for i in range(50):
executor.write_queue.put(IORequest(f.name, 0, b'foobar'))
executor.submit(FloodIOQueueTask())
executor.join()
self.assertEqual(open(f.name, 'rb').read(), b'foobar')
7 changes: 3 additions & 4 deletions tests/unit/customizations/s3/test_s3handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,7 @@ def setUp(self):
self.session = FakeSession()
self.service = self.session.get_service('s3')
self.endpoint = self.service.get_endpoint('us-east-1')
params = {'region': 'us-east-1'}
self.s3_handler = S3Handler(self.session, params)
self.params = {'region': 'us-east-1'}
self.bucket = None

def tearDown(self):
Expand All @@ -598,7 +597,7 @@ def test_bucket(self):
size=0,
service=self.service,
endpoint=self.endpoint)
self.s3_handler.call([file_info])
S3Handler(self.session, self.params).call([file_info])
number_buckets = len(list_buckets(self.session))
self.assertEqual(orig_number_buckets + 1, number_buckets)

Expand All @@ -608,7 +607,7 @@ def test_bucket(self):
size=0,
service=self.service,
endpoint=self.endpoint)
self.s3_handler.call([file_info])
S3Handler(self.session, self.params).call([file_info])
number_buckets = len(list_buckets(self.session))
self.assertEqual(orig_number_buckets, number_buckets)

Expand Down

0 comments on commit 4263d0d

Please sign in to comment.